Introduction to Apache Airflow

“There were 5 Exabytes of information created between the dawn of civilization through 2003, but that much information is now created every 2 days.” — Eric Schmidt, Former CEO and Executive Chairman of Google

Whether or not this quote from 2010 is accurate, the world of data-driven businesses is a very competitive one. Every group leader or product manager needs a framework in place for the data science pipeline in order to improve efficiency, revenue, and day-to-day operations; without one, the team is set up for failure.

Enter the workflow. Generally speaking, a workflow is a managed, repeatable sequence of operations that provides a service or processes information. Using workflows correctly minimizes the room for error and increases overall efficiency.

But what happens if one or more of the operations is poorly configured? The company wastes time and loses money on every workflow cycle.

Albert Einstein reputedly said that “insanity is doing the same thing over and over again and expecting different results”. There is therefore a need for a tool that assists in creating workflows easily and properly, and above all enables monitoring and maintaining them.

Apache Airflow is a platform to “programmatically author, schedule and monitor workflows” that has become the de facto choice for managing data workflows.

It helps in creating workflows, visualizing pipelines and monitoring their progress.

What is Apache Airflow?

Airflow is an open-source tool for orchestrating complex computational workflows and data processing pipelines that was originally introduced by Airbnb.

This tool enables authoring workflows as Directed Acyclic Graphs (DAGs) of tasks. It consists of a scheduler that executes those tasks on an array of workers, making it highly scalable, and a user interface that makes it easy to visualize pipelines, monitor their progress, and troubleshoot any issues should they arise.

Airflow defines pipelines as code written in Python. As such, they become more maintainable and testable, allow dynamic pipeline creation, and support customization by extending the library to fit one’s needs.

Airflow is not a data streaming solution. Tasks do not move data from one to the next (though they can communicate with each other!) and workflows are expected to be mostly static or slowly changing and to look similar from one run to the next.

Example use cases for Airflow:

Core Concepts

DAG

Short for directed acyclic graph, a DAG is an organized collection of all the tasks one wants to run, reflecting their relationships and dependencies. It is defined in a Python script, which represents its structure as code and is placed in Airflow’s DAG_FOLDER.

Airflow executes the code in each script to dynamically build the DAG objects. There is no limit on the number of DAGs, and each can describe an arbitrary number of tasks. In general, each DAG should correspond to a single logical workflow. For example:

A DAG describes how to carry out the workflow: it ensures that tasks run at the right time, in the right order, and with the right handling of any unexpected issues, without regard to what the tasks actually do.

DAG Runs

A DAG run is a physical instance of a DAG, containing task instances that run for a specific logical date and time called execution_date.

It’s usually created by the scheduler, but can also be triggered externally. Multiple DAG runs may be running at once for a particular DAG, each of them having a different execution_date.

Tasks

A Task defines a unit of work; it is written in Python and represented as a node in the DAG graph. Each task is an instantiation of an Operator, and dependencies can be established between tasks. For example:

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

task1 = DummyOperator(task_id='task1')

def print_hello_world_n_times(n):
    for i in range(n):
        print(f'Hello World! {i} times')

task2 = PythonOperator(
    task_id='task2',
    python_callable=print_hello_world_n_times,
    op_kwargs={'n': 5}
)

task3 = DummyOperator(task_id='task3')

task1 >> task2 >> task3
 

That means that when a DAG Run is created, task1 starts running first; task2 waits for task1 to complete successfully before it starts, and likewise task3 waits for task2.

Task1 is said to be “upstream” of task2 and task3; task3 is “downstream” of task1 and task2; and task2 is upstream of task3 and downstream of task1.

Task Instances

Task Instances belong to DAG Runs, have an associated execution_date, and are the instantiated, runnable entities. Each represents a specific run of a task and comprises a DAG, a task, a point in time (execution_date), and a state, such as “running”, “success”, “up_for_retry”, or “failed”.

Operators

Operators are the building blocks of a DAG; each describes what actually gets done by a single task in a workflow. Usually they can stand on their own and do not need to share resources with any other operator.

If, however, two or more operators do need to share information, they can most likely be combined into a single operator. If that is not possible, Airflow provides a feature for operator cross-communication called XCom, covered in a following section.

Airflow provides operators for many common tasks, including:

Aside from those basic operators, there are more complex and specific operators, including but not limited to:

Sensor

Sensors are a special type of operator that repeatedly runs a specified method until a certain criterion is met (the method returns True) or the timeout expires.

The criteria may be a file landing in S3, a partition appearing in Hive, or a specific time of the day.

Normally, if the criterion is not met by the timeout, the sensor is marked as failed and, as a result, its downstream tasks are marked as upstream_failed. This behavior can be changed with the sensor’s soft_fail option, which marks the sensor and all downstream tasks as skipped instead.

Using sensors helps reduce DAG runs that have nothing to do because their required information is not yet available.

BranchPythonOperator

BranchPythonOperator allows the workflow to “branch”, i.e. follow a certain path based on an arbitrary condition, typically related to something that happened in an upstream task. It expects a Python function that returns a single task_id or a list of task_ids.

The returned task_id(s) should point to tasks directly downstream of the BranchPythonOperator. Those tasks are chosen as the path to follow, while all other paths are skipped. For example:

It is important to note that using tasks with depends_on_past=True downstream of a BranchPythonOperator is logically unsound, as the skipped status will invariably block tasks that depend on their past successes.

SubDagOperator

SubDagOperator enables scheduling a DAG inside another DAG as a task. Defining a function that returns a DAG is a good design pattern here: it means writing less code and yields a cleaner flow.

Although SubDAGs are very helpful in repeating patterns, they are to be used with caution because when not used properly, they may lead to a lot of unexpected and hard to debug errors. 

A common use case for SubDagOperator is ETL: a repeating pattern of extracting data from a source, loading it into a staging database, and then moving the data to the production database. This runs multiple times, on multiple files at once, and may look something like this:

By using SubDAGs, this can be combined into a much cleaner view, like so:

As noted, although using SubDAGs has its advantages, it also has many limitations and is easy to make mistakes with. To name a few:

  • Not using a function to declare the SubDAG will make the GUI treat it as a separate DAG.
  • Visibility and logs become limited and harder to track.
  • Running more than a single task at a time in a SubDAG may oversubscribe the workers and lead to deadlocks; that is why, out of the box, its parallelism is limited to one and it runs its tasks in sequence.

Platform Specific Operators

Since Airflow started at Airbnb and only later moved to Apache, it currently has two major libraries of operators for specific platforms such as AWS, GCP, Microsoft Azure, Kubernetes, and many more:

  1. Providers – a library created and maintained by the companies and system owners that provide access to their resources.
  2. Contrib – a library created and maintained by the community for use with the resources of other companies or systems.

XComs

XComs, short for “cross-communication”, let tasks exchange information and shared state. They are defined by a key, a value, and a timestamp, and also contain tracking attributes such as the task/DAG that created them. Any object that can be pickled can be shared using XComs. XComs are either “pushed” (sent) or “pulled” (received).

Tasks can push XComs at any time, either explicitly by calling xcom_push() from their operator, or by returning a value that is automatically pushed (in some operators, the parameter do_xcom_push must be set to True).

Tasks can pull XComs, optionally applying filters based on criteria such as key, source task_ids, and source dag_id. Whenever an operator is given a context, the context may be used to pull XComs by calling xcom_pull(). When no context is available, it is possible to pull XComs via Jinja templates, but only on parameters marked as templated.

For example:

from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator

def push_function(**context):
    name = 'foo'
    context['task_instance'].xcom_push(key='name', value=name)  # explicit push
    return name  # the return value is pushed implicitly

def pull_function(**context):
    name = context['task_instance'].xcom_pull(task_ids='task1', key='name')
    print(name)

task1 = PythonOperator(
    task_id='task1',
    python_callable=push_function,
    provide_context=True
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=pull_function,
    provide_context=True
)

task3 = MySqlOperator(
    task_id='task3',
    # the sql field in MySqlOperator is marked as templated
    sql='SELECT * FROM {{ task_instance.xcom_pull(task_ids="task1", key="name") }}'
)

Hooks

Hooks are interfaces to external platforms and databases like MySQL, Hive, Slack, HDFS, Docker and more. They act as a building block for operators and implement a common interface when possible.

They keep authentication code and information out of pipelines, and are also very useful on their own, for example in custom operators that require a connection to an external platform.

Variables

Variables enable storing and retrieving arbitrary content or settings as a simple key-value store within Airflow. They can be listed, created, updated, and deleted from the UI, CLI, or code (and are also supported in Jinja templates).

Unlike XComs, which are intended for sharing real-time data between running tasks, variables are intended to be used in the DAG itself, to retrieve configuration data used as part of the DAG’s creation. Usually a variable will contain JSON as its value, which needs to be deserialized when it is retrieved.

In addition, variables can also be created and managed using Environment Variables. The naming convention is AIRFLOW_VAR_<variable_name>, all uppercase.

 

Core Components

Configuration file

Upon the first installation and run of Airflow, it creates the airflow.cfg file in the $AIRFLOW_HOME directory. This is the main configuration file of the entire tool. It is advisable to review and change many of its default parameters, since they are suited for neither development nor production.

CLI

Airflow has an extremely rich command-line interface. It can be used for testing and running DAGs in development, and it supports many types of operations on almost all of Airflow’s components and concepts, including the web server, users, DAGs, tasks, variables, databases, connections, configurations, and so on.

Web server

Airflow’s GUI makes it easy to monitor and troubleshoot existing DAGs. Besides the default DAGs view (which, for example, allows starting a DAG), it consists of many views, including DAG Runs and Task Instances, which show the status of all running DAGs and tasks, the Connections and Variables views for listing, creating, editing, and deleting connections and variables, and many more.

On top of that, for each DAG there are views for its code, run duration, run history and more. For every Task Instance there are views for its status, logs and returned XComs.

Starting the web server is done by running `airflow webserver`. This runs the Airflow web server with the default configuration for the port (8080), number of workers (4), and more; all of these are configurable via CLI flags or the airflow.cfg file.

Here is an example of an airflow GUI:

Scheduler

The scheduler monitors all DAGs and tasks and triggers the task instances whose dependencies have been met. It achieves this by staying in sync with the folder that stores all DAGs, periodically collecting DAG parsing results, and inspecting active tasks to see whether they can be triggered.

Starting the scheduler is done by running `airflow scheduler`. This starts the scheduler process, with all configurations taken from the airflow.cfg file. In a production environment, the scheduler is designed to run as a persistent service.

Executor

Executors are the mechanism by which task instances get run. They determine the parallelism of tasks and the ability to scale out. Airflow supports various executors, such as the CeleryExecutor, LocalExecutor, and KubernetesExecutor. The default is the SequentialExecutor, which has no parallelism and is therefore only useful for simple testing and debugging, not production. The executor in use is set in the airflow.cfg file, under the core section.
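For illustration, switching executors is a one-line change in airflow.cfg. A sketch (the connection string and credentials are hypothetical; parallel executors need a metadata database other than SQLite):

```ini
[core]
# Default: runs one task at a time, no parallelism.
# executor = SequentialExecutor

# Run tasks in parallel as local subprocesses:
executor = LocalExecutor

# Parallel executors require a non-SQLite database backend,
# e.g. (hypothetical credentials):
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
```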

Conclusions

Apache Airflow is a powerful tool, filled with features to author and maintain workflows.

It enables the creation of pipelines written in Python, as Directed Acyclic Graphs (DAGs) of tasks, and using its own Scheduler, orchestrates when and in what order they should run.

Those tasks can do a variety of operations starting from running a simple Bash or Python script, and going up to interacting with databases and storage platforms such as MySQL, Postgres, Hive, AWS, GCP, HDFS, Azure and more.

Although tasks can communicate some information between them using XComs, they are not designed to build on each other’s work; they should be designed to stand on their own.

Airflow has a built-in web server that visualizes the DAGs and allows for fast debugging and review.

Overall, as shown above, choosing Apache Airflow for data processing has many advantages, and while it may take some time to fully grasp all of its abilities, when used correctly it will increase overall efficiency and revenue, improve day-to-day operations, and minimize the room for errors that cost the organization time and money.