Migrating cron jobs to Airflow

What is Airflow?

Airflow is an open-source tool, originally developed at Airbnb, for managing, executing, and monitoring complex computational workflows and data processing pipelines.

While working in my previous team, I had to integrate and process various data sources on a scheduled basis. These tasks often depend on and relate to one another, creating a network of jobs. In Airflow, these networks of jobs are DAGs (directed acyclic graphs). Using cron to manage networks of jobs does not scale effectively. Airflow offers the ability to schedule, monitor, and, most importantly, scale increasingly complex workflows.

Terms and definitions

Think of a DAG as a box for one or more tasks. The box organizes related tasks into one unit so you can define common variables and relationships (upstream, downstream, parallel, etc.).
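As a tiny illustration (the task names below are placeholders, not from my actual pipelines), relationships between tasks in a DAG are declared with the bitshift operators:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# A toy DAG: extract runs first, the two transforms run in parallel, then load
toy_dag = DAG(dag_id='toy_dag',
              start_date=datetime(2019, 1, 20),
              schedule_interval='@daily')

extract = DummyOperator(task_id='extract', dag=toy_dag)
transform_a = DummyOperator(task_id='transform_a', dag=toy_dag)
transform_b = DummyOperator(task_id='transform_b', dag=toy_dag)
load = DummyOperator(task_id='load', dag=toy_dag)

extract >> [transform_a, transform_b]  # both transforms are downstream of extract and run in parallel
[transform_a, transform_b] >> load     # load waits for both transforms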

There are already many well-written articles that explain the fundamental concepts of Airflow. I will list below my favourite Airflow resources, the ones I found most useful while starting out. As a result, this article stops at the basic definition of a DAG and moves directly to migrating jobs from cron to Airflow.

How to get Airflow to talk to existing Python scripts

Set-up: DAG file definition

The DAG file definition has been documented in many places. My personal favourite is the set of example DAGs from the Airflow repository. Mostly as a reference for my future self, I will include a template DAG I have used often in this migration.

To note: the scripts called inside the tasks, my_script.py and my_script2.py, are placeholders for existing scripts that were previously scheduled as cron jobs.


from airflow import DAG
from airflow.models import Variable

# Operator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

from datetime import datetime, timedelta

script_dir = Variable.get('scripts_dir')  # where my Python scripts live
mail_to = ['firstname.lastname@domain']

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 20, 0, 0),
    'email': mail_to,
    'email_on_failure': True,
    'email_on_retry': False,  # Airflow only emails on failure/retry; there is no email_on_success argument
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}

# Instantiate a DAG my_dag that runs every day
# DAG objects contain tasks
# Time is in UTC!!!
my_dag = DAG(dag_id='my_dag_id',
             default_args=default_args,
             schedule_interval='30 13 * * *',  # 8:30 AM EST
             catchup=False)  # catchup is a DAG-level argument, not a task-level default

# LatestOnlyOperator skips its downstream tasks on any run that is not the most
# recent schedule, so backfills do not re-run the whole pipeline
latest_only = LatestOnlyOperator(task_id='latest_only', dag=my_dag)

# Instantiate tasks
task_1 = BashOperator(
    task_id='task_1_id',
    bash_command=f'python {script_dir}/my_script.py -Args ',
    dag=my_dag
)

task_2 = BashOperator(
    task_id='task_2_id',
    bash_command=f'python {script_dir}/my_script2.py -Args ',
    dag=my_dag
)

# set task relationship
latest_only >> task_1 >> task_2
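
Once the file is in the DAGs folder, I like to exercise a single task before letting the scheduler pick it up. On Airflow 1.x (which this template targets, given the bash_operator import path), the CLI can run one task in isolation; the date below is just an example execution date:

# Confirm the scheduler can parse the new DAG file
airflow list_dags

# Run a single task for a given execution date, without recording state in the database
airflow test my_dag_id task_1_id 2019-01-20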


Calling Python functions that live in other repositories (but are not installed as modules)

Initially I thought I could use PythonOperator. After a few Module not found / No module named errors and some Googling, it turned out that PythonOperator only works for functions that are installed as modules or that live alongside the DAG file on the scheduler's Python path, hence the module errors. This StackOverflow post suggests creating a zip file of those dependencies. Since these data import functions are updated regularly and used in other places, that is not the most practical solution for my use case. As a result, I turned to BashOperator to call the data import scripts in the separate repository.
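For reference, this is roughly what the PythonOperator route looks like when the callable is importable. Here my_package.import_data and fetch_source are hypothetical names standing in for an installed module and one of its functions:

from airflow.operators.python_operator import PythonOperator

# Only works because my_package is installed (or importable from the DAGs folder);
# pointing at a function in an unrelated repository is what produced the module errors
from my_package.import_data import fetch_source  # hypothetical module and function

fetch_task = PythonOperator(
    task_id='fetch_source_id',
    python_callable=fetch_source,
    op_kwargs={'source': 'example'},  # hypothetical keyword arguments
    dag=my_dag
)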

Calling Python functions that live in other files and are installed as modules, a.k.a. installing custom Python dependencies for jobs running inside Docker

Because my Airflow jobs live inside a Docker container, all of their dependencies have to be installed in the Docker image. The external libraries are easy enough: run pip freeze > requirements.txt (from the appropriate environment) to generate requirements.txt, run pip install -r requirements.txt when the container is built or started, and all of those dependencies are available!
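My setup isn't public, so the following is only a sketch of the pattern, assuming the install happens at image build time (an entrypoint script running the same pip command at startup works as well); the base image and paths are placeholders:

# Placeholder base image; in practice this would be whatever Airflow image you build from
FROM python:3.6-slim

# External libraries captured with pip freeze > requirements.txt
COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt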

I also have to deal with custom modules, which live in a different repository. Adding those custom modules directly to requirements.txt causes pip install to fail, because they are not available from the default package index. A separate pip install pointed at the path where those modules live takes care of the remaining dependencies.
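Again a sketch rather than my exact setup: /opt/custom_modules is a hypothetical path, and the important part is that the path has to be visible inside the container (copied into the image or bind-mounted), because that is where pip runs:

# Custom modules from the other repository, copied into the image;
# assumes the repository has a setup.py so pip can install it as a package
COPY custom_modules/ /opt/custom_modules/
RUN pip install /opt/custom_modules/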

One thing that is painfully clear to me now but was not before: simply running pip install on the server that hosts the Docker container does not install dependencies inside the container, because the container cannot see anything installed outside of it. Airflow running inside a Docker container looks for dependencies installed inside that container. Sometimes solving a problem comes down to drawing boxes around things and naming them; then it becomes easier to reason about the relationships between the boxes.

Questions? Comments? Reach out on Twitter.

Airflow resources I found helpful