[ https://issues.apache.org/jira/browse/AIRFLOW-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046846#comment-17046846 ]
ASF GitHub Bot commented on AIRFLOW-6361:
-----------------------------------------

mik-laj commented on pull request #6905: [AIRFLOW-6361] Run LocalTaskJob directly in Celery task
URL: https://github.com/apache/airflow/pull/6905

Hello,

The executor runs multiple processes to perform a single task. Many of these processes are very short-lived, so the cost of starting them is a significant overhead.

First, the Celery executor triggers a Celery task (app.task). This task runs the CLI command (first process), which contains LocalTaskJob. LocalTaskJob then runs a separate command (second process) that executes the user code. This level of isolation is redundant because LocalTaskJob doesn't execute unsafe code. The first command is started by creating a new process, not by a fork, so it is an expensive operation. I suggest running the code from the first process as part of the Celery task to reduce the number of new processes that have to be created.

The code currently uses CLIFactory to run the LocalTaskJob. It would be better to do this without an unnecessary dependence on the CLI, but that is a big change and I plan to do it in a separate PR.

WIP PR: https://github.com/mik-laj/incubator-airflow/pull/10 (Travis green :-D )

Performance benchmark:
===================

Example DAG from Airflow with unneeded sleep instructions deleted.
```python
"""Example DAG demonstrating the usage of the BashOperator."""
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_bash_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)

run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag,
)

# [START howto_operator_bash]
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)
# [END howto_operator_bash]

run_this >> run_this_last

# Three parallel tasks that all feed into run_this.
for i in range(3):
    task = BashOperator(
        task_id='runme_' + str(i),
        bash_command='echo "{{ task_instance_key_str }}"',
        dag=dag,
    )
    task >> run_this

# [START howto_operator_bash_template]
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last

if __name__ == "__main__":
    dag.cli()
```

```python
import airflow
from airflow import DAG
from airflow.models import DagBag

# Load the example DAG, clear its previous state, and run it.
dagbag = airflow.models.DagBag()
dag: DAG = dagbag.get_dag("example_bash_operator")
dag.clear()
dag.run()
```

Environment: Breeze

```
unset AIRFLOW__CORE__DAGS_FOLDER
unset AIRFLOW__CORE__UNIT_TEST_MODE
chmod -R 777 /root
sudo -E su airflow
export AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
export AIRFLOW__CELERY__BROKER_URL="redis://redis:6379/0"
export AIRFLOW__CELERY__WORKER_CONCURRENCY=8
seq 1 10 | xargs -n 1 -I {} bash -c "time python /files/benchmark_speed.py > /dev/null 2>&1" | grep '^(real\|user\|sys)';
```

Result (`real` time in seconds, 10 runs):

|Fn.     | Before | After | Change|
|--------|--------|-------|-------|
|AVERAGE | 56.48  | 38.32 | -32%  |
|VAR     | 23.60  | 0.04  | -98%  |
|MAX     | 68.29  | 38.68 | -43%  |
|MIN     | 53.26  | 38.08 | -28%  |
|STDEV   | 4.86   | 0.19  | -96%  |

Raw data

After:
```
real 0m38.394s
user 0m4.340s
sys 0m1.600s
real 0m38.355s
user 0m4.700s
sys 0m1.340s
real 0m38.675s
user 0m4.760s
sys 0m1.530s
real 0m38.488s
user 0m4.770s
sys 0m1.280s
real 0m38.434s
user 0m4.600s
sys 0m1.390s
real 0m38.378s
user 0m4.500s
sys 0m1.270s
real 0m38.106s
user 0m4.200s
sys 0m1.100s
real 0m38.082s
user 0m4.170s
sys 0m1.030s
real 0m38.173s
user 0m4.290s
sys 0m1.340s
real 0m38.161s
user 0m4.460s
sys 0m1.370s
```

Before:
```
real 0m53.488s
user 0m5.140s
sys 0m1.700s
real 1m8.288s
user 0m6.430s
sys 0m2.200s
real 0m53.371s
user 0m5.330s
sys 0m1.630s
real 0m58.939s
user 0m6.470s
sys 0m1.730s
real 0m53.255s
user 0m4.950s
sys 0m1.640s
real 0m58.802s
user 0m5.970s
sys 0m1.790s
real 0m58.449s
user 0m5.380s
sys 0m1.580s
real 0m53.308s
user 0m5.120s
sys 0m1.430s
real 0m53.485s
user 0m5.220s
sys 0m1.290s
real 0m53.387s
user 0m5.020s
sys 0m1.590s
```

---
Link to JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-6361

- [x] Description above provides context of the change
- [x] Commit message starts with `[AIRFLOW-NNNN]`, where AIRFLOW-NNNN = JIRA ID*
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).

(*) For document-only changes, no JIRA issue is needed. Commit message starts `[AIRFLOW-XXXX]`.

---
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Run LocalTaskJob directly in Celery task
> ----------------------------------------
>
> Key: AIRFLOW-6361
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6361
> Project: Apache Airflow
> Issue Type: Improvement
> Components: executors
> Affects Versions: 1.10.6
> Reporter: Kamil Bregula
> Priority: Major
> Labels: performance
>
> Hello,
> Celery first runs the CLI command, which contains LocalTaskJob. LocalTaskJob
> is responsible for starting the next process, which runs the user code. This
> level of isolation is redundant because LocalTaskJob doesn't execute unsafe
> code. The first command is started by creating a new process, not by a fork,
> so it is an expensive operation.
> According to preliminary measurements, this change improves performance by
> close to 30%.
> I will provide more information in the PR.
> Best regards
> Kamil Bregula
> After:
> ```
> real 0m38.394s
> user 0m4.340s
> sys 0m1.600s
> real 0m38.355s
> user 0m4.700s
> sys 0m1.340s
> real 0m38.675s
> user 0m4.760s
> sys 0m1.530s
> real 0m38.488s
> user 0m4.770s
> sys 0m1.280s
> real 0m38.434s
> user 0m4.600s
> sys 0m1.390s
> real 0m38.378s
> user 0m4.500s
> sys 0m1.270s
> real 0m38.106s
> user 0m4.200s
> sys 0m1.100s
> real 0m38.082s
> user 0m4.170s
> sys 0m1.030s
> real 0m38.173s
> user 0m4.290s
> sys 0m1.340s
> real 0m38.161s
> user 0m4.460s
> sys 0m1.370s
> ```
> Before:
> ```
> real 0m53.488s
> user 0m5.140s
> sys 0m1.700s
> real 1m8.288s
> user 0m6.430s
> sys 0m2.200s
> real 0m53.371s
> user 0m5.330s
> sys 0m1.630s
> real 0m58.939s
> user 0m6.470s
> sys 0m1.730s
> real 0m53.255s
> user 0m4.950s
> sys 0m1.640s
> real 0m58.802s
> user 0m5.970s
> sys 0m1.790s
> real 0m58.449s
> user 0m5.380s
> sys 0m1.580s
> real 0m53.308s
> user 0m5.120s
> sys 0m1.430s
> real 0m53.485s
> user 0m5.220s
> sys 0m1.290s
> real 0m53.387s
> user 0m5.020s
> sys 0m1.590s
> ```

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
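
The summary figures in the benchmark can be recomputed from the raw `real` timings listed under "Before:" and "After:". The script below is a minimal sketch and not part of the PR: the helper names `summarize` and `relative_change` are hypothetical, and it assumes the VAR and STDEV rows use the sample (n-1) estimators, which is what reproduces the reported 23.60 and 4.86.

```python
import statistics

# Wall-clock ("real") times in seconds, copied from the raw data.
# 1m8.288s is written as 68.288.
before = [53.488, 68.288, 53.371, 58.939, 53.255,
          58.802, 58.449, 53.308, 53.485, 53.387]
after = [38.394, 38.355, 38.675, 38.488, 38.434,
         38.378, 38.106, 38.082, 38.173, 38.161]


def summarize(samples):
    """Return the five summary statistics used in the results table."""
    return {
        "AVERAGE": statistics.mean(samples),
        "VAR": statistics.variance(samples),  # sample variance (n-1), assumed
        "MAX": max(samples),
        "MIN": min(samples),
        "STDEV": statistics.stdev(samples),   # sample standard deviation
    }


def relative_change(old, new):
    """Percentage change from old to new; negative means a reduction."""
    return (new - old) / old * 100


stats_before = summarize(before)
stats_after = summarize(after)

for name in stats_before:
    b, a = stats_before[name], stats_after[name]
    print(f"{name:8s} before={b:6.2f} after={a:6.2f} "
          f"change={relative_change(b, a):+.1f}%")
```

Recomputed this way, the averages are 56.48 s before and 38.32 s after, a reduction of about 32%. The relative change in variance comes out near -99.8%, somewhat larger in magnitude than the figure given in the table; the other rows agree with the table after rounding.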