mik-laj opened a new pull request #6905: [AIRFLOW-6361] Run LocalTaskJob
directly in Celery task
URL: https://github.com/apache/airflow/pull/6905
Hello,
The executor runs multiple processes to perform one task. Many processes
have a very short life cycle, so the process of starting it is a significant
overhead.
Firstly, the Celery executor trigger Celery tasks - app.task. This task runs
the CLI command (first process), which contains LocalTaskJob. LocalTaskJob
runs the separate command (second process) that executes user-code. This level
of isolation is redundant because LocalTaskJob doesn't execute unsafe code. The
first command is run by a new process creation, not by a fork, so this is an
expensive operation. I suggest running code from the first process as part of
the celery task to reduce the need to create new processes.
The code currently uses CLIFactory to run the LocalTaskJob It is better to
do this without unnecessary dependence on CLI, but it is a big change and I
plan to do it in a separate PR.
WIP PR: https://github.com/mik-laj/incubator-airflow/pull/10 (Travis green
:-D )
Performance benchmark:
===
Example DAG from Airflow with unneeded sleep instructions deleted.
```python
"""Example DAG demonstrating the usage of the BashOperator."""
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='example_bash_operator',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
run_this_last = DummyOperator(
task_id='run_this_last',
dag=dag,
)
# [START howto_operator_bash]
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
dag=dag,
)
# [END howto_operator_bash]
run_this >> run_this_last
for i in range(3):
task = BashOperator(
task_id='runme_' + str(i),
bash_command='echo "{{ task_instance_key_str }}",
dag=dag,
)
task >> run_this
# [START howto_operator_bash_template]
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag,
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
if __name__ == "__main__":
dag.cli()
```
```python
import airflow
from airflow import DAG
from airflow.models import DagBag
dagbag = airflow.models.DagBag()
dag: DAG = dagbag.get_dag("example_bash_operator")
dag.clear()
dag.run()
```
Environment: Brreze
```
unset AIRFLOW__CORE__DAGS_FOLDER
unset AIRFLOW__CORE__UNIT_TEST_MODE
chmod -R 777 /root
sudo -E su airflow
export AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
export AIRFLOW__CELERY__BROKER_URL="redis://redis:6379/0"
export AIRFLOW__CELERY__WORKER_CONCURRENCY=8
seq 1 10 | xargs -n 1 -I {} bash -c "time python /files/benchmark_speed.py >
/dev/null 2>&1" | grep '^(real\|user\|sys)';
```
Result:
|Fn. | After | Before | Change|
||---||---|
|AVERAGE | 56.48 | 38.32 | -32% |
|VAR | 23.60 | 0.04 | -98% |
|MAX | 68.29 | 38.68 | -43% |
|MIN | 53.26 | 38.08 | -28% |
|STDEV | 4.86 | 0.19 | -96%. |
Raw data
After:
```
real 0m38.394s
user 0m4.340s
sys 0m1.600s
real 0m38.355s
user 0m4.700s
sys 0m1.340s
real 0m38.675s
user 0m4.760s
sys 0m1.530s
real 0m38.488s
user 0m4.770s
sys 0m1.280s
real 0m38.434s
user 0m4.600s
sys 0m1.390s
real 0m38.378s
user 0m4.500s
sys 0m1.270s
real 0m38.106s
user 0m4.200s
sys 0m1.100s
real 0m38.082s
user 0m4.170s
sys 0m1.030s
real 0m38.173s
user 0m4.290s
sys 0m1.340s
real 0m38.161s
user 0m4.460s
sys 0m1.370s
```
Before:
```
real 0m53.488s
user 0m5.140s
sys 0m1.700s
real 1m8.288s
user 0m6.430s
sys 0m2.200s
real 0m53.371s
user 0m5.330s
sys 0m1.630s
real 0m58.939s
user 0m6.470s
sys 0m1.730s
real 0m53.255s
user 0m4.950s
sys 0m1.640s
real 0m58.802s
user 0m5.970s
sys 0m1.790s
real 0m58.449s
user 0m5.380s
sys 0m1.580s
real 0m53.308s
user 0m5.120s
sys 0m1.430s
real 0m53.485s
user 0m5.220s
sys 0m1.290s
real 0m53.387s
user 0m5.020s
sys 0m1.590s
```
---
Link to JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-6361
- [x] Description above provides context of the change
- [x] Commit message starts with