Airflow

Airflow is a popular open-source framework for running data pipelines, but it’s also really great at running tasks and scripts on a schedule. Cron has supported scheduled tasks for a long time, but it’s hard to configure and monitor correctly. Fortunately for me, Airflow made it a lot easier to set up DAGs (Directed Acyclic Graphs) and the dependencies needed between tasks.

Overview

Airflow has a few key features that make it easy to use:

  • Scheduler (schedules tasks once their dependencies complete and collects results)
  • Executors (determine how and where tasks run, on local or remote instances)
  • Webserver (lets you view results, debug, and trigger DAGs)
  • Metadata database (stores all the state-related information about DAGs)

Airflow Setup

Installation

While Airflow is designed to handle many tasks and process data pipelines, it’s also a great substitute for cron when running tasks locally or on a single host, and it makes it easier to manage and trigger DAGs.

AIRFLOW_VERSION=2.2.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow[celery,postgres]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

With Airflow 2, they introduced constraints files that pin known-working versions of dependencies for reliable builds.

After first installing Airflow, it automatically creates a metadata database using SQLite. Only SequentialExecutor is supported with SQLite, so you can’t use CeleryExecutor or any of the other supported executors. The good news is that Airflow uses SQLAlchemy for connecting to the database, so any database supported by SQLAlchemy should be ready to be hooked up with Airflow; see the guide to set up a database backend for Airflow.

Create a new database for Airflow and grant the user permissions to access it (using Postgres for this example).

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

ALTER USER airflow_user SET search_path = public;
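
After creating the database, point Airflow at it and initialize the schema. A minimal sketch, assuming Postgres runs locally on the default port and the psycopg2 driver from the postgres extra is installed:

# point Airflow at the new Postgres database (can also be set as sql_alchemy_conn in airflow.cfg)
export AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow_db"

# create the Airflow tables in the new database
airflow db init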

Once you’re all set up, you can run the scheduler and webserver to see the predefined DAGs.

airflow webserver --port 8080
airflow scheduler

Note: Only SequentialExecutor is supported with SQLite, so you can’t use CeleryExecutor or any of the other supported Executors with the default MetaDB.

One of the first changes I recommend, after switching to a different database, is updating the executor type from SequentialExecutor to CeleryExecutor or LocalExecutor.

# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = CeleryExecutor
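
If you go with CeleryExecutor, it also needs a message broker and result backend configured in the [celery] section of airflow.cfg; a sketch assuming a local Redis broker and the Postgres database from earlier:

# [celery] section of airflow.cfg (values below are assumptions for a local setup)
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow_user:airflow_pass@localhost:5432/airflow_db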

DAGs

The really neat feature of Airflow is that you can create tasks in the form of DAGs, which let you sequence tasks and their dependencies. So if you wanted to fetch information, check if any new records are present, and sync them with a database or external services like S3, it’s easy to configure.

Directed Acyclic Graph

DAGs also have CRON presets with options to catch up with missed tasks, backfill on a time interval, and retries on failure.
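
For example, here’s a minimal sketch of a DAG configured with a daily preset, catchup disabled, and retries on failure (the dag_id and values are illustrative):

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_retry_dag",       # hypothetical DAG id
    schedule_interval="@daily",       # cron preset; a cron string like "0 0 * * *" also works
    start_date=days_ago(1),
    catchup=False,                    # skip backfilling runs missed before the DAG was enabled
    default_args={
        "retries": 2,                 # retry failed tasks twice
        "retry_delay": timedelta(minutes=5),
    },
)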

Here’s an example of running a few bash commands in sequence.

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_bash_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
    params={"example_key": "example_value"},
)

run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag,
)

run_this_first = BashOperator(
    task_id='run_this_first',
    bash_command='echo Hola Mundo',
    dag=dag,
)

run_this_first >> run_this_last

if __name__ == "__main__":
    dag.cli()
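
Once the file is in your dags folder, you can try out a single task without waiting on the schedule using the airflow tasks test command; for example (the date is just an example execution date):

# run one task of the DAG locally without recording state in the metadata database
airflow tasks test example_bash_operator run_this_first 2021-12-01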

The operators let you define tasks that are linked to a single DAG.

example_dag_1

Tasks and operators can be created dynamically and linked to different DAGs, which in turn can be reused across different DAG flows.

You can also create tasks based on dynamic inputs, like fetching account history for a list of users, letting you run the same task for each user. Here’s the earlier example modified to set up tasks dynamically (example from the Airflow docs).

run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag,
)

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo Hola Mundo',
    dag=dag,
)

run_this >> run_this_last

for i in range(3):
    task = BashOperator(
        task_id='runme_' + str(i),
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        dag=dag,
    )
    task >> run_this

also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)

also_run_this >> run_this_last

example_dag_2

Operators

Operators in Airflow are tasks with baked-in functionality, which makes them easier to set up and configure. If you have a lot of scripts written in Python, it can be easier to pull them into Airflow and run them via PythonOperator or PythonVirtualenvOperator.

Operators can define tasks that run a bash script, write records to a database, or send an email. Here’s a list of core operators supported in Airflow.

There are also community provider packages for Postgres, S3, Docker, etc. that allow you to interact with other services while building your DAGs. Here’s an example of using PythonOperator to set up a DAG that fetches and updates OHLC data for a list of stocks/crypto.

import datetime
from pprint import pprint

from airflow.operators.python import PythonOperator

# get_db_session, StockHistory, YahooClient, DATE_TIME_FORMAT, SAMPLE_ID and
# get_all_crypto_tickers_for_portfolio are project-specific helpers
def update_crypto_history(ticker: str):
    """Gets current history and updates records in database"""
    session = get_db_session()
    ticker_last_updated = (
        session.query(StockHistory)
            .filter(StockHistory.ticker == ticker)
            .order_by(StockHistory.quote_date.desc())
            .first()
    )
    results = YahooClient().history(ticker, crypto=True)

    if ticker_last_updated:
        next_date = ticker_last_updated.quote_date + datetime.timedelta(days=1)
        results = results[next_date.strftime(DATE_TIME_FORMAT):]

    if len(results) > 0:
        db_objs = [
            StockHistory(
                ticker=ticker,
                quote_date=row["Date"],
                open=row["Open"],
                high=row["High"],
                low=row["Low"],
                last=row["Close"],
                volume=row["Volume"],
            )
            for (_, row) in results.iterrows()
        ]
        records = len(db_objs)

        print(f"### Writing {records} records for ticker: {ticker} ###")

        session.add_all(db_objs)
        session.commit()

        return f"### Write complete for ticker: {ticker}, records added: {records} ###"
    return f"No new records added for ticker: {ticker}"


def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"

run_this = PythonOperator(
    task_id="print_the_context",
    python_callable=print_context,
    dag=dag,
)

tickers = get_all_crypto_tickers_for_portfolio(SAMPLE_ID)

# setup update task for each ticker
for ticker_symbol in tickers:
    task = PythonOperator(
        task_id=f"update_crypto_history_{ticker_symbol}",
        python_callable=update_crypto_history,
        op_kwargs={"ticker": ticker_symbol},
        dag=dag,
    )

    run_this >> task

Note: Be careful with database connections used inside DAGs, as they are created for every DAG task run.

It might be better to have them scoped to each operation that needs them, or to draw them from a connection pool. Airflow recommends using PgBouncer to manage connections to Postgres from DAGs; see the guide to set up a database backend for Airflow.
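
One way to keep that in check is to scope the session to the function that uses it, so every task run closes its own connection. A rough sketch reusing the get_db_session() helper from the earlier example (the scoped_session wrapper itself is hypothetical):

from contextlib import contextmanager


@contextmanager
def scoped_session():
    """Open a session for a single task run and always close it afterwards."""
    session = get_db_session()  # project helper from the earlier example
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def update_crypto_history(ticker: str):
    with scoped_session() as session:
        ...  # query and write StockHistory rows as in the earlier example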

Triggering DAGs

DAG runs can be triggered manually using the CLI or web UI, or via the API once it’s enabled in Airflow. Generally, DAG runs are triggered when they’re scheduled to run; you can set the schedule for a DAG using schedule_interval, which accepts a cron expression, although it’s not strictly required to set one in order to run DAGs.

airflow dags trigger --conf '{"user": "sample_id", "action": "send_password_update_notification"}' dag_id

It’s also possible to trigger DAG runs using the Airflow API, but you would need to enable it on the Airflow webserver to allow API requests. Check out this article on how to enable the Airflow API for your webserver.

curl -X POST http://airflow-service:[PORT_NUMBER]/api/v1/dags/{dag_id}/dagRuns \
    -H "Content-Type: application/json" \
    --user "username:password" \
    -d '{"conf": {}}'
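
The same call from Python, assuming the basic_auth API backend is enabled; the host, port, dag_id, and credentials below are placeholders:

import requests

# placeholders: adjust the host, port, dag_id, and credentials for your deployment
response = requests.post(
    "http://airflow-service:8080/api/v1/dags/example_bash_operator/dagRuns",
    json={"conf": {"user": "sample_id"}},  # optional conf passed to the DAG run
    auth=("username", "password"),         # basic auth, requires the basic_auth backend
)
response.raise_for_status()
print(response.json())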

Secret manager and Dockerizing Airflow

Airflow has a separate secrets backend that allows you to store secrets in a secure way. This is useful for storing connection strings and passwords, and for managing different environments. These connections can be accessed from your DAGs and used in any operator that needs them. Note: when dockerizing your Airflow application, you might need to sync the SQLAlchemy connection string and Fernet key used by Airflow across containers.

# Environment Variables
AIRFLOW__CORE__SQL_ALCHEMY_CONN
AIRFLOW__CORE__FERNET_KEY
# airflow.cfg
sql_alchemy_conn
fernet_key

Airflow encrypts stored connections and variables using the Fernet key, and they will not be usable unless you use the same key to decrypt them. There’s more info on using and managing Fernet keys in the Airflow docs.
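
If you need to generate a new Fernet key, the cryptography package that Airflow depends on can create one; a quick sketch:

from cryptography.fernet import Fernet

# prints a url-safe base64-encoded key for fernet_key / AIRFLOW__CORE__FERNET_KEY
print(Fernet.generate_key().decode())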

CRON replacement

While Airflow was designed more for data pipelines and creating workflows, it’s a great alternative to cron, making it easier to schedule and trigger DAGs and catch up on any missed runs. It’s also more easily testable and has great support for connecting to different services, making it one of my favorites for setting up new scripts and scraping information.
