Airflow basics
Introduction
Example of the DAG dashboard

DAG Creation
A DAG (Directed acyclic graph) is a set of tasks connected between them, it has a start and an end.
Example: Create table -> Is API Available -> Extract data -> Store user -> process user
Skeleton example:
1
| airflow.cfg airflow.db dags/ logs/ unitests.cfg webserver_config.py
|
Sample of a DAG user_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| from airflow.models import DAG
from datetime import datetime
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
default_args = {
'start_date': datetime(2020, 1, 1)
}
with DAG('user_processing', schedule_interval='@daily',
default_args=default_args,
catchup=False) as dag:
create_table = SqliteOpreator(
task_id='creating_table',
sqlite_conn_id='db_sqlite',
sql='''
CREATE TABLE users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
'''
)
|
Then start the airflow webserver: airflow webserver
and also airflow scheduler
Types of operators
- Action operators: execute action
- Transfer operators: transfer data
- Sensors: wait for a condition to be met
Notes:
- Remember to use 1 operator and 1 task, it is not a N-to-N relationship, it must be 1:1