You Can Easily Build a Scalable Data Pipeline With Apache Airflow: Here’s How
If you want an easy way to build and manage a data pipeline, Apache Airflow may be the tool for you. Apache Airflow provides a simple way to write, schedule, and monitor workflows using pure Python. In this blog post, we’ll cover Apache Airflow core concepts and components and then build and operate a simple data pipeline.
Why Apache Airflow is ideal for data pipelines
Apache Airflow was started by Airbnb in 2014 as a solution to manage complex data workflows. It has been open source since the first commit and became a top-level Apache Software Foundation project in 2019. Apache Airflow has a number of benefits that make it easier to manage the complexity of batch scheduled jobs, including:
• Scalable: the architecture uses a message queue system to run an arbitrary number of workers.
• Dynamic: pipelines are written in Python, so they can be generated dynamically.
• Extensible: it’s easy to integrate custom operators and libraries.
Apache Airflow works well as an orchestrator, but it’s not meant to process data itself. If you need to process data, you can integrate a dedicated tool, such as Apache Spark, into the pipeline.
What use cases are best for Apache Airflow
Imagine you need to run a SQL query daily in a data warehouse and use the results to call a third-party Application Programming Interface (API). Perhaps the API response needs to be sent to a client or a different internal application. What happens if one part of the pipeline fails? Should the downstream tasks still execute, and can you easily rerun just the failed tasks?
This is exactly where Apache Airflow comes in. The Apache Airflow UI lets you manage complex dependencies and identify the parts of the process that take too long or fail. Apache Airflow works well for almost any pipeline that needs to run on a consistent cadence.
Understanding Apache Airflow components
Apache Airflow components include:
• Web server: presents the UI that is used to visualize and manage the data pipelines.
• Scheduler: handles triggering scheduled workflows and submitting tasks to the executor.
• Executor: handles running the tasks. By default, the executor runs inside the scheduler, but production deployments often use separate workers for better scalability.
• Metadata database: used by other components to store state.
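To make this division of labor concrete, here is a toy Python model of it, not Airflow’s actual implementation: a “scheduler” puts tasks on a queue, a small pool of worker threads plays the executor, and a plain dictionary stands in for the metadata database. It assumes the tasks are independent and skips dependency handling entirely.

```python
import queue
import threading


def run_toy_pipeline(task_names, num_workers=3):
    """Run independent 'tasks' on a worker pool and return their final states."""
    metadata_db = {name: "queued" for name in task_names}  # stand-in for the metadata DB
    lock = threading.Lock()  # guards metadata_db across worker threads
    work_queue = queue.Queue()  # stand-in for the message queue

    def worker():
        while True:
            task = work_queue.get()
            if task is None:  # sentinel: no more work for this worker
                return
            with lock:
                metadata_db[task] = "running"
            # A real worker would run the task's code here.
            with lock:
                metadata_db[task] = "success"

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()

    # The "scheduler" submits every task, then one stop sentinel per worker.
    for name in task_names:
        work_queue.put(name)
    for _ in workers:
        work_queue.put(None)

    for w in workers:
        w.join()
    return metadata_db


print(run_toy_pipeline(["extract", "transform", "load"]))
# {'extract': 'success', 'transform': 'success', 'load': 'success'}
```

Because the workers only share a queue and a state store, adding capacity in this model means starting more workers, which is the same idea behind running Airflow with a pool of separate workers.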
Understanding Apache Airflow concepts
Apache Airflow has a few unique concepts that you should understand before starting.
Working with DAGs
In graph theory, a directed acyclic graph (DAG) is a graph whose edges have a direction and that contains no cycles, so following the edges never leads back to where you started. That makes a DAG a natural abstraction for a data pipeline: each node is an execution stage, and no stage can depend, directly or indirectly, on itself. In Apache Airflow, a DAG is a graph where the nodes represent tasks. Each DAG is written in Python and stored in the dags folder of your Apache Airflow home directory. A DAG is only concerned with how its tasks are executed (their order and dependencies), not with what happens inside any particular task. Each DAG is instantiated with a start date and a schedule interval that describes how often the workflow should run (such as daily or weekly).
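As a quick illustration of the graph-theory idea, independent of Airflow itself, Python’s standard-library graphlib module (Python 3.9+) can order tasks so that each one comes after its upstream tasks, and it rejects a graph that contains a cycle. The task names mirror the pipeline built later in this post; the dictionary format is just an assumption for this sketch.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its value is the set of tasks it depends on (upstream).
pipeline = {
    "model_a": set(),
    "model_b": set(),
    "check_accuracy": {"model_a", "model_b"},
    "accurate": {"check_accuracy"},
    "inaccurate": {"check_accuracy"},
}

# static_order() yields every task only after all of its upstream tasks.
order = list(TopologicalSorter(pipeline).static_order())
print(order)

# Making model_a depend on a downstream task creates a cycle,
# so the graph is no longer a DAG and ordering it fails.
cyclic = dict(pipeline, model_a={"accurate"})
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as err:
    print("not a DAG, cycle found:", err.args[1])
```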
A DAG describes the workflow as a whole, and each time it is executed, that instance is called a DAG run. This means one DAG gets a new DAG run every time its schedule interval comes around (or when it is triggered manually).
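A simplified sketch of how a daily schedule turns into DAG runs, assuming Airflow 2.x semantics: each run covers one data interval, is created only after that interval has ended, and is labeled with the interval’s start (its logical date). The sketch ignores time zones, manual triggers, and the catchup setting.

```python
from datetime import datetime, timedelta


def daily_runs(start_date, now):
    """Return the logical dates of the daily DAG runs whose interval has ended by `now`."""
    runs = []
    interval_start = start_date
    # A run for [interval_start, interval_start + 1 day) exists once that interval is over.
    while interval_start + timedelta(days=1) <= now:
        runs.append(interval_start)
        interval_start += timedelta(days=1)
    return runs


for logical_date in daily_runs(datetime(2022, 10, 18), datetime(2022, 10, 21, 6)):
    print(logical_date.date())
# prints 2022-10-18, 2022-10-19 and 2022-10-20 on separate lines
```

Note that the interval starting on October 21 is not included: at 06:00 that day it has not finished yet.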
Working with tasks
In Apache Airflow, a task is the basic unit of work to be executed. The tasks are arranged in DAGs in a manner that reflects their upstream and downstream dependencies.
Working with operators
Conceptually, an operator is a predefined template for a specific type of task. These are the building blocks used to construct tasks. Apache Airflow comes with an extensive set of operators and the community provides even more. You can also write custom operators to serve your own unique needs.
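A toy illustration of “operator as template, task as instance”. EchoOperator is hypothetical plain Python, not an Airflow class; real custom operators subclass Airflow’s BaseOperator and implement execute(self, context), which is the shape mirrored here. The context dictionary is likewise a hypothetical stand-in.

```python
from dataclasses import dataclass


@dataclass
class EchoOperator:
    """Hypothetical template: the behavior is fixed, only the parameters vary."""

    task_id: str
    message: str

    def execute(self, context):
        # Every task built from this template runs the same logic.
        return f"[{context['run_id']}] {self.task_id}: {self.message}"


# Two tasks created from the same operator with different parameters.
say_hello = EchoOperator(task_id="say_hello", message="hello")
say_bye = EchoOperator(task_id="say_bye", message="goodbye")

context = {"run_id": "manual_1"}  # stand-in for the context Airflow passes in
print(say_hello.execute(context))  # [manual_1] say_hello: hello
print(say_bye.execute(context))    # [manual_1] say_bye: goodbye
```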
Working with XComs
By default, tasks are isolated and can’t share information directly. XComs, short for cross-communications, provide a way to pass small amounts of data between tasks. Information can be explicitly pushed or pulled. Many operators automatically push their return value to XCom under the key return_value.
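To show the idea rather than Airflow’s storage, here is a hypothetical in-memory XCom store. Airflow keeps XComs in its metadata database by default; the one detail borrowed from Airflow is the return_value key. The class and method names are illustrative only.

```python
RETURN_KEY = "return_value"  # the key Airflow uses for automatically pushed return values


class ToyXComStore:
    """Hypothetical in-memory XCom store keyed by (run_id, task_id, key)."""

    def __init__(self):
        self._data = {}

    def push(self, run_id, task_id, value, key=RETURN_KEY):
        self._data[(run_id, task_id, key)] = value

    def pull(self, run_id, task_ids, key=RETURN_KEY):
        # A single task id returns one value; a list returns a list of values.
        # This toy keeps the values in the same order as task_ids.
        if isinstance(task_ids, str):
            return self._data.get((run_id, task_ids, key))
        return [self._data.get((run_id, t, key)) for t in task_ids]


xcoms = ToyXComStore()
xcoms.push("run_1", "model_a", 4)  # e.g. what an operator returned
xcoms.push("run_1", "model_b", 9)
print(xcoms.pull("run_1", ["model_a", "model_b"]))  # [4, 9]
```

Keying by run as well as task is what keeps one DAG run’s values from leaking into another’s.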
Working with connections
Apache Airflow is often used to push and pull data from other systems such as Amazon Web Services (AWS) or Google Cloud. Connections are used to store credentials for communicating with external systems.
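Besides the UI, connections can be supplied as URIs, for example through environment variables named AIRFLOW_CONN_<CONN_ID>. The sketch below is a simplified, hypothetical parser showing how such a URI maps onto the usual connection fields; Airflow’s own parser handles more cases (such as extra parameters), and the credentials are made up.

```python
from urllib.parse import unquote, urlsplit


def parse_connection_uri(uri):
    """Split a connection URI into typical connection fields (simplified sketch)."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "port": parts.port,
        # The URI path (without the leading slash) is treated as the schema.
        "schema": parts.path.lstrip("/") or None,
    }


# Hypothetical value, shaped like what AIRFLOW_CONN_MY_WAREHOUSE might contain.
print(parse_connection_uri("postgres://analyst:s3cret@db.example.com:5432/analytics"))
```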
Setting up for this tutorial
This section assumes some basic Python and Unix command-line knowledge. Ideally, this installation would use a virtual environment and a constraint file to pin compatible dependency versions, but we’ll skip that for this tutorial.
Before you start, make sure that Python 3 and pip are installed on your machine. After that, you just need to install the package using pip and run one Apache Airflow command.
# create a folder for airflow and enter it
mkdir airflow-example && cd airflow-example
# set the "home" for airflow. In this case, it will be the location of the new folder.
# This can be found with "pwd"
export AIRFLOW_HOME=your-path/airflow-example
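# install airflow using pip
pip install apache-airflow
# initialize the metastore, create an admin user, and start the web server and scheduler
airflow standalone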
The standalone command initializes the metastore, creates an admin user, and starts the web server and scheduler. These commands can also be run individually, if desired.
Now if you open localhost:8080 in your browser, you should see the Apache Airflow UI with an admin login screen. Log in with the credentials shown in the terminal to see some example DAGs.
Checking out the Apache Airflow UI
Since we’ve started the web server, let’s take a quick tour of a few of the most useful views in the Apache Airflow UI. The UI also lets you add or modify Variables and even view the code for each DAG. Apache Airflow re-parses DAG files periodically (by default, roughly every 30 seconds for files it already knows about), so changes you make to a DAG on a local instance show up without restarting the server.
Show running DAGs with the DAGs view
The DAGs view lists your DAGs along with the status of their recent runs, including recent failures and successes. Apache Airflow comes with a set of example DAGs that you’ll see when you first start up; these examples can be turned off later.
See DAG run history with the grid view
This view displays a grid of tasks in which each square is a task instance from a specific DAG run. This is an easy way to see the history of each run over time.
Visualize DAG tasks and dependencies with the graph view
This is my favorite view because it visualizes the DAG’s tasks and dependencies for a specific DAG run. Note that this view only shows the tasks for one specific run at a time, and it defaults to the most recent run.
Find tasks that are slowing down the workflow in the Gantt view
This view displays the time taken by each particular task in a specific DAG run as a Gantt chart. This is helpful for viewing tasks that slow down the workflow.
Building a data pipeline with Apache Airflow
In this example, we’ll build a pipeline that simulates two machine-learning models by fetching two random numbers from an API, then determines whether at least one of the “models” is accurate.
1. First, create a new folder called `dags` in the airflow-example folder.
2. Then create a file named first_dag.py.
# create a new folder to store the dags and enter it
mkdir dags && cd dags
# create a new file for our first DAG!
touch first_dag.py
Apache Airflow will automatically pick up DAGs from this folder, because the dags folder inside AIRFLOW_HOME is the default DAG folder.
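The folder is found through Airflow’s defaults: AIRFLOW_HOME falls back to ~/airflow when it is not set, and the DAG folder defaults to a dags directory inside it. A small sketch of that resolution, assuming no custom dags_folder value has been set in airflow.cfg:

```python
import os
from pathlib import Path


def default_dags_folder(env=None):
    """Resolve the default DAG folder from AIRFLOW_HOME (ignores airflow.cfg overrides)."""
    env = os.environ if env is None else env
    airflow_home = env.get("AIRFLOW_HOME", str(Path.home() / "airflow"))
    return Path(airflow_home).expanduser() / "dags"


# With the export from the setup step, this prints <your-path>/airflow-example/dags.
print(default_dags_folder())
```

This is also why exporting AIRFLOW_HOME in the same shell before running Airflow matters: without it, Airflow would look in ~/airflow/dags instead.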
3. Now, add the minimum imports and define the DAG. We’ll use the SimpleHttpOperator to simulate the models by fetching a random digit and the BranchPythonOperator to determine if at least one of the models is sufficiently accurate.
The BashOperator will print whether the models are accurate. Each DAG and task must have a unique identifier. The '@daily' preset sets the workflow to run once a day, starting from October 18, 2022. If the start_date is in the past, Apache Airflow will by default create a DAG run for every interval that was missed; this is called catchup (a form of backfilling). Pass catchup=False to the DAG if you only want the most recent interval to run.
from airflow.models import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from datetime import datetime

# define the dag with an ID, schedule interval and start date
dag = DAG(
    'first_dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 10, 18)
)
The DAG is now set up and ready to run daily, and will show up in the DAGs view. But it has no tasks yet.
4. Let’s add two different tasks that each fetch a random digit. Since we are interacting with an API, we’ll need to add an HTTP connection first.
5. Go to Admin > Connections and click the blue “+” to add a new record.
6. Set the connection ID to random_data_default, set the connection type to HTTP, and set the host to http://random-data-api.com/api.
Now we can call the API to receive a random number using the SimpleHttpOperator.
7. Create the two tasks that generate the random numbers. Each one pushes its number into an XCom to be used in the next task.
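# Perform a GET request to /number/random_number
# the response filter will extract the digit value from the JSON response
model_a = SimpleHttpOperator(
    task_id='model_a',
    method='GET',
    endpoint='number/random_number',
    http_conn_id='random_data_default',
    response_filter=lambda response: response.json()['digit'],
    dag=dag
)
# model_b is identical apart from its task_id
model_b = SimpleHttpOperator(
    task_id='model_b',
    method='GET',
    endpoint='number/random_number',
    http_conn_id='random_data_default',
    response_filter=lambda response: response.json()['digit'],
    dag=dag
)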
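Because SimpleHttpOperator hands its response_filter the HTTP response and pushes whatever the filter returns to XCom, you can check the filter offline. FakeResponse is a hypothetical stand-in that only implements .json(), and the payload assumes the API’s JSON includes a digit field.

```python
class FakeResponse:
    """Hypothetical stand-in for requests.Response; only .json() is implemented."""

    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload


def extract_digit(response):
    # Same expression as the response_filter lambda used by the model tasks.
    return response.json()['digit']


print(extract_digit(FakeResponse({'digit': 7})))  # 7
```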
8. The next step is to determine whether either of the digits is greater than 7. We’ll use the BranchPythonOperator to call a Python function. This operator decides which task to run next based on the task ID returned from the function. Adding ti (for task instance) as a parameter to the function lets us pull the information from the XComs of the previous tasks.
# Pull the digits that the model_a and model_b tasks retrieved from the API
# If either of the digits is greater than 7, then we'll run the accurate task
# to be defined in the next step. Otherwise, run the inaccurate task.
def _check_accuracy(ti):
    accuracies = ti.xcom_pull(task_ids=['model_a', 'model_b'])
    return 'accurate' if max(accuracies) > 7 else 'inaccurate'

check_accuracy = BranchPythonOperator(
    task_id='check_accuracy',
    python_callable=_check_accuracy,
    dag=dag
)
9. Create two tasks using the BashOperator to print out either accurate or inaccurate.
accurate = BashOperator(
    task_id="accurate",
    bash_command="echo 'accurate'",
    dag=dag
)

inaccurate = BashOperator(
    task_id="inaccurate",
    bash_command="echo 'inaccurate'",
    dag=dag
)
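10. Finally, define the DAG dependencies to create the graph. The >> operator makes the task on its right downstream of the task on its left, so the right-hand task waits for the left-hand one. Wrapping tasks in brackets (a list) applies the dependency to each task in the list, which lets tasks that don’t depend on each other run in parallel.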
# the models can run at the same time, but the accuracy check must wait until both are complete
# after checking the accuracy, either inaccurate or accurate will run depending on the results
[model_a, model_b] >> check_accuracy >> [accurate, inaccurate]
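To see how the >> syntax can build a graph, here is a toy sketch using Python operator overloading. ToyTask is hypothetical and far simpler than Airflow’s operators, but it mirrors the chaining described above: a >> b makes b downstream of a, and a list on either side applies the dependency to every task in it.

```python
class ToyTask:
    """Hypothetical task that records the IDs of its downstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # task >> other, where other is a task or a list of tasks
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.add(target.task_id)
        return other  # returning the right side lets a >> b >> c chain

    def __rrshift__(self, other):
        # [a, b] >> task: Python calls this because a list has no >> for tasks
        for upstream in other:
            upstream >> self
        return self


model_a, model_b = ToyTask("model_a"), ToyTask("model_b")
check = ToyTask("check_accuracy")
accurate, inaccurate = ToyTask("accurate"), ToyTask("inaccurate")

[model_a, model_b] >> check >> [accurate, inaccurate]

print(sorted(model_a.downstream))  # ['check_accuracy']
print(sorted(check.downstream))    # ['accurate', 'inaccurate']
```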
11. Refresh the page and go to the graph view to visualize the DAG.
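12. If the DAG is paused (new DAGs are paused by default), unpause it with its toggle, then click the run button to trigger a DAG run. Depending on the random numbers generated, either the accurate or the inaccurate task runs, and the other is skipped.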
That’s it—you’ve just built your first data pipeline in Apache Airflow.
Next steps: explore how Apache Airflow can help you build and scale data pipelines
In just a few minutes, we’ve set up Apache Airflow, created a simple DAG, and executed it. Imagine how useful this could be in production, when you may have dozens or even hundreds of complex data pipelines running on different schedules. With built-in scalability and flexibility, Apache Airflow is a powerful tool for orchestrating workflows, and I hope this brief tutorial encourages you to explore how you can use it in your environment.