Introduction

Hey folks, previously I wrote a blog post about how to set up Airflow, how it can streamline data pipelines, and how it makes a great replacement for cron jobs.

In this post I’ll go over how to enable the Airflow API so you can trigger DAGs programmatically, rather than only manually or through scheduled runs.

By default, after installing Airflow, the API denies all requests. According to the Airflow docs, a few of the APIs are experimental and need additional security such as basic auth, or should only accept requests from trusted IPs.

[api]
auth_backend = airflow.api.auth.backend.deny_all

You can enable it by updating the config to accept requests with no authentication. That’s very open, but it’s handy for testing out the Airflow APIs.

[api]
auth_backend = airflow.api.auth.backend.default

To enable basic auth, update airflow.cfg with:

[api]
auth_backend = airflow.api.auth.backend.basic_auth
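
If you don’t already have an Airflow user to authenticate with, one way to create one (assuming Airflow 2.x and the default CLI) is the airflow users create command; all the values below are placeholders:

airflow users create \
  --username username \
  --password password \
  --firstname Api \
  --lastname User \
  --role Admin \
  --email user@example.com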

Great, now that we’ve updated the config, let’s try it out. For reference, here’s a link to all the APIs available via Airflow.

Awesome! Now let’s try to fetch the DAG details using the API.

curl -X GET -u username:password http://localhost:[PORT_NUMBER]/api/v1/dags
{
  "dags": [
    {
      "dag_id": "clean_notification_logs",
      "description": null,
      "file_token": ".eJwNx9ENgDAIBcBd_FdWcYOGVNqilWcoxri93t9N1HAKnWhDxGh17JJj0AWPgq6Ywzkf4sTqpeNJfpv93bgOyl3YkiG0aOZQWOqoY7ne6QP_xCKH.kHa1zjTSHln-1Df9NSc6Imtyk0c",
      "fileloc": "/home/mohseen/Projects/airflow_runner/dags/clean_notification_logs.py",
      "is_paused": false,
      "is_subdag": false,
      "owners": ["airflow-runner"],
      "root_dag_id": null,
      "schedule_interval": {
        "__type": "CronExpression",
        "value": "0 23 * * *"
      },
      "tags": [
        {
          "name": "Frieza-runner"
        }
      ]
    }
  ],
  "total_entries": 1
}

Cool, one great use case for the Airflow API is triggering a DAG run once certain conditions are met, like a table finishing its export to S3 or a website scrape picking up new information. It’s a pretty neat feature that lets you customize exactly when a DAG run is triggered, alongside the usual scheduled runs.

To trigger a new DAG run, you can send a POST request to the following endpoint:

curl -X POST http://localhost:[PORT_NUMBER]/api/v1/dags/{dag_id}/dagRuns \
  -u username:password \
  --header "Content-Type: application/json" \
  --data-raw '{
      "dag_run_id": "api__2021-12-09T02:26:49.920317+00:00"
  }'
{
  "conf": {},
  "dag_id": "clean_notification_logs",
  "dag_run_id": "api__2021-12-09T02:26:49.920317+00:00",
  "end_date": null,
  "execution_date": "2021-12-09T02:26:49.918072+00:00",
  "external_trigger": true,
  "start_date": "2021-12-09T02:26:49.920317+00:00",
  "state": "running"
}

The dag_run_id is used to identify the DAG run once it’s created; if the ID is already in use, the API returns a 400 error.

Sweet, this lets users chain runs and trigger them whenever the data they consume is ready. Airflow also supports working with DAG runs in batches; see the Airflow API docs for more info.
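
You can also pass context to the triggered run through the conf field in the request body, which the DAG can then read from dag_run.conf. Here’s a minimal sketch; the dag_run_id is just another example ID and the s3_path key is purely illustrative, so your DAG would need to look for it:

curl -X POST http://localhost:[PORT_NUMBER]/api/v1/dags/{dag_id}/dagRuns \
  -u username:password \
  --header "Content-Type: application/json" \
  --data-raw '{
      "dag_run_id": "api__2021-12-09T03:00:00+00:00",
      "conf": {"s3_path": "s3://my-bucket/exports/latest"}
  }'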

Now, let’s check whether our DAG run has finished:

curl -X GET -u username:password http://localhost:[PORT_NUMBER]/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}
{
  "conf": {},
  "dag_id": "clean_notification_logs",
  "dag_run_id": "api__2021-12-09T02:26:49.920317+00:00",
  "end_date": "2021-12-09T02:26:52.627239+00:00",
  "execution_date": "2021-12-09T02:26:49.918072+00:00",
  "external_trigger": true,
  "start_date": "2021-12-09T02:26:49.920317+00:00",
  "state": "success"
}
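
If you’re triggering runs from a script, you can poll that same endpoint until the run reaches a final state. A minimal sketch, assuming jq is installed and using the same placeholders as above:

while true; do
  state=$(curl -s -u username:password \
    http://localhost:[PORT_NUMBER]/api/v1/dags/{dag_id}/dagRuns/{dag_run_id} | jq -r '.state')
  echo "Current state: $state"
  if [ "$state" = "success" ] || [ "$state" = "failed" ]; then
    break
  fi
  sleep 10
done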

Awesome, you can now use the Airflow API for your own use cases and see how well it works. Airflow also supports XCom, which is a way for tasks to communicate with each other. Through the API you can also fetch XCom values between task runs or check logs from previous runs.
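
For example, these endpoints from the stable REST API reference fetch XCom entries and task logs for a given task instance; {task_id} and the try number 1 are placeholders for a task in your own DAG:

curl -X GET -u username:password \
  http://localhost:[PORT_NUMBER]/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries

curl -X GET -u username:password \
  http://localhost:[PORT_NUMBER]/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/1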

That concludes this post, thanks for reading and have a great day!
