Running an Airflow DAG/Schedule on a Time Zone

Irrespective of how much experience we have, it is not possible to be aware of, and remember, every facet of a programming environment. I have experienced this multiple times.

Some of you will say that I am being superficial if I claim to have sufficient experience with a language like Java without having explored every facet of it. So be it. But extending help to others has, in turn, helped me, because I have been able to learn many new things that I did not have a chance to encounter during my project work.

Example: Thinking Out of the Box

About two years ago, I worked on a project where we had to migrate an enterprise application from an MPP database to Spark with Delta.io on an on-premises Hadoop environment.

We had around 17,000 SQL statements spread across 4,000 files, which were executed as per a schedule. The SQL statements ran against the MPP database through a custom database driver. Yup. This was because the application had evolved over a period of 20 years.

A Simple Code Generator Using a Cool Python Feature

For my most recent project, I wrote a couple of code generators - three variants of a Python/Spark application generator and at least four variants of an Airflow DAG generator. Different variants were needed as the requirements and the complexity of the output evolved over time. Drawing on this experience, I will show how you can get started on your journey of writing a code generator using a cool feature of Python.

For the purpose of this article, I will use a Python program that generates a basic Python/Spark application that gets and displays 10 rows of a specified table. The application to be generated is shown below:

Python
 
import os
import sys
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

spark_session = SparkSession.builder.appName("generator").getOrCreate()
try:
    df = spark_session.sql("select * from user_names")
    df.show(10, False)
except Exception as e:
    print("Got an error {er}".format(er=str(e)))
spark_session.stop()

Version 1

The simplest method of generating this application is to use print statements, as shown below:
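
Here is a minimal sketch of what such a print-based generator might look like; the table_name variable is an illustrative parameter, and the generated code is written to stdout so it can be redirected into a .py file.

Python

# generator_v1.py - emit the Spark application line by line using print()
table_name = "user_names"  # illustrative parameter: the table to query

print("import os")
print("import sys")
print("import pyspark")
print("from pyspark import SparkContext")
print("from pyspark.sql import SQLContext")
print("from pyspark.sql import SparkSession")
print("")
print('spark_session = SparkSession.builder.appName("generator").getOrCreate()')
print("try:")
print('    df = spark_session.sql("select * from {tbl}")'.format(tbl=table_name))
print("    df.show(10, False)")
print("except Exception as e:")
print('    print("Got an error {er}".format(er=str(e)))')
print("spark_session.stop()")

Running it as, for example, python generator_v1.py > get_rows.py produces the application shown above.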

Visualize Airflow Workflows Without Airflow

Apache Airflow has gained a lot of traction in the data processing world. It is a Python-based orchestration tool. When I say "Python-based," it is not just that the application has been developed using Python. The directed acyclic graphs (DAGs) — Airflow's term for workflows — are also written in Python. In other words, workflows are code. Many popular workflow tools, like Informatica and Talend, have visual editors that allow developers to lay out a workflow visually. Because Airflow workflows are Python code, we are able to visualize a workflow only after uploading it. While this is usually acceptable, it can become problematic when Airflow refuses to load the workflow due to errors. Additionally, during development, it is difficult to visualize all the connections described in the Python code.
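
To make "workflows are code" concrete, here is a minimal sketch of what a DAG file might look like, assuming Airflow 2.x; the task names and schedule are illustrative.

Python

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A three-task workflow defined entirely in Python.
with DAG(
    dag_id="example_pipeline",          # illustrative name
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    transform = BashOperator(task_id="transform", bash_command="echo transform")
    load = BashOperator(task_id="load", bash_command="echo load")

    # The dependencies exist only as code until Airflow renders the DAG.
    extract >> transform >> load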

While looking for a way to visualize workflows, I came across the Sankey diagram. Not only that, I also came across a gist where the necessary Python code had been conveniently packaged into a function. All I had to do was download the gist and include it in my program.
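
The gist itself is not shown here, but as a rough illustration of the idea, a Sankey diagram of task dependencies can be drawn with a few lines of Plotly; the edge list below is made up.

Python

import plotly.graph_objects as go

# Hypothetical task dependencies extracted from a DAG definition:
# each tuple is (upstream_task, downstream_task).
edges = [("extract", "transform"), ("transform", "load"), ("extract", "audit")]

# Build the node list and the index lookup that the Sankey trace expects.
nodes = sorted({name for edge in edges for name in edge})
index = {name: i for i, name in enumerate(nodes)}

fig = go.Figure(
    go.Sankey(
        node=dict(label=nodes, pad=20, thickness=15),
        link=dict(
            source=[index[src] for src, _ in edges],
            target=[index[dst] for _, dst in edges],
            value=[1] * len(edges),  # equal weight for every dependency
        ),
    )
)
fig.show()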

Packaging Python Classes as a Wheel

Introduction

Recently, while working on a project where we were using Python, I realized how easy the Java package mechanism makes it to organize and reuse common classes and functions.

Due to my Java hangover, I organized the common classes and functions in a directory hierarchy like com.<company>.<project>.util. The util package contained a few classes for UUID creation as well as classes for date and time manipulation. After placing the files in the above-mentioned directory, I started developing the application as a micro-service. I placed the micro-service code in the directory com.<company>.<project>.<service>. While trying to use the date and time classes in the micro-service, I used the syntax from com.<company>.<project>.util.mydatetime import MyDateTime.
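
For context, a wheel for such a package hierarchy can be built from a minimal setup.py. The sketch below uses illustrative names (not the actual project layout) and assumes setuptools and wheel are installed.

Python

# setup.py - a minimal sketch for packaging the util classes as a wheel.
# Assumed layout (names are illustrative):
#
#   com/
#       __init__.py
#       mycompany/
#           __init__.py
#           myproject/
#               __init__.py
#               util/
#                   __init__.py
#                   mydatetime.py   # contains class MyDateTime
#   setup.py
from setuptools import setup, find_packages

setup(
    name="myproject-util",      # hypothetical distribution name
    version="0.1.0",
    packages=find_packages(),   # picks up com.mycompany.myproject.util
)

Building with python setup.py bdist_wheel produces a .whl file under dist/ that can be installed with pip install, after which an import like from com.mycompany.myproject.util.mydatetime import MyDateTime works from any application.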

Execute Spark Applications on Databricks Using the REST API

Introduction

While many of us are accustomed to executing Spark applications using the 'spark-submit' command, with the popularity of Databricks, this seemingly simple activity is being relegated to the background. Databricks has made it very easy to provision Spark-enabled VMs on the two most popular cloud platforms, namely AWS and Azure. A couple of weeks ago, Databricks announced their availability on GCP as well. The beauty of Databricks is that they have made it very easy to become a part of their platform. While Spark application development will continue to have its challenges - depending on the problem being addressed - the Databricks platform has taken away the pain of having to establish and manage your own Spark cluster.

Using Databricks

Once we have registered on the platform, Databricks allows us to define a cluster of one or more VMs, with configurable RAM and executor specifications. We can also define a cluster that launches a minimum number of VMs at startup and then scales to a maximum number of VMs as required. After defining the cluster, we have to define jobs and notebooks. Notebooks contain the actual code executed on the cluster. We need to assign notebooks to jobs, as the Databricks cluster executes jobs (and not notebooks). Databricks also allows us to set up the cluster so that it can download additional JARs and/or Python packages during cluster startup. We can also upload and install our own packages (I used a Python wheel).
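
As a preview of the REST API used in this article, here is a minimal sketch of triggering an existing job with the Jobs API; the workspace URL, token, and job ID are placeholders, and the endpoint path assumes the 2.1 Jobs API.

Python

import requests

DATABRICKS_HOST = "https://<workspace>.cloud.databricks.com"  # placeholder workspace URL
DATABRICKS_TOKEN = "<personal-access-token>"                  # placeholder token

# Trigger a run of a job that has already been defined in the workspace.
response = requests.post(
    DATABRICKS_HOST + "/api/2.1/jobs/run-now",
    headers={"Authorization": "Bearer " + DATABRICKS_TOKEN},
    json={"job_id": 12345},   # hypothetical job id
)
response.raise_for_status()
print("Triggered run:", response.json().get("run_id"))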

Reading an XML File in Python

Late last year (2019), after exploring Apache NiFi, I wrote a couple of Python scripts to manipulate NiFi workflows using the NiFi API. One of the scripts read a NiFi template and generated a new template. As it was not a simple 'copy' operation, I had to write a program to read and update various parameters stored in the NiFi template (which is an XML file).

A few weeks ago, I came across a program that had me visiting NiFi templates once again. As I was required to do more than a simple find and replace, I searched for libraries that would help ease the task of XML manipulation.

On looking around, I came across the xmltodict package and there was no looking back!! I was amazed at the simplicity that xmltodict brings to XML parsing.

The xmltodict package enables us to read and parse an XML file, which it then converts into a dictionary. Each node of the XML is represented by a key-value pair in the dictionary. If a node has sub-nodes, we simply keep adding node names to the dictionary notation to reach the data. It is that simple.
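
As a quick illustration, here is a minimal sketch using a small, made-up XML snippet (not an actual NiFi template).

Python

import xmltodict

# A small, made-up XML snippet standing in for a NiFi template.
xml_text = """
<template>
    <name>offload</name>
    <processor>
        <name>GetFile</name>
        <property name="Input Directory">/data/in</property>
    </processor>
</template>
"""

doc = xmltodict.parse(xml_text)

# Nested nodes become nested dictionary keys; attributes are prefixed with '@'
# and the text content sits under '#text' when attributes are present.
print(doc["template"]["name"])                            # offload
print(doc["template"]["processor"]["property"]["@name"])  # Input Directory
print(doc["template"]["processor"]["property"]["#text"])  # /data/in

# After updating values in the dictionary, unparse() turns it back into XML.
doc["template"]["processor"]["property"]["#text"] = "/data/new_in"
print(xmltodict.unparse(doc, pretty=True))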

Handling JSON Data in Python

I recently finished writing two assets — a Spark-based data ingestion framework and a Spark-based data quality framework; both were metadata-driven. As is typical, the metadata that drives the behavior of the assets is stored in an RDBMS. In the data ingestion framework, I needed to store parameters for the source (information like username, password, path, format, etc.), the destination (again username, password, path, format, etc.), compression, and so on. In typical schemas, I have seen these parameters modeled as individual columns in a table.

Being a programmer at heart, I decided not to use multiple columns. Instead, all the parameters would be stored in a single column (as a string in the database table). The Spark application would have the responsibility of reading the string and extracting the required parameters.
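
To give an idea of the approach, here is a minimal sketch of what parsing such a parameter string might look like; the column value below is made up.

Python

import json

# A hypothetical value of the single parameter column for one source.
source_params_str = (
    '{"username": "ingest_user", "password": "****", '
    '"path": "/landing/sales", "format": "csv", "compression": "gzip"}'
)

# The application parses the string back into a dictionary
# and picks out only the parameters it needs.
source_params = json.loads(source_params_str)
print(source_params["path"])                      # /landing/sales
print(source_params["format"])                    # csv
print(source_params.get("compression", "none"))   # gzip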

Validating Data in a Spark DataFrame – Part One

(Figure: how your DataFrame looks after this tutorial)

Recently, in conjunction with a modular, metadata-based ingestion engine that I am developing using Spark, we got into a discussion about data validation. Data validation is a natural next step after data ingestion, which is how we came to the topic.

You might be wondering, "What is so special about data validation? Is it because of Spark?" This article was prompted partly by Spark, but more importantly by the fact that it demonstrates the power of Spark and illustrates the principle that there is more than one way to achieve our goal.
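
As a small taste of one such method, here is a sketch that flags invalid rows instead of dropping them; the DataFrame and the validation rule are made up for illustration.

Python

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("validation_sketch").getOrCreate()

# A tiny, made-up DataFrame standing in for ingested data.
df = spark.createDataFrame(
    [("alice", 34), ("bob", None), ("carol", -5)],
    ["name", "age"],
)

# Flag each row instead of discarding it, so rejected rows can be reported later.
validated = df.withColumn(
    "is_valid",
    F.when(F.col("age").isNotNull() & (F.col("age") > 0), F.lit(True)).otherwise(F.lit(False)),
)
validated.show()
spark.stop()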

Arm Twisting Apache NiFi

Introduction

Apache NiFi is a software project from the Apache Software Foundation, designed to automate the flow of data between software systems.

Early this year, I created a generic, metadata-driven data offloading framework using Talend. While I was championing that tool, many accounts raised concerns regarding the Talend license. Some were apprehensive about the additional cost, while many others questioned the tool itself because their accounts already had licenses for competing ETL tools like DataStage and Informatica (to name a few). A few accounts also wanted to know if the same offloading concept could be implemented using NiFi. Therefore, it was only logical to explore NiFi.

Encryption, Part 1B: Symmetric Encryption of Voluminous Files

In my recent article, Encryption Part 1: Symmetric Encryption, I covered the symmetric encryption of data and shared example Java code.

The method I covered in that article operates on the complete data/string. Obviously, this method is not suitable when dealing with voluminous files — MBs, GBs, TBs — particularly in the world of Big Data. Ideally, I should have also shared the code to encrypt and decrypt voluminous files, which I am doing in this article.
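
To give an idea of the approach up front, here is a sketch, written in Python rather than the Java used in the original article, of encrypting a large file in fixed-size chunks using AES in CTR mode via the cryptography package; key handling is deliberately simplified.

Python

import os

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

CHUNK_SIZE = 64 * 1024    # 64 KB per read; tune for your environment

key = os.urandom(32)      # 256-bit key; in practice, load it from a key store
nonce = os.urandom(16)    # must be stored or transmitted along with the ciphertext

encryptor = Cipher(algorithms.AES(key), modes.CTR(nonce)).encryptor()

# Read the source file chunk by chunk so the whole file never sits in memory.
with open("big_input.dat", "rb") as src, open("big_input.enc", "wb") as dst:
    while True:
        chunk = src.read(CHUNK_SIZE)
        if not chunk:
            break
        dst.write(encryptor.update(chunk))
    dst.write(encryptor.finalize())

Decryption mirrors this loop with a decryptor created from the same key and nonce.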

Encryption, Part 2: Public Key/Private Key Encryption

In my previous article, I presented the concept of symmetric encryption, where the same key is used to encrypt and decrypt data. The biggest limitation of symmetric encryption is the key itself: the key used for encryption and decryption has to be kept secret. If the key is compromised, the encrypted data is no longer secure. While you may feel that it is easy to keep the key safe, consider the fact that the same key cannot be used to encrypt data between multiple parties. For example, if Alice and Bob agree to use a secret key X for exchanging their messages, the same key X cannot be used to exchange messages between Alice and Jane, because such messages could then be decrypted by Bob as well. Hence, in addition to keeping the key a secret, each pair that wishes to communicate secretly will have to maintain a separate key for their conversation.

This problem is overcome by the concept of public key/private key encryption (also known as Public Key Encryption or PKE for short).
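
To illustrate the idea, here is a minimal sketch written in Python (rather than the Java used in this series) with the cryptography package: anyone may encrypt with the public key, but only the holder of the private key can decrypt.

Python

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa

# Bob generates a key pair and publishes only the public key.
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

# OAEP is the recommended padding scheme for RSA encryption.
oaep = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)

# Alice encrypts with Bob's public key...
ciphertext = public_key.encrypt(b"meet at noon", oaep)

# ...and only Bob's private key can recover the message.
print(private_key.decrypt(ciphertext, oaep))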