Programming, Math and Physics
https://habr.com/ru/articles/905828/
Book Practical Guide to Apache Airflow® 3 https://www.astronomer.io/ebooks/practical-guide-to-apache-airflow-3/
https://habr.com/ru/articles/866542/
https://www.astronomer.io/ebooks/apache-airflow-best-practices-etl-elt-pipelines/
https://habr.com/ru/articles/873668/ Airflow in Docker on Mac
https://towardsdatascience.com/airflow-data-intervals-a-deep-dive-15d0ccfb0661
Airflow uses the SequentialExecutor by default. By its nature, it runs at most one task at a time.
The SequentialExecutor also pauses the scheduler while it runs a task, so it is not recommended for a production setup.
You should use the LocalExecutor for a single machine.
For a multi-node setup, you should use the Kubernetes executor or the Celery executor.
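A minimal sketch of checking which executor the current installation is configured with; the value comes from the [core] executor entry in airflow.cfg (or the AIRFLOW__CORE__EXECUTOR environment variable):
from airflow.configuration import conf

# Prints e.g. "SequentialExecutor", "LocalExecutor" or "CeleryExecutor",
# i.e. whatever is set under [core] executor in airflow.cfg.
print(conf.get("core", "executor"))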
https://www.youtube.com/watch?v=J1wze8tUvw0
creating templated DAGs from configuration and shared files to make working with Airflow more efficient.
the value of automatically generating Airflow entities and building event-driven Airflow pipelines.
If you are using Apache Airflow, avoid the common mistake of using {{ ds }} / execution_date in your DAGs! ❌
🔄 Instead, use {{ data_interval_start }}. Both of these templates return the same date for daily scheduled DAGs, but there are key differences you should be aware of.
📌 Why?
The {{ ds }} / execution_date approach can lead to confusion in several scenarios:
• When you manually trigger a DAG run.
• When you have multiple DAG runs within a single day.
• When you schedule your DAGs less frequently than daily, such as weekly or monthly intervals.
💡 Understanding Data Intervals in Airflow:
Every DAG run in Airflow is associated with a "Data Interval", which indicates the time range that the DAG run operates on. For example, take a DAG scheduled to run daily:
1️⃣ On January 1st, the first run of the DAG will operate over the interval starting from 00:00 on January 1st and ending at 00:00 on January 2nd.
2️⃣ The second run of the DAG starts at 00:00 on January 2nd and ends at 00:00 on January 3rd, and so on.
3️⃣ This pattern continues, with each run operating on its assigned interval, defined by a specific start and end time.
Switching to {{ data_interval_start }} ensures your DAGs remain aligned with their defined intervals, especially if your schedule includes non-daily intervals like weekly or monthly runs. This approach makes your pipelines more consistent and reliable. ✅
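A minimal sketch of that recommendation; the dag id, echo command, and daily schedule are assumptions, and schedule= is the Airflow 2.4+ spelling (use schedule_interval= on older versions):
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="data_interval_example",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
) as dag:
    # data_interval_start / data_interval_end are rendered at run time and stay
    # aligned with the run's data interval even for manual triggers, backfills,
    # and weekly/monthly schedules.
    load = BashOperator(
        task_id="load_partition",
        bash_command=(
            "echo processing data from "
            "{{ data_interval_start.strftime('%Y-%m-%d') }} "
            "to {{ data_interval_end.strftime('%Y-%m-%d') }}"
        ),
    )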
all_success : the default rule. The task runs if all upstream tasks finished successfully.
one_failed : the task runs as soon as at least one of the upstream tasks has failed.
all_failed : the task runs if all upstream tasks finished in a failed state.
all_done : the task runs once all upstream tasks have finished, regardless of their status (success, failure, or skipped).
all_skipped, one_success, one_done and others : there are many more trigger rules, described in detail in the documentation.
3. TriggerDagRunOperator
This operator sends a trigger signal from one DAG to another. For example, an ETL DAG can trigger a training DAG once its own tasks are done.
The same method can be used to start any other DAG.
It solves the problem of launching dependent DAGs on time and lets you track the links in the DAG Dependencies view.
However, when a new DAG is added, the dependency has to be wired into the code manually.
4. ExternalTaskSensor
This operator is similar to TriggerDagRunOperator: it lets a DAG depend on several tasks in different DAGs and run part of its tasks
while others stay on hold. It also shows up in DAG Dependencies.
The difference is that the dependency is declared in the consumer DAG rather than in the DAG that produces the data.
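A minimal sketch of the two approaches (DAG ids and task ids are made up for illustration): a push-based TriggerDagRunOperator in the producer DAG and a pull-based ExternalTaskSensor in the consumer DAG:
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Push-based: the ETL DAG explicitly triggers the training DAG when it is done.
with DAG(
    dag_id="etl_dag",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
):
    load_to_dwh = BashOperator(task_id="load_to_dwh", bash_command="echo load")
    trigger_training = TriggerDagRunOperator(
        task_id="trigger_training",
        trigger_dag_id="training_dag",  # the downstream DAG to start
    )
    load_to_dwh >> trigger_training

# Pull-based: the analytics DAG declares the dependency itself and waits for
# a task of the upstream DAG to succeed for the same logical date.
with DAG(
    dag_id="analytics_dag",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
):
    wait_for_etl = ExternalTaskSensor(
        task_id="wait_for_etl",
        external_dag_id="etl_dag",
        external_task_id="load_to_dwh",
        poke_interval=60,
    )
    build_report = BashOperator(task_id="build_report", bash_command="echo report")
    wait_for_etl >> build_report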
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html
https://stackoverflow.com/questions/75869048/how-to-use-a-table-as-dataset-for-airflow-in-data-aware-scheduling
https://www.youtube.com/watch?v=qkMSarl7BVE Mastering Advanced Dataset Scheduling in Apache Airflow
Airflow supports the concept of datasets (named groups of data).
If a DAG's tasks update certain data (for example, a table in a database), you can set a flag marking the dataset as updated.
The DAG that performs the computation is the data producer, and any DAG that uses the dataset is a consumer.
The scheduler starts the consumer DAG as soon as the data has been updated.
This is a flexible mechanism that lets teams coordinate through data without having to change each other's code.
However, datasets are not a good fit when you need to trigger DAGs on task completion rather than on data updates.
Using a Dataset, you can link an analytics DAG and a data-processing DAG so
that the analytics DAG starts whenever the dataset is updated by the ETL DAG.
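A minimal producer/consumer sketch of dataset scheduling (Airflow 2.4+); the dataset URI and dag ids are assumptions:
import pendulum
from airflow import DAG, Dataset
from airflow.operators.bash import BashOperator

orders_table = Dataset("postgres://dwh/public/orders")  # any opaque URI string

# Producer: declaring the dataset as an outlet marks it as updated whenever
# this task succeeds.
with DAG(
    dag_id="etl_orders",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    BashOperator(
        task_id="load_orders",
        bash_command="echo loading orders",
        outlets=[orders_table],
    )

# Consumer: scheduled on the dataset instead of a cron expression, so it runs
# as soon as the producer has updated the data.
with DAG(
    dag_id="analytics_orders",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=[orders_table],
    catchup=False,
):
    BashOperator(task_id="build_report", bash_command="echo building report")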
These DAGs are triggered by external events rather than a predefined schedule.
Triggers include:
File Sensor: A DAG runs when a file arrives in a specific directory.
API Trigger: A DAG runs when an external system sends an API request.
Task Dependency: A DAG runs when another DAG completes.
Custom Event Trigger: Using Airflow's event-driven capabilities like TriggerDagRunOperator.
Example: Triggering a DAG when a file arrives in a directory
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'event_based_dag',
    default_args=default_args,
    schedule_interval=None,  # No predefined schedule
    catchup=False
)

file_sensor = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/trigger_file.csv',
    poke_interval=60,  # Check every 60 seconds
    timeout=600,       # Timeout after 10 minutes
    mode='poke',
    dag=dag
)
task = DummyOperator(task_id='dummy_task', dag=dag)
file_sensor >> task # Execute task after file detection
https://habr.com/ru/companies/dbraincloud/articles/861842/
https://airflow.apache.org/docs/apache-airflow/1.10.9/concepts.html#trigger-rules
With Airflow's default settings, if one task fails, the rest of the DAG stops;
this behavior can be changed with the Trigger Rule parameter.
Here is how it looks in code:
@task.virtualenv(task_id='delete_rbd', trigger_rule=TriggerRule.ALWAYS, requirements=["requests==2.32.3"])
Examples of other Trigger Rule values that control how tasks are sequenced (a short usage sketch follows this list):
all_success : (default) all parents have succeeded
all_failed : all parents are in a failed or upstream_failed state
all_done : all parents are done with their execution
one_failed : fires as soon as at least one parent has failed,
it does not wait for all parents to be done
one_success : fires as soon as at least one parent succeeds,
it does not wait for all parents to be done
none_failed : all parents have not failed (failed or upstream_failed)
i.e. all parents have succeeded or been skipped
none_skipped : no parent is in a skipped state,
i.e. all parents are in a success , failed , or upstream_failed state
dummy : dependencies are just for show, trigger at will
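A minimal sketch of wiring trigger rules into a DAG (task ids and commands are illustrative): a cleanup task that always runs and an alert task that fires on the first upstream failure:
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="trigger_rule_example",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule_interval=None,
    catchup=False,
):
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    transform = BashOperator(task_id="transform", bash_command="exit 1")  # may fail

    # Runs once both upstream tasks are finished, whatever state they ended in.
    cleanup = BashOperator(
        task_id="cleanup",
        bash_command="echo cleaning temp files",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Fires as soon as at least one upstream task has failed.
    notify = BashOperator(
        task_id="notify_on_failure",
        bash_command="echo alerting on-call",
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    [extract, transform] >> cleanup
    [extract, transform] >> notify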
https://www.waitingforcode.com/apache-airflow
https://habr.com/ru/articles/811807/
https://cloud.google.com/blog/products/data-analytics/airflow-dag-and-task-concurrency-in-cloud-composer/
https://habr.com/ru/companies/ozonbank/articles/805011/ Monitoring Apache Airflow: measuring how resource-hungry tasks are
DAG linting https://medium.com/@snir.isl/mastering-airflow-dag-standardization-with-pythons-ast-a-deep-dive-into-linting-at-scale-1396771a9b90
https://asrathore08.medium.com/airflow-interview-questions-iv-cef5100d44c5
https://www.linkedin.com/posts/shwetank-singh-68023283_data-engineering-airflow-concepts-ugcPost-7229856597575970816-bC2r
https://docs.databricks.com/en/jobs/how-to/use-airflow-with-jobs.html Airflow + Databricks
https://medium.com/apache-airflow/running-databricks-notebook-with-airflow-9b38bcfb8740
The Databricks provider was recently updated to include something like a 'DatabricksWorkflowOperator',
which acts as an Airflow task group and combines all tasks (run notebook, execute jar, ...) into one job.
When the DAG runs, Airflow uses the job create/update API to create or update the exact job definition, and then triggers it.
Very useful because you can (re)use job compute and follow progress in both Airflow and Databricks;
your engineers can look at logs in the regular Databricks workflows menu as before, while the orchestration/triggering stays in Airflow
(useful if you need to orchestrate with other processes outside of Databricks).
It takes some time to get started with it because there are not many examples or much documentation,
but we've built a config (yaml) driven framework that renders the DAGs.
The yaml definition (on purpose) matches the DAB workflow definitions almost 1-to-1,
so converting existing DABs/workflows is easy.
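A rough sketch of the config-driven idea (not the actual framework described above): each entry of an assumed config dict, which would normally be loaded from a YAML file, is rendered into a DAG and registered in globals() so the scheduler can discover it:
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

# Assumed config; in the real setup this would come from yaml.safe_load(...).
PIPELINES = {
    "sales_daily": {"schedule": "@daily", "command": "echo run sales job"},
    "inventory_hourly": {"schedule": "@hourly", "command": "echo run inventory job"},
}

for name, cfg in PIPELINES.items():
    with DAG(
        dag_id=name,
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule_interval=cfg["schedule"],
        catchup=False,
    ) as dag:
        BashOperator(task_id="run", bash_command=cfg["command"])
    # A module-level name makes the generated DAG visible to the DagBag loader.
    globals()[name] = dag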
https://medium.com/@anupamk36/airflow-best-practices-d982fabd61f6
Data Exchange Strategies between Airflow DAGs:
https://blog.det.life/data-exchange-strategies-between-airflow-dags-18cb0848c93c
brew search airflow
brew info airflow
https://airflow.apache.org/docs/docker-stack/index.html
https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html#using-production-docker-images
https://habr.com/ru/articles/831248/ Docker for airflow
https://medium.com/@harshalpagar/running-airflow-with-docker-on-developer-environment-9c1c9559668c
https://medium.com/@patricklowe33/etl-using-docker-python-postgres-airflow-ed3e9508bd2e
docker pull apache/airflow:2.10.0
https://github.com/ntd284/personal_install_airflow_docker
https://www.linkedin.com/pulse/how-can-install-airflow-docker-minutes-duong-nguyen-tuan-wkddc/?trackingId=0K0AgWo0iNYvHboc%2FIM4cQ%3D%3D
https://stackabuse.com/running-airflow-locally-with-docker-a-technical-guide/
https://airflowsummit.org/sessions/2023/introducing-airflowctl/
pip install airflowctl
https://github.com/kaxil/airflowctl
https://www.restack.io/docs/airflow-knowledge-apache-mac-m1-install
python3 -m venv airflow_venv
source airflow_venv/bin/activate
pip install apache-airflow==2.2.3 --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.8.txt
airflow db init
airflow webserver -p 8080 &
airflow scheduler &
Airflow: how to make data loading five times more stable https://habr.com/ru/articles/792872/
https://www.mage.ai/
https://gitlab.kitware.com/computer-vision/cmd_queue
Airflow Book: https://www.astronomer.io/ebooks/data-pipelines-with-apache-airflow.pdf
https://medium.com/numberly-tech-blog/orchestrating-python-workflows-in-apache-airflow-fd8be71ad504
https://medium.com/nerd-for-tech/airflow-mwaa-automating-etl-for-a-data-warehouse-f5e50d14713c
https://medium.com/nerd-for-tech/airflow-catchup-backfill-demystified-355def1b6f92
pip install apache-airflow
Airflow is basically about coding DAGs, which are composed of Tasks, which are run by Operators.
Airflow “Hooks” are a high-level interface that leverages Airflow Connections to get access to external resources.
They often use external libraries or tedious network operations “under the hood”.
Operators
Airflow has a very extensive set of operators available, here are some examples:
BashOperator - executes a bash command
PythonOperator - calls an arbitrary Python function
EmailOperator - sends an email
https://medium.com/nerd-for-tech/airflow-features-callback-trigger-clsuter-policy-cc7f8022e7d3
https://medium.com/numberly-tech-blog/orchestrating-python-workflows-in-apache-airflow-fd8be71ad504
Deferrable operators
https://blog.devgenius.io/airflow-deferrable-operators-5a7c90aaa14f
https://www.youtube.com/watch?v=uB7zweaF8EA What is new in Airflow 2.7 ?
https://blog.devgenius.io/airflow-task-parallelism-6360e60ab942
https://www.youtube.com/watch?v=JJ_nnGkZjBc&list=PL2Uw4_HvXqvY2zhJ9AMUa_Z6dtMGF3gtb&index=126
https://stackoverflow.com/questions/50708226/airflow-macros-in-python-operator
https://www.astronomer.io/events/webinars/improve-your-dags-with-hidden-airflow-features/
https://www.datafold.com/blog/3-most-underused-features-of-apache-airflow
Task failure handling: https://stackabuse.com/handling-task-failures-in-airflow-a-practical-guide/
https://habr.com/ru/articles/737046/
https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html
https://habr.com/ru/companies/neoflex/articles/736292/
import pendulum
from airflow import DAG

dag = DAG(
    "example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)
How to start DAG from command line:
airflow.sh schedule -d airflow_stats_dag &
How to disable a DAG:
1) if you set the start date of the DAG to some date far in the future, you'll halt it until that time. The DAG stays intact but doesn't run.
2) set an end_date in the past
3) set schedule_interval='@once' on the DAG object (note that schedule_interval is a DAG argument, not a default_args key):
dag = DAG('my_dag', schedule_interval='@once', ...)
https://habr.com/ru/company/otus/blog/679104/
https://habr.com/ru/post/682460/
https://habr.com/ru/post/682384/
https://habr.com/ru/post/682714/
https://habr.com/ru/post/684296/
https://shopify.engineering/lessons-learned-apache-airflow-scale
https://news.ycombinator.com/item?id=31480320
docker-compose up --build
MWAA stands for Amazon Managed Workflows for Apache Airflow: https://aws.amazon.com/managed-workflows-for-apache-airflow/
https://airflow.apache.org/blog/airflow-two-point-oh-is-here/
https://towardsdatascience.com/is-apache-airflow-2-0-good-enough-for-current-data-engineering-needs-6e152455775c
Airflow 2.3.0 dropped support for Python 3.6. It’s tested with Python 3.7, 3.8, 3.9 and 3.10.
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(default_args={'owner': 'airflow'}, schedule_interval=None, start_date=days_ago(2))
def tutorial_taskflow_api_etl():
    @task
    def extract():
        return {"1001": 301.27, "1002": 433.21, "1003": 502.22}

    @task
    def transform(order_data_dict: dict) -> dict:
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print("Total order value is: %.2f" % total_order_value)

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])

tutorial_etl_dag = tutorial_taskflow_api_etl()
def _choose_platform(**kwargs):
    platform = kwargs.get("templates_dict").get("platform")
    print("INSIDE _choose_platform(): platform=", platform)
    if platform == "all":
        return ["etl_reviews_ios", "etl_reviews_android"]
    elif platform == "ios":
        return ["etl_reviews_ios"]
    elif platform == "android":
        return ["etl_reviews_android"]
    else:
        return None

choose_platform = BranchPythonOperator(
    task_id="choose_platform",
    python_callable=_choose_platform,
    templates_dict={"platform": ""},
)

start >> choose_platform >> [etl_reviews_ios, etl_reviews_android]
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html
class airflow.operators.python.BranchPythonOperator(*,
    python_callable,
    op_args=None,
    op_kwargs=None,
    templates_dict=None,
    templates_exts=None,
    show_return_value_in_logs=True, **kwargs)
Bases: PythonOperator, airflow.models.skipmixin.SkipMixin
You can pass params to python_callable via op_args or op_kwargs.
PythonOperator has a named parameter op_kwargs that accepts a dict object.
from email.mime.text import MIMEText

def SendEmail(my_param, **kwargs):
    print(my_param)  # 'value1'
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs={"my_param": 'value1'},
    dag=dag,
)
Allows a workflow to “branch” or follow a path following the execution of this task.
It derives the PythonOperator and expects a Python function that returns a single task_id or list of task_ids to follow. The task_id(s) returned should point to a task directly downstream from {self}.
From https://databand.ai/blog/airflow-2-0-and-why-we-are-excited-at-databand/
def prepare_email(**kwargs):
    ti = kwargs['ti']
    raw_json = ti.xcom_pull(task_ids='get_ip')
    external_ip = json.loads(raw_json)['origin']
    ti.xcom_push(key="subject", value=f'Server connected from {external_ip}')
    ti.xcom_push(key="body", value=f'Seems like today your server executing Airflow is connected from the external IP {external_ip}')

with DAG('send_server_ip', default_args=default_args, schedule_interval=None) as dag:
    get_ip = SimpleHttpOperator(task_id='get_ip', endpoint='get', method='GET', xcom_push=True)
    email_info = PythonOperator(task_id="prepare_email", python_callable=prepare_email, xcom_push=True, provide_context=True)
    send_email = EmailOperator(
        task_id='send_email',
        to='example@example.com',
        subject="",
        html_content=""
    )
    get_ip >> email_info >> send_email
From https://databand.ai/blog/airflow-2-0-and-why-we-are-excited-at-databand/ https://databand.ai/blog/streamline-your-pipeline-code-with-functional-dags-in-airflow-2-0/
Annotating a function with the @task decorator converts the function to a “PythonFunctionalOperator” that’s created behind the scenes when Airflow prepares your DAG for execution.
The multiple_outputs attribute marks that this function will return more than a single value.
@task(multiple_outputs=True)
def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}'
    }

with DAG('send_server_ip', default_args=default_args, schedule_interval=None) as dag:
    get_ip = SimpleHttpOperator(task_id='get_ip', endpoint='get', method='GET', xcom_push=True)
    email_info = prepare_email(get_ip.output)
    send_email = EmailOperator(
        task_id='send_email',
        to='example@example.com',
        subject=email_info['subject'],
        html_content=email_info['body']
    )
Four tricks for working with data pipelines that not everyone knows about https://habr.com/ru/company/vk/blog/659389/
e-mail notification
from airflow.utils.email import send_email
email_to = 'receivers@email.com'
email_subject = '(FAILED) ' + jobname + date
email_content = 'Your job has failed.'
send_email(email_to, email_subject, email_content)
Slack notification https://slack.dev/python-slack-sdk/web/index.html
from slack import WebClient

client = WebClient(token='your token here')
response = client.chat_postMessage(
    channel=slack_channel,
    text=message
)
Logging
import logging
from airflow.hooks.postgres_hook import PostgresHook

# Extraction Job
def ExtractFromSource(query):
    query_to_run = query
    logging.info("Query: %s", query_to_run)
    cursor = PostgresHook(postgres_conn_id=connection).get_conn().cursor()
    logging.info("Connecting to Postgres connection %s", connection)
    cursor.execute(query_to_run)
    result = cursor.fetchall()
https://marclamberti.com/blog/airflow-sensors/
https://www.mikulskibartosz.name/postpone-airflow-dag-until-s3-upload/
https://hevodata.com/learn/s3keysensor/
https://airflow.apache.org/docs/apache-airflow/1.10.5/_api/airflow/sensors/s3_key_sensor/index.html
https://blog.fal.ai/the-unbundling-of-airflow-2/
https://airflow.apache.org/docs/
https://levelup.gitconnected.com/airflow-command-line-interface-cli-cheat-sheet-6e5d90bd3552
airflow dags list
airflow tasks list
https://airflow.readthedocs.io/en/1.10.10/concepts.html
https://www.astronomer.io/guides/dynamically-generating-dags
https://towardsdatascience.com/data-engineers-shouldnt-write-airflow-dags-b885d57737ce
https://towardsdatascience.com/data-engineers-shouldnt-write-airflow-dags-part-2-8dee642493fb
How to use several DAG folders? https://xnuinside.medium.com/how-to-load-use-several-dag-folders-airflow-dagbags-b93e4ef4663c
We need to put a special tiny Python script into the standard dag_folder. Call this file something like 'add_dag_bags.py'; the code inside is very simple. To show how it works, we will create two separate folders: '~/new_dag_bag1' and '~/work/new_dag_bag2'. It does not matter how long the path is or where it is placed; Airflow just needs the rights to access those folders. The code in add_dag_bags.py will be:
""" add additional DAGs folders """
import os
from airflow.models import DagBag
dags_dirs = ['~/new_dag_bag1', '~/work/new_dag_bag2']
for dir in dags_dirs:
dag_bag = DagBag(os.path.expanduser(dir))
if dag_bag:
for dag_id, dag in dag_bag.dags.items():
globals()[dag_id] = dag
https://www.manning.com/books/data-pipelines-with-apache-airflow
https://www.manning.com/downloads/2060 . source code
https://marclamberti.com/blog/airflow-branchpythonoperator/ . BranchPythonOperator
Dynamic DAG https://galea.medium.com/airflow-dynamic-dags-python-globals-4f40905d314a
Decorator Airflow https://levelup.gitconnected.com/airflow-decorators-for-a-clean-data-pipeline-48ebdf12e9b0
import os
import sys
import sysconfig
import logging
from airflow.operators.python_operator import PythonOperator

def py_info():
    print("-- Python version --")
    print(sys.version_info)
    # logging.info(sys.version_info)
    print("site-packages=")
    print(sysconfig.get_path('purelib'))
    # logging.info(sysconfig.get_path('purelib'))
    print("\n--- ENVIRONMENT VARIABLES ---\n")
    for k, v in sorted(os.environ.items()):
        print(k, ":", v)
        # logging.info(k + "=" + v)
    print("\n---- PATH ----\n")
    for item in os.environ["PATH"].split(':'):
        print(item)
        # logging.info(item)

info = PythonOperator(
    task_id='info',
    python_callable=py_info,
    dag=dag
)
https://towardsdatascience.com/tagged/apache-airflow
https://blog.devgenius.io/airflow-cross-dag-dependency-b127dd3b69d8
https://medium.com/quintoandar-tech-blog/effective-cross-dags-dependency-in-apache-airflow-1885dc7ece9f
Airflow provides us with 3 native ways to create cross-dag dependency.
Push-based — TriggerDagRunOperator
Pull-based — ExternalTaskSensor
Across Environments — Airflow API (SimpleHttpOperator)
https://towardsdatascience.com/is-apache-airflow-2-0-good-enough-for-current-data-engineering-needs-6e152455775c
https://www.astronomer.io/guides/dynamically-generating-dags
https://www.udemy.com/course/the-ultimate-hands-on-course-to-master-apache-airflow/learn/lecture/15819418#overview
(code in introduction )
https://github.com/marclamberti/airflow-materials-vm
https://marclamberti.com/
https://www.udemy.com/course/apache-airflow/learn/lecture/17368650
https://habr.com/ru/post/549458/
https://khashtamov.com/en/introduction-to-apache-airflow/
https://github.com/apache/airflow/issues/12341
sudo /opt/airflow/airflow backfill --help
[2020-12-08 03:07:05,373] {__init__.py:51} INFO - Using executor CeleryExecutor
usage: airflow backfill [-h] [-t TASK_REGEX] [-s START_DATE] [-e END_DATE]
[-m] [-l] [-x] [-i] [-I] [-sd SUBDIR] [--pool POOL]
[--delay_on_limit DELAY_ON_LIMIT] [-dr] [-v] [-c CONF]
[--reset_dagruns] [--rerun_failed_tasks]
dag_id
positional arguments:
dag_id The id of the dag
optional arguments:
-h, --help show this help message and exit
-t TASK_REGEX, --task_regex TASK_REGEX
The regex to filter specific task_ids to backfill
(optional)
-s START_DATE, --start_date START_DATE
Override start_date YYYY-MM-DD
-e END_DATE, --end_date END_DATE
Override end_date YYYY-MM-DD
-m, --mark_success Mark jobs as succeeded without running them
-l, --local Run the task using the LocalExecutor
-x, --donot_pickle Do not attempt to pickle the DAG object to send over
to the workers, just tell the workers to run their
version of the code.
-i, --ignore_dependencies
Skip upstream tasks, run only the tasks matching the
regexp. Only works in conjunction with task_regex
-I, --ignore_first_depends_on_past
Ignores depends_on_past dependencies for the first set
of tasks only (subsequent executions in the backfill
DO respect depends_on_past).
-sd SUBDIR, --subdir SUBDIR
File location or directory from which to look for the
dag
--pool POOL Resource pool to use
--delay_on_limit DELAY_ON_LIMIT
Amount of time in seconds to wait when the limit on
maximum active dag runs (max_active_runs) has been
reached before trying to execute a dag run again.
-dr, --dry_run Perform a dry run
-v, --verbose Make logging output more verbose
-c CONF, --conf CONF JSON string that gets pickled into the DagRun's conf
attribute
--reset_dagruns if set, the backfill will delete existing backfill-
related DAG runs and start anew with fresh, running
DAG runs
--rerun_failed_tasks if set, the backfill will auto-rerun all the failed
tasks for the backfill date range instead of throwing
exceptions
https://marclamberti.com/blog/the-postgresoperator-all-you-need-to-know/
https://airflow.apache.org/docs/stable/_api/airflow/hooks/postgres_hook/index.html
https://youtu.be/ATUARuFh3JQ
https://stackoverflow.com/questions/64796614/airflow-how-to-pass-template-macro-to-hive-script
Book: https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-5/v-5/78
https://www.mikulskibartosz.name/using-sensors-in-airflow/
https://www.astronomer.io/guides/templating/
ds and macros variables can only be accessed through templates since they only exist during execution, not during Python code parsing.
https://stackoverflow.com/questions/43149276/accessing-the-ds-variable-in-airflow
https://stackoverflow.com/questions/36730714/execution-date-in-airflow-need-to-access-as-a-variable/45725005#45725005
EXEC_DATE = ''
EXEC_DATE = ''
https://towardsdatascience.com/best-practices-for-airflow-developers-990c8a04f7c6
not all operator parameters are templated, so you need to make sure Jinja templating is enabled for the operators that you plan to pass macros to.
To check which parameters in an operator take macros as arguments, look for the template_fields attribute in the operator source code.
For example, as of today, the most recent version of PythonOperator has three templated parameters:
‘templates_dict’, ‘op_args’, and ‘op_kwargs’:
template_fields = ('templates_dict', 'op_args', 'op_kwargs')
In order to enable templating for more parameters, simply overwrite the template_fields attribute.
Since this attribute is an immutable tuple, make sure to include the original list of templated parameters when you overwrite it.
http://airflow.apache.org/docs/stable/_modules/airflow/operators/hive_operator.html#HiveOperator.template_fields
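A minimal sketch of overriding template_fields in a custom subclass (the extra parameter name custom_path is an assumption); the operator's original tuple is preserved and one extra field is appended:
from airflow.operators.python import PythonOperator


class TemplatedPythonOperator(PythonOperator):
    # Keep the operator's original templated parameters and add our own.
    template_fields = tuple(PythonOperator.template_fields) + ("custom_path",)

    def __init__(self, custom_path=None, **kwargs):
        super().__init__(**kwargs)
        # Will be rendered by Jinja, e.g. "/data/{{ ds }}/input.csv".
        self.custom_path = custom_path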
https://habr.com/ru/company/lamoda/blog/518620/
https://diogoalexandrefranco.github.io/about-airflow-date-macros-ds-and-execution-date/
{{ ds }} : the execution date of the running DAG as YYYY-MM-DD
{{ macros.ds_add(ds, 5) }} : add 5 days
https://airflow.apache.org/docs/stable/scheduler.html#scheduling-triggers
Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59.
In other words, the job instance is started once the period it covers has ended.
Let's Repeat That The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.
https://stackoverflow.com/questions/58414350/airflow-skip-current-task SkipTask
https://habr.com/ru/post/512386/ Airflow in russian
http://blog.manugarri.com/how-to-trigger-a-dag-with-custom-parameters-on-airflow-ui/
Task C will run after both Task A and B complete: [task_a, task_b] >> task_c
from airflow.utils.helpers import chain
# Both Task B and C depend on Task A
# Task D depends on both Task B and C
chain(task_a, [task_b, task_c], task_d)
# The statement above is equivalent to:
task_a >> [task_b, task_c] >> task_d
Use cross_downstream() to set dependencies between two groups of tasks:
from airflow.utils.helpers import cross_downstream
# Task C and D will run after both Task A and B complete
cross_downstream([task_a, task_b], [task_c, task_d])
# The statement above is equivalent to:
[task_a, task_b] >> task_c
[task_a, task_b] >> task_d
https://stackoverflow.com/questions/62895219/getting-error-in-airflow-dag-unsupported-operand-types-for-list-and-lis
File "/tmp/mlubinsky/roku-dag-bag.pex/agg/agg_daily_non_bucketed/agg_daily_non_bucketed.py", line 476, in get_agg_daily_dag
>> [hive_agg_amoeba_allocation_daily_task_1, redshift_agg_amoeba_allocation_events_load_task_1]
TypeError: unsupported operand type(s) for >>: 'HiveOperator' and 'list'
Following does not work:
task_1 >> [task_2 , task_3] >> [ task_4 , task_5 ] >> task_6
You can fix it:
task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5] >> task_6
task_3 >> [task_4, task_5]
or
task_1 >> [task_2 , task_3]
task_2 >> task_4
task_3 >> task_5
[task_4, task_5] >> task_6
Airflow task dependencies can’t handle [list] >> [list]. Easiest way around this is to specify your dependencies over multiple lines:
task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5]
task_3 >> [task_4, task_5]
[task_4 , task_5 ] >> task_6
Another example:
first_event_chain =
(
hv_agg_channel_ux_day_prep &
hv_agg_channel_ux_day_stg &
hv_agg_channel_ux_day &
(
agg_channel_ux_day |
(
hv_agg_channel_ux_week & agg_channel_ux_week
)
|
(hv_agg_channel_ux_month & agg_channel_ux_month)
)
&
channel_events_done_dummy_task
)
-----------------------------------------------
channel_provider_ux_agg_task_chain =
(
channel_provider_ux_check_done &
(hv_channel_provider_ux_details_stg &
(hv_channel_provider_ux_monthly &
(
(
check_agg_channel_ux_provider_metrics_monthly &
channel_provider_ux_month &
hv_channel_provider_ux_weekly &
channel_provider_ux_weekly
)
|
channel_provider_task_chain
) &
channel_provider_ux_dm_task
) | channel_provider_ux_done_dummy_task
) & channel_provider_ux_join_task
)
IF3 - Looking to check for path s3://roku-dea-dev/sand-box/roku-data-warehouse/donemarkers/tables/roku/agg_channel_ux_provider_metrics_time_grain/2020-10-23.done
https://airflow.apache.org/docs/stable/_modules/airflow/operators/sql.html
https://stackoverflow.com/questions/60601713/how-to-store-the-sql-query-result-using-airflow-and-use-the-result-in-if-else-co
https://towardsdatascience.com/airflow-sharing-data-between-tasks-7bbaa27eeb1
def do_work():
    hiveserver = HiveServer2Hook()
    hql = "SELECT COUNT(*) FROM foo.bar"
    row_count = hiveserver.get_records(hql, schema='foo')
    print(row_count[0][0])
All available Hive Hooks methods can be found here:
https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py
https://pythonhosted.org/airflow/concepts.html#xcoms
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/example_xcom.py
https://stackoverflow.com/questions/62403142/airflow-branchpythonoperator-chaining
I think you suggest using the chain func like this:
chain([t1, t2, t3, fork_task, join_task])
? (edited)
Michael Lu 2 hours ago
I am not clear
Nick Benthem 2 hours ago
I'd just use it as chain(t1,t2,t3,fork_task,[branch_1,branch_2],join_task)
Michael Lu 2 hours ago
I see your point. I will try it shortly
The chain() returns nothing.
How my return statement should look like?
Nick Benthem 1 hour ago
I wouldn't even use a new function - I'd just call
chain(t1,t2,t3,fork_task,[branch_1,branch_2],join_task) for your downstream dependencies
Michael Lu 1 hour ago
I am trying to say that I do not understand clearly how to use it: f() has a return statement; what should I pass to return?
Michael Lu 43 minutes ago
The def f(): is not created by me; I have to obey the contract.
How do I convert the current return:
return (t1 >> t2 >> t3)
to my requirement?
Nick Benthem 40 minutes ago
That seems like a weird contract - and I would re-examine what you're trying to do. You might be able to wrap the function in a lambda of some sort - but t1 >> t2 is just a moniker for __rshift__. It's a function call you're trying to do - but that's getting awfully wonky.
Check out the code here for what that >> logic is doing: https://github.com/apache/airflow/blob/5355909b5f4ef0366e38f21141db5c95baf443ad/airflow/models.py#L2569
airflow/models.py:2569
def __rshift__(self, other):
https://www.youtube.com/watch?v=XJf-f56JbFM
https://bhavaniravi.com/blog/apache-airflow-introduction
https://github.com/quantumblacklabs/kedro . Kedro: best-practice for data and ML pipelines.
https://pyvideo.org/pycon-hk-2018/industrial-machine-learning-pipelines-with-python-airflow.html
https://gtoonstra.github.io/etl-with-airflow/index.html
https://www.udemy.com/course/the-ultimate-hands-on-course-to-master-apache-airflow/ . Udemy /pereuc…
https://medium.com/@achilleus/robust-apache-airflow-deployment-dd02a6c75c78
https://medium.com/@achilleus/easy-way-to-manage-your-airflow-setup-b7c030dd1cb8
https://marclamberti.com/blog/airflow-bashoperator/
https://airflow.apache.org/docs/stable/_modules/airflow/operators/s3_file_transform_operator.html S3 file transform
https://airflow.apache.org/docs/stable/_modules/airflow/operators/s3_to_redshift_operator.html S3 -> Redshift
https://sonra.io/2018/01/01/using-apache-airflow-to-build-a-data-pipeline-on-aws/ . S3 to Redshift
copy product_tgt1
from 's3://productdata/product_tgt/product_details.csv'
iam_role 'arn:aws:iam::<aws-account-id>:role/<role-name>'
region 'us-east-2';
copy AAA.T_FACT
from 's3://AAA-data-warehouse/facts_orc/AAA_device/date_key=2019-10-20/000000_0'
credentials 'aws_iam_role=arn:aws:iam::182333787270:role/RedShiftDevDataProcessingDevPrivileges'
format as ORC
The scheduler runs a DAG soon after (start_date + schedule_interval) is passed
airflow list_dags
airflow initdb
airflow webserver
airflow scheduler
airflow connections
airflow connections -h
airflow list_tasks <DAG_id>
airflow list_dag_runs <DAG_id>
airflow trigger_dag <DAG_id>
airflow test <DAG_id> <task_id> arguments # runs task without checking dependencies
airflow next_execution <DAG_id>  # next execution time
airflow delete_dag <DAG_id>
In airflow.cfg there are two parameters related to concurrency: parallelism and dag_concurrency.
with DAG(...) as dag:
    t1 = SomeOperator()  # no need to pass dag to the operator !!!
    t2 = AnotherOperator()
    t1 >> t2 >> [t3, t4] >> t8
https://habr.com/ru/company/mailru/blog/344398/
https://towardsdatascience.com/how-to-use-airflow-without-headaches-4e6e37e6c2bc
https://habr.com/ru/company/mailru/blog/479900/
https://janakiev.com/blog/apache-airflow-systemd/
Roku https://blog.usejournal.com/roku-is-locking-down-tvs-until-you-give-personal-data-397ceadfd458
https://www.reddit.com/r/bigdata/comments/dwae40/tutorial_on_how_to_use_airflow_without_pain/
Airflow doesn’t treat data as a first class citizen. You should query data, then pass it via XCOM.
Airflow’s usage pattern is to extract data, save it somewhere like S3, then pass the S3 bucket and key location to the next task via XCom. There are many downsides to using heavy (really big) XComs: your metadata database has to store that data to pass it between tasks, and IIRC it never deletes it.
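A minimal TaskFlow sketch of that pattern (bucket name and key are assumptions): the extract task uploads the heavy data itself and only the small S3 key travels through XCom:
import pendulum
from airflow.decorators import dag, task


@dag(
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule_interval=None,
    catchup=False,
)
def s3_handoff():
    @task
    def extract_to_s3() -> str:
        key = "raw/orders/2024-01-01.parquet"  # assumed key; real code would upload the data here
        # ... write the dataframe to s3://my-bucket/<key> ...
        return key  # only this small string is stored in XCom

    @task
    def load_from_s3(key: str) -> None:
        print(f"reading s3://my-bucket/{key} and loading it into the warehouse")

    load_from_s3(extract_to_s3())


s3_handoff_dag = s3_handoff()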
https://airflow-tutorial.readthedocs.io/
https://towardsdatascience.com/apache-airflow-tips-and-best-practices-ff64ce92ef8
https://towardsdatascience.com/how-to-use-airflow-without-headaches-4e6e37e6c2bc
https://tech.marksblogg.com/install-and-configure-apache-airflow.html
https://tech.marksblogg.com/airflow-postgres-redis-forex.html
https://zulily-tech.com/2019/11/19/evolution-of-zulilys-airflow-infrastructure/
https://airflow.apache.org/docs/stable/faq.html By design, an Airflow DAG will execute at the completion of its schedule_interval.
The default schedule_interval is one day (datetime.timedelta(1)). You must specify a different schedule_interval directly to the DAG object you instantiate
The task instances directly upstream from the task need to be in a success state. Also, if you have set depends_on_past=True, the previous task instance needs to have succeeded (except if it is the first run for that task). Also, if wait_for_downstream=True, make sure you understand what it means. You can view how these properties are set from the Task Instance Details page for your task.
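A small sketch of where those two flags live (values purely illustrative); both are BaseOperator arguments, so they can be set per task or through default_args:
default_args = {
    "owner": "airflow",
    # Run this task instance only if its previous scheduled instance succeeded
    # (skipped for the very first run).
    "depends_on_past": True,
    # Additionally wait for the immediate downstream tasks of the previous
    # instance to have finished successfully.
    "wait_for_downstream": True,
}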
That means one schedule_interval AFTER the start date. An hourly DAG, for example, will execute its 2pm run when the clock strikes 3pm. The reasoning here is that Airflow can’t ensure that all data corresponding to the 2pm interval is present until the end of that hourly interval.
For a DAG to be executed, the start_date must be a time in the past, otherwise Airflow will assume that it's not yet ready to execute. When Airflow evaluates your DAG file, it interprets datetime.now() as the current timestamp (i.e. NOT a time in the past) and decides that it's not ready to run. Since this will happen every time Airflow heartbeats (evaluates your DAG) every 5-10 seconds, it'll never run. To properly trigger your DAG to run, make sure to insert a fixed time in the past (e.g. datetime(2019,1,1)) and set catchup=False (unless you're looking to run a backfill).
Note: You can manually trigger a DAG run via Airflow’s UI directly on your dashboard (it looks like a “Play” button). A manual trigger executes immediately and will not interrupt regular scheduling, though it will be limited by any concurrency configurations you have at the DAG, deployment level or task level. When you look at corresponding logs, the run_id will show manual__ instead of scheduled__.
https://www.astronomer.io/blog/7-common-errors-to-check-when-debugging-airflow-dag/
psql -U airflow
\dt
alembic_version
chart
connection
dag
dag_pickle
dag_run
import_error
job
..
log
..
task_instance
..
users
variable
xcom
https://levelup.gitconnected.com/running-airflow-in-docker-759068fb43b2
https://github.com/benattali/airflow-with-docker
https://github.com/godatadriven/whirl . https://blog.godatadriven.com/open-source-airflow-local-development https://blog.godatadriven.com/testing-and-debugging-apache-airflow Testing and debugging Apache Airflow
How We Solved Our Airflow I/O Problem By Using A Custom Docker Operator https://medium.com/enigma-engineering/how-we-solved-our-airflow-i-o-problem-by-using-a-custom-docker-operator-dcc7c8111be5
Containerizing Data Workflows https://medium.com/enigma-engineering/containerizing-data-workflows-95df1d338048
https://github.com/puckel/docker-airflow used by Udemy class
docker exec -it <container_id> bash
docker exec -it <container_id> sh -c "/entrypoint.sh /bin/bash"
https://www.manning.com/books/data-pipelines-with-apache-airflow . BOOK
https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Links Links
http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
https://github.com/geosolutions-it/evo-odas/wiki/Airflow—about-subDAGs,-branching-and-xcom
https://github.com/apache/airflow
http://pydoc.net/apache-airflow
http://airflow.apache.org/faq.html
https://airflow.apache.org/concepts.html#hooks
https://www.sicara.ai/blog/2019-01-28-automate-aws-tasks-thanks-to-airflow-hooks AirFlow Hooks
Hooks use airflow.models.connection.Connection to get hostnames and auth info.
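A minimal sketch of that mechanism (Airflow 2.x import paths); the connection id "my_postgres" is an assumption that would be defined in the Airflow UI/CLI:
from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

# The Connection object holds host, login, password, schema, port.
conn = BaseHook.get_connection("my_postgres")
print(conn.host, conn.login, conn.schema)

# Or let the hook resolve everything and open the client "under the hood".
hook = PostgresHook(postgres_conn_id="my_postgres")
rows = hook.get_records("SELECT 1")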
https://medium.com/geoblinktech/bring-sanity-to-your-data-pipelines-with-apache-airflow-3c9906aac77c
https://medium.com/walmartlabs/auditing-airflow-batch-jobs-73b45100045 Auditing Airflow
https://eng.lyft.com/running-apache-airflow-at-lyft-6e53bb8fccff Lyft uses Airflow
https://medium.com/the-prefect-blog/why-not-airflow-4cfa423299c4 Why Not Airflow?
pip install apache-airflow[celery]
pip install apache-airflow[postgres,s3]
pip install -U apache-airflow
https://medium.com/slido-dev-blog/automate-executing-aws-athena-queries-and-moving-the-results-around-s3-with-airflow-dd3603dc611d Automate executing AWS Athena queries and moving the results around S3 with Airflow:
DAGs are a high-level outline that define the dependent and exclusive tasks that can be ordered and scheduled.
This service is responsible for:
https://stackoverflow.com/questions/33126159/airflow-not-scheduling-correctly-python
The execution_date is not interpreted by Airflow as the start time of the DAG, but rather the end of an interval capped by the DAG’s start time. Ad-hoc runs are now possible as long as they don’t share an execution_date with any other run.
XCom is a utility that was introduced to allow tasks to exchange small pieces of metadata. This is a useful feature if you want task A to tell task B that a large dataframe was written to a known location in cloud storage.
https://www.sicara.ai/blog/2019-04-08-apache-airflow-celery-workers
The executor is defined in airflow.cfg
Airflow provides several executors out of the box, from the simplest to the most full-featured:
SequentialExecutor: a very basic, single-task-at-a-time executor that is also the default one. You do NOT want to use this one for anything but unit testing; it uses SQLite, so it does not require you to install any external database.
https://medium.com/leboncoin-engineering-blog/data-traffic-control-with-apache-airflow-ab8fd3fc8638
Also look into attached PDF in this folder
airflow scheduler &
airflow webserver &   # http://localhost:8080
pip freeze | grep Flask
Flask==1.1.1
>>> from airflow.models import DAG
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/mlubinsky/ide_virtual_env/lib/python2.7/site-packages/airflow/__init__.py", line 30, in <module>
from airflow import version
ImportError: cannot import name version
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
# initialize the database
airflow initdb # This creates airflow directory in home path with airflow.cfg and logs folder.
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page
The first time you run Airflow, it will create a file called airflow.cfg in your $AIRFLOW_HOME directory (~/airflow by default). This file contains Airflow’s configuration and you can edit it to change any of the settings.
cp airflow_test.py ~/airflow/dags/
https://medium.com/@guillaume_payen/use-conditional-tasks-with-apache-airflow-98bab35f1846
https://medium.com/@dustinstansbury/understanding-apache-airflows-key-concepts-a96efed52b1a
https://eng.lyft.com/running-apache-airflow-at-lyft-6e53bb8fccff
Airflow provides various configurables to tune DAG performance; we suggest users tune the following variables:
Tasks are user-defined activities run by operators. They can be Python functions or external scripts that you call. Tasks are expected to be idempotent: no matter how many times you run a task, it needs to produce the same outcome for the same input parameters.
https://medium.com/@dustinstansbury/understanding-apache-airflows-key-concepts-a96efed52b1a Tasks can have two flavors: they can either execute some explicit operation, in which case they are an Operator, or they can pause the execution of dependent tasks until some criterion has been met, in which case they are a Sensor. In principle, Operators can perform any function that can be executed in Python. Similarly, Sensors can check the state of any process or data structure.
Don’t confuse operators with tasks. Tasks are defined as “what to run?” and operators are “how to run”. For example, a Python function to read from S3 and push to a database is a task. The method that calls this Python function in Airflow is the operator. Airflow has built-in operators that you can use for common tasks. You can create custom operators by extending the BaseOperator class and implementing the execute() method.
if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom.
If you provide provide_context=True, you need to have **kwargs in your function.
If you’re passing everything through templates_dict, your param will be available in kwargs['templates_dict']['file_in'].
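A minimal sketch of that wiring (file path and the dag object are assumptions; provide_context is only needed on Airflow 1.x, the context is passed automatically in Airflow 2):
from airflow.operators.python import PythonOperator


def process_file(**kwargs):
    # The rendered value shows up under kwargs['templates_dict']['file_in'].
    file_in = kwargs["templates_dict"]["file_in"]
    print(f"processing {file_in}")


process = PythonOperator(
    task_id="process_file",
    python_callable=process_file,
    templates_dict={"file_in": "/data/{{ ds }}/input.csv"},  # rendered by Jinja at run time
    dag=dag,  # assumes an existing `dag` object, as in the surrounding snippets
)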
DummyOperator
BashOperator - executes a bash command
PythonOperator - calls an arbitrary Python function
BranchPythonOperator (python_callable) returns task_id or list of task to which control to be given
HiveOperator
EmailOperator - sends an email
HTTPOperator - sends an HTTP request
MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command
Sensor - waits for a certain time (poke_interval, timeout, soft_fail), file, database row, S3 key, event etc… FileSensor, TimeDeltaSensor, S3KeySensor
TransferOperator - moves data from one system to another: e.g. S3ToRedshiftTransfer, etc
There are five categories of tests in Airflow that you can write:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators import MultiplyBy5Operator

def print_hello():
    return 'Hello World'

dag = DAG('hello_world', description='Hello world example', schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
multiplyby5_operator = MultiplyBy5Operator(my_operator_param='my_operator_param',
                                           task_id='multiplyby5_task', dag=dag)

dummy_operator >> hello_operator
dummy_operator >> multiplyby5_operator
Here is multiplyby5_operator:
import logging
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)

class MultiplyBy5Operator(BaseOperator):
    @apply_defaults
    def __init__(self, my_operator_param, *args, **kwargs):
        self.operator_param = my_operator_param
        super(MultiplyBy5Operator, self).__init__(*args, **kwargs)

    def execute(self, context):
        log.info('operator_param: %s', self.operator_param)
        return (self.operator_param * 5)

class MultiplyBy5Plugin(AirflowPlugin):
    name = "multiplyby5_plugin"
    # Registering the operator lets `from airflow.operators import MultiplyBy5Operator` work (Airflow 1.x plugin mechanism).
    operators = [MultiplyBy5Operator]
Validation Test
import unittest
from airflow.models import DagBag

class TestDagIntegrity(unittest.TestCase):

    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        self.dagbag = DagBag()

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            'DAG import failures. Errors: {}'.format(
                self.dagbag.import_errors
            )
        )

    def test_alert_email_present(self):
        for dag_id, dag in self.dagbag.dags.items():
            emails = dag.default_args.get('email', [])
            msg = 'Alert email not set for DAG {id}'.format(id=dag_id)
            self.assertIn('alert.email@gmail.com', emails, msg)

suite = unittest.TestLoader().loadTestsFromTestCase(TestDagIntegrity)
unittest.TextTestRunner(verbosity=2).run(suite)
Pipeline definition test:
import unittest
from airflow.models import DagBag

class TestHelloWorldDAG(unittest.TestCase):
    """Check HelloWorldDAG expectation"""

    def setUp(self):
        self.dagbag = DagBag()

    def test_task_count(self):
        """Check task count of hello_world dag"""
        dag_id = 'hello_world'
        dag = self.dagbag.get_dag(dag_id)
        self.assertEqual(len(dag.tasks), 3)

    def test_contain_tasks(self):
        """Check task contains in hello_world dag"""
        dag_id = 'hello_world'
        dag = self.dagbag.get_dag(dag_id)
        tasks = dag.tasks
        task_ids = list(map(lambda task: task.task_id, tasks))
        self.assertListEqual(task_ids, ['dummy_task', 'multiplyby5_task', 'hello_task'])

    def test_dependencies_of_dummy_task(self):
        """Check the task dependencies of dummy_task in hello_world dag"""
        dag_id = 'hello_world'
        dag = self.dagbag.get_dag(dag_id)
        dummy_task = dag.get_task('dummy_task')
        upstream_task_ids = list(map(lambda task: task.task_id, dummy_task.upstream_list))
        self.assertListEqual(upstream_task_ids, [])
        downstream_task_ids = list(map(lambda task: task.task_id, dummy_task.downstream_list))
        self.assertListEqual(downstream_task_ids, ['hello_task', 'multiplyby5_task'])

    def test_dependencies_of_hello_task(self):
        """Check the task dependencies of hello_task in hello_world dag"""
        dag_id = 'hello_world'
        dag = self.dagbag.get_dag(dag_id)
        hello_task = dag.get_task('hello_task')
        upstream_task_ids = list(map(lambda task: task.task_id, hello_task.upstream_list))
        self.assertListEqual(upstream_task_ids, ['dummy_task'])
        downstream_task_ids = list(map(lambda task: task.task_id, hello_task.downstream_list))
        self.assertListEqual(downstream_task_ids, [])

suite = unittest.TestLoader().loadTestsFromTestCase(TestHelloWorldDAG)
unittest.TextTestRunner(verbosity=2).run(suite)
TestMultiplyBy5Operator:
import unittest
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import MultiplyBy5Operator

class TestMultiplyBy5Operator(unittest.TestCase):

    def test_execute(self):
        dag = DAG(dag_id='anydag', start_date=datetime.now())
        task = MultiplyBy5Operator(my_operator_param=10, dag=dag, task_id='anytask')
        ti = TaskInstance(task=task, execution_date=datetime.now())
        result = task.execute(ti.get_template_context())
        self.assertEqual(result, 50)

suite = unittest.TestLoader().loadTestsFromTestCase(TestMultiplyBy5Operator)
unittest.TextTestRunner(verbosity=2).run(suite)
https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f
https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Home
https://towardsdatascience.com/a-definitive-compilation-of-apache-airflow-resources-82bc4980c154
https://twitter.com/ApacheAirflow
https://github.com/jghoman/awesome-apache-airflow
https://www.reddit.com/r/dataengineering/
https://www.sicara.ai/blog/2019-01-28-automate-aws-tasks-thanks-to-airflow-hooks
https://medium.com/airbnb-engineering/airflow-a-workflow-management-platform-46318b977fd8 ‘Airflow: a workflow management platform’ by Maxime Beauchemin (Creator of Apache Airflow). Airbnb Engineering and Data Science. June, 2015.
https://medium.com/@r39132/apache-airflow-grows-up-c820ee8a8324 ‘Apache Airflow Grows Up!’ by Sid Anand (Chief Data Engineer, Paypal. Committer & PMC Member Apache Airflow). Medium. Jan, 2019.
‘Understanding Apache Airflow’s Key Concepts’ by Dustin Stansbury (Data Scientist, Quizlet). Medium (~ 1.8k +1's). May, 2017.
‘Airflow 101: How to start automating your data pipelines with Airflow’ by Sriram Baskaran (Program Director, Data Engineering Insight Data Science). Medium(~ 1.2k +1's). Oct, 2018.
[Video] ‘Best practices with Airflow- an open source platform for workflows & schedules’ by Maxime Beauchemin (Creator of Apache Airflow).
[Video] ‘Modern ETL-ing with Python and Airflow (and Spark)’ by Tamara Mendt (Data Engineer, HelloFresh). PyConDE 2017.
[Video] ‘A Practical Introduction to Airflow’ by Matt Davis (Data Platform Engineering at Clover). PyData SF 2016.
[Video] ‘Developing elegant workflows in Python code with Apache Airflow’ by Michael Karzynski (Tech Lead at Intel). EuroPython Conference. July 2017.
[Video] ‘How I learned to time travel, or, data pipelining and scheduling with Airflow’ by Laura Lorenz (Data & SWE at Industry Dive). PyData DC 2016.
Airflow in the Industry
‘Managing Uber’s Data Workflows at Scale’ by Alex Kira. Uber Data Engineering. Feb, 2019.
‘Productionizing ML with workflows at Twitter’ by Samuel Ngahane and Devin Goodsell. Twitter Engineering. June, 2018.
‘Apache Airflow at Pandora’ by Ace Haidrey. Pandora Engineering. Mar, 2018.
‘Running Apache Airflow at Lyft’ by Tao Feng, Andrew Stahlman, and Junda Yang. Lyft Engineering. Dec, 2018.
‘Why Robinhood uses Airflow’ by Vineet Goel. Robinhood Engineering. May, 2017.
‘Collaboration between data engineers, data analysts and data scientists’ by Germain Tangus (Senior Data Engineer, Dailymotion). Dailymotion Engineering. May, 2019.
‘How Sift Trains Thousands of Models using Apache Airflow’ by Duy Tran. Sift Engineering. Mar, 2018.
‘Airflow, Meta Data Engineering, and a Data Platform for the World’s Largest Democracy’ by Vinayak Mehta. Socialcops Engineering. Aug, 2018.
‘Data Traffic Control with Apache Airflow’ by Nicolas Goll Perrier (Data Engineer, leboncoin). leboncoin Engineering Blog. Jan, 2019.
‘Airflow Part 2: Lessons Learned (at SnapTravel)’ by Nehil Jain. SnapTravel Engineering. Jun, 2018.
‘Using Apache Airflow to Create Data Infrastructure in the Public Sector’ by Varun Adibhatla and Laurel Brunk. Astronomer.io. Oct, 2017.
‘Airflow at WePay’ by Chris Riccomini. WePay. Jul, 2016.
Airflow Distributed Deployment
‘How Apache Airflow Distributes Jobs on Celery workers’ by Hugo Lime (Data Scientist, Sicara AI and Big Data). Sicara Engineering. Apr, 2019.
‘A Guide On How To Build An Airflow Server/Cluster’ by Tianlon Song (Sr. Software Engineer, Machine Learning & Big Data at Zillow). Oct, 2016.
Testing
‘Data’s Inferno: 7 Circles of Data Testing Hell with Airflow’ by WB Advanced Analytics. Jan, 2018.
‘Testing in Airflow Part 1 — DAG Validation Tests, DAG Definition Tests and Unit Tests’ by Chandu Kavar on Medium ( ~1k +1's). Aug 2018.
Additional Reading
https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753 ‘We’re All Using Airflow Wrong and How to Fix It’ by Jessica Laughlin. Bluecore Engineering. Aug, 2018.