[GitHub] [airflow] mik-laj opened a new pull request #6905: [AIRFLOW-6361] Run LocalTaskJob directly in Celery task

2020-02-27 Thread GitBox
mik-laj opened a new pull request #6905: [AIRFLOW-6361] Run LocalTaskJob 
directly in Celery task
URL: https://github.com/apache/airflow/pull/6905
 
 
   Hello,
   
   The executor starts several processes to run a single task. Many of these 
processes are very short-lived, so the cost of starting them is a significant 
overhead.
   
   First, the Celery executor triggers a Celery task (app.task). This task runs 
the CLI command (first process), which contains LocalTaskJob. LocalTaskJob 
then runs a separate command (second process) that executes user code. The 
first level of isolation is redundant because LocalTaskJob doesn't execute 
unsafe code. The first command is started by creating a new process, not by a 
fork, so it is an expensive operation. I suggest running the code from the 
first process as part of the Celery task to reduce the number of new processes.
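
To illustrate why the extra process matters, here is a minimal sketch that compares starting a fresh Python interpreter with calling the same work in-process. It is not Airflow code: `run_job`, `run_in_new_process` and `timed` are hypothetical names, and the trivial workload only stands in for what the first process does.

```python
import subprocess
import sys
import time


def run_job():
    """Stand-in for the work done by the first process (hypothetical)."""
    return sum(range(1000))


def run_in_new_process():
    # A fresh interpreter is started for every call, as with a CLI command.
    subprocess.run([sys.executable, "-c", "sum(range(1000))"], check=True)


def timed(fn, repeat=5):
    """Return the average wall-clock seconds of calling fn() `repeat` times."""
    start = time.perf_counter()
    for _ in range(repeat):
        fn()
    return (time.perf_counter() - start) / repeat


spawn_cost = timed(run_in_new_process)
inline_cost = timed(run_job)
print(f"new process: {spawn_cost:.4f}s, in-process call: {inline_cost:.6f}s")
```

The absolute numbers depend on the machine, but the interpreter start-up cost is paid once per task, which is the overhead this PR tries to avoid.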
   
   The code currently uses CLIFactory to run the LocalTaskJob. It would be 
better to do this without the unnecessary dependency on the CLI, but that is a 
big change and I plan to do it in a separate PR.
   WIP PR: https://github.com/mik-laj/incubator-airflow/pull/10 (Travis green 
:-D )
   
   Performance benchmark: 
   ===
   Example DAG from Airflow with unneeded sleep instructions deleted.
   ```python
   """Example DAG demonstrating the usage of the BashOperator."""
   
   from datetime import timedelta
   
   import airflow
   from airflow.models import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.dummy_operator import DummyOperator
   
   args = {
       'owner': 'airflow',
       'start_date': airflow.utils.dates.days_ago(2),
   }
   
   dag = DAG(
       dag_id='example_bash_operator',
       default_args=args,
       schedule_interval='0 0 * * *',
       dagrun_timeout=timedelta(minutes=60),
   )
   
   run_this_last = DummyOperator(
       task_id='run_this_last',
       dag=dag,
   )
   
   # [START howto_operator_bash]
   run_this = BashOperator(
       task_id='run_after_loop',
       bash_command='echo 1',
       dag=dag,
   )
   # [END howto_operator_bash]
   
   run_this >> run_this_last
   
   for i in range(3):
       task = BashOperator(
           task_id='runme_' + str(i),
           bash_command='echo "{{ task_instance_key_str }}"',
           dag=dag,
       )
       task >> run_this
   
   # [START howto_operator_bash_template]
   also_run_this = BashOperator(
       task_id='also_run_this',
       bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
       dag=dag,
   )
   # [END howto_operator_bash_template]
   also_run_this >> run_this_last
   
   if __name__ == "__main__":
       dag.cli()
   
   ```
   ```python
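   from airflow import DAG
   from airflow.models import DagBag
   
   # Load the DAGs and pick the example DAG defined above.
   dagbag = DagBag()
   dag: DAG = dagbag.get_dag("example_bash_operator")
   
   # Clear earlier task instances, then run the DAG with the configured executor.
   dag.clear()
   dag.run()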
   ```
   Environment: Breeze
   ```
   unset AIRFLOW__CORE__DAGS_FOLDER
   unset AIRFLOW__CORE__UNIT_TEST_MODE
   chmod -R 777 /root
   sudo -E su airflow
   export AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
   export AIRFLOW__CELERY__BROKER_URL="redis://redis:6379/0"
   export AIRFLOW__CELERY__WORKER_CONCURRENCY=8
   seq 1 10 | xargs -n 1 -I {} bash -c "time python /files/benchmark_speed.py > /dev/null 2>&1" | grep '^(real\|user\|sys)';
   ```
   
   Result (`real` time in seconds, 10 runs each):
   
   | Fn.     | Before | After | Change |
   |---------|--------|-------|--------|
   | AVERAGE | 56.48  | 38.32 | -32%   |
   | VAR     | 23.60  | 0.04  | -99.8% |
   | MAX     | 68.29  | 38.68 | -43%   |
   | MIN     | 53.26  | 38.08 | -28%   |
   | STDEV   | 4.86   | 0.19  | -96%   |
   
   Raw data
   After:
   ```
   real 0m38.394s
   user 0m4.340s
   sys 0m1.600s
   
   real 0m38.355s
   user 0m4.700s
   sys 0m1.340s
   
   real 0m38.675s
   user 0m4.760s
   sys 0m1.530s
   
   real 0m38.488s
   user 0m4.770s
   sys 0m1.280s
   
   real 0m38.434s
   user 0m4.600s
   sys 0m1.390s
   
   real 0m38.378s
   user 0m4.500s
   sys 0m1.270s
   
   real 0m38.106s
   user 0m4.200s
   sys 0m1.100s
   
   real 0m38.082s
   user 0m4.170s
   sys 0m1.030s
   
   real 0m38.173s
   user 0m4.290s
   sys 0m1.340s
   
   real 0m38.161s
   user 0m4.460s
   sys 0m1.370s
   ```
   
   Before:
   ```
   real 0m53.488s
   user 0m5.140s
   sys 0m1.700s
   
   real 1m8.288s
   user 0m6.430s
   sys 0m2.200s
   
   real 0m53.371s
   user 0m5.330s
   sys 0m1.630s
   
   real 0m58.939s
   user 0m6.470s
   sys 0m1.730s
   
   real 0m53.255s
   user 0m4.950s
   sys 0m1.640s
   
   real 0m58.802s
   user 0m5.970s
   sys 0m1.790s
   
   real 0m58.449s
   user 0m5.380s
   sys 0m1.580s
   
   real 0m53.308s
   user 0m5.120s
   sys 0m1.430s
   
   real 0m53.485s
   user 0m5.220s
   sys 0m1.290s
   
   real 0m53.387s
   user 0m5.020s
   sys 0m1.590s
   ```
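
The summary statistics can be recomputed from the `real` timings in the raw data above. The sketch below is only an illustration: the helper names `real_seconds` and `summarize` are hypothetical, and it assumes the table uses the sample (n - 1) variance and standard deviation, which appears to match the reported values.

```python
import re
import statistics

BEFORE = """
real 0m53.488s
real 1m8.288s
real 0m53.371s
real 0m58.939s
real 0m53.255s
real 0m58.802s
real 0m58.449s
real 0m53.308s
real 0m53.485s
real 0m53.387s
"""

AFTER = """
real 0m38.394s
real 0m38.355s
real 0m38.675s
real 0m38.488s
real 0m38.434s
real 0m38.378s
real 0m38.106s
real 0m38.082s
real 0m38.173s
real 0m38.161s
"""


def real_seconds(time_output):
    """Return every `real XmY.YYYs` entry in `time` output as seconds."""
    pairs = re.findall(r"^real\s+(\d+)m([\d.]+)s$", time_output, flags=re.MULTILINE)
    return [int(minutes) * 60 + float(seconds) for minutes, seconds in pairs]


def summarize(samples):
    """Summary statistics; VAR and STDEV are the sample (n - 1) versions."""
    return {
        "AVERAGE": statistics.mean(samples),
        "VAR": statistics.variance(samples),
        "MAX": max(samples),
        "MIN": min(samples),
        "STDEV": statistics.stdev(samples),
    }


before = summarize(real_seconds(BEFORE))
after = summarize(real_seconds(AFTER))

# Print one row per statistic: name, before, after, relative change.
for name in before:
    change = (after[name] - before[name]) / before[name] * 100
    print(f"{name:8} {before[name]:6.2f} {after[name]:6.2f} {change:+.1f}%")
```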
   
   ---
   Link to JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-6361
   
   - [x] Description above provides context of the change
   - [x] Commit message starts with 

[GitHub] [airflow] mik-laj opened a new pull request #6905: [AIRFLOW-6361] Run LocalTaskJob directly in Celery task

2019-12-26 Thread GitBox
mik-laj opened a new pull request #6905: [AIRFLOW-6361] Run LocalTaskJob 
directly in Celery task
URL: https://github.com/apache/airflow/pull/6905
 
 
   Hello,
   
   Celery first runs the CLI command, which contains LocalTaskJob. LocalTaskJob 
is responsible for starting the next process, which runs user code. This level 
of isolation is redundant because LocalTaskJob doesn't execute unsafe code. The 
first command is started by creating a new process, not by a fork, so it is an 
expensive operation.
   
   According to preliminary measurements, this change results in an increase in 
performance close to 30%.
   
   I will provide more information in the PR.
   
   Best regards
   Kamil Bregula
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [ ] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
     - https://issues.apache.org/jira/browse/AIRFLOW-6361
     - In case you are fixing a typo in the documentation you can prepend your 
commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.
     - In case you are proposing a fundamental code change, you need to create 
an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).
     - In case you are adding a dependency, check if the license complies with 
the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI 
changes:
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
     - All the public functions and classes in the PR contain docstrings 
that explain what they do
     - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services