This is an automated email from the ASF dual-hosted git repository. husseinawala pushed a commit to branch feat/max_active_tis_per_dagrun in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 04cb0da4b57255376c7c66276d7ab4eb8094034f Merge: 4081117d06 058b6ebdd8 Author: Hussein Awala <houssein.awala...@gmail.com> AuthorDate: Fri Apr 14 03:18:42 2023 +0200 Merge branch 'main' into feat/max_active_tis_per_dagrun .asf.yaml | 2 - .github/workflows/ci.yml | 7 +- .pre-commit-config.yaml | 10 +- BREEZE.rst | 14 +- CONTRIBUTORS_QUICK_START.rst | 10 +- Dockerfile.ci | 10 +- STATIC_CODE_CHECKS.rst | 34 +- airflow/api_connexion/endpoints/health_endpoint.py | 4 +- airflow/api_connexion/schemas/job_schema.py | 4 +- airflow/api_internal/endpoints/rpc_api_endpoint.py | 4 +- airflow/cli/commands/dag_command.py | 10 +- airflow/cli/commands/dag_processor_command.py | 32 +- airflow/cli/commands/jobs_command.py | 17 +- airflow/cli/commands/scheduler_command.py | 22 +- airflow/cli/commands/standalone_command.py | 8 +- airflow/cli/commands/task_command.py | 13 +- airflow/cli/commands/triggerer_command.py | 11 +- airflow/dag_processing/manager.py | 24 +- airflow/dag_processing/processor.py | 2 +- airflow/executors/celery_executor.py | 2 +- airflow/executors/kubernetes_executor.py | 21 +- airflow/jobs/JOB_LIFECYCLE.md | 158 +++ .../{backfill_job.py => backfill_job_runner.py} | 22 +- airflow/jobs/{job_runner.py => base_job_runner.py} | 9 +- ...rocessor_job.py => dag_processor_job_runner.py} | 24 +- airflow/jobs/{base_job.py => job.py} | 198 +-- ...{local_task_job.py => local_task_job_runner.py} | 26 +- .../{scheduler_job.py => scheduler_job_runner.py} | 45 +- .../{triggerer_job.py => triggerer_job_runner.py} | 24 +- .../0093_2_2_0_taskinstance_keyed_to_dagrun.py | 2 +- ...3_0_migrate_rtif_to_use_run_id_and_map_index.py | 2 +- .../0105_2_3_0_add_map_index_to_taskfail.py | 2 +- airflow/models/__init__.py | 2 +- airflow/models/dag.py | 52 +- airflow/models/dagbag.py | 21 +- airflow/models/dagrun.py | 2 +- airflow/models/serialized_dag.py | 18 + airflow/models/taskinstance.py | 8 +- airflow/models/trigger.py | 14 +- airflow/operators/trigger_dagrun.py | 11 +- airflow/providers/amazon/aws/hooks/batch_client.py | 87 +- airflow/providers/amazon/aws/operators/batch.py | 102 +- airflow/providers/amazon/aws/operators/ecs.py | 19 +- airflow/providers/databricks/provider.yaml | 4 + .../databricks/sensors/__init__.py} | 26 - .../providers/databricks/sensors/databricks_sql.py | 134 ++ airflow/providers/google/CHANGELOG.rst | 1 + airflow/providers/microsoft/azure/CHANGELOG.rst | 2 + airflow/providers/mysql/hooks/mysql.py | 18 +- airflow/providers/mysql/provider.yaml | 6 +- airflow/providers/sqlite/hooks/sqlite.py | 19 +- airflow/serialization/enums.py | 2 +- .../serialization/pydantic/{base_job.py => job.py} | 6 +- airflow/serialization/serialized_objects.py | 10 +- airflow/task/task_runner/__init__.py | 4 +- airflow/task/task_runner/base_task_runner.py | 23 +- airflow/task/task_runner/cgroup_task_runner.py | 6 +- airflow/task/task_runner/standard_task_runner.py | 8 +- airflow/triggers/external_task.py | 2 +- airflow/utils/db.py | 4 +- airflow/utils/scheduler_health.py | 8 +- airflow/utils/task_group.py | 9 +- airflow/www/extensions/init_views.py | 17 + airflow/www/package.json | 3 +- airflow/www/utils.py | 21 +- airflow/www/views.py | 10 +- airflow/www/yarn.lock | 372 ++++-- chart/Chart.yaml | 191 +-- chart/RELEASE_NOTES.rst | 70 ++ chart/newsfragments/30054.significant.rst | 4 - chart/newsfragments/30411.significant.rst | 3 - chart/templates/_helpers.yaml | 20 +- .../templates/pgbouncer/pgbouncer-deployment.yaml | 3 + chart/values.schema.json | 25 + chart/values.yaml | 7 + dev/README_RELEASE_HELM_CHART.md | 2 + dev/breeze/src/airflow_breeze/pre_commit_ids.py | 10 +- dev/perf/scheduler_dag_execution_timing.py | 24 +- dev/perf/sql_queries.py | 7 +- docker_tests/test_prod_image.py | 4 +- .../operators/sql.rst | 42 +- docs/apache-airflow-providers-google/commits.rst | 2 + .../operators/cloud/dataform.rst | 4 +- .../operators/cloud/dataproc.rst | 2 +- .../commits.rst | 10 +- .../connections/sqlite.rst | 53 +- .../logging-monitoring/check-health.rst | 2 +- .../logging-monitoring/metrics.rst | 1 - .../dagfile-processing.rst | 1 + .../authoring-and-scheduling/datasets.rst | 4 +- docs/apache-airflow/howto/operator/python.rst | 10 +- docs/apache-airflow/howto/set-up-database.rst | 8 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/project.rst | 1 + generated/provider_dependencies.json | 1 - images/breeze/output-commands-hash.txt | 2 +- images/breeze/output_static-checks.svg | 108 +- images/quick_start/mysql_connection.png | Bin 99933 -> 0 bytes images/quick_start/postgresql_connection.png | Bin 0 -> 240443 bytes newsfragments/30374.significant.rst | 5 + .../pre_commit_check_pre_commit_hooks.py | 2 +- scripts/docker/entrypoint_ci.sh | 10 +- scripts/in_container/verify_providers.py | 4 +- .../endpoints/test_health_endpoint.py | 30 +- .../endpoints/test_task_instance_endpoint.py | 15 +- tests/charts/test_airflow_common.py | 43 +- tests/charts/test_annotations.py | 14 + tests/cli/commands/test_dag_processor_command.py | 4 +- tests/cli/commands/test_jobs_command.py | 73 +- tests/cli/commands/test_task_command.py | 9 +- tests/cli/commands/test_triggerer_command.py | 2 +- tests/core/test_impersonation_tests.py | 8 +- .../{test_manager.py => test_job_runner.py} | 668 +++++----- tests/executors/test_dask_executor.py | 21 +- tests/jobs/test_backfill_job.py | 772 ++++++------ tests/jobs/test_base_job.py | 75 +- tests/jobs/test_local_task_job.py | 260 ++-- tests/jobs/test_scheduler_job.py | 1296 +++++++++++--------- tests/jobs/test_triggerer_job.py | 141 ++- tests/jobs/test_triggerer_job_logging.py | 46 +- tests/listeners/test_listeners.py | 7 +- tests/models/test_dagbag.py | 48 + tests/models/test_trigger.py | 17 +- tests/operators/test_trigger_dagrun.py | 40 +- .../amazon/aws/hooks/test_batch_client.py | 74 +- tests/providers/amazon/aws/operators/test_batch.py | 86 +- tests/providers/amazon/aws/operators/test_ecs.py | 4 +- tests/providers/amazon/aws/operators/test_sqs.py | 4 +- .../providers/databricks/sensors/__init__.py | 26 - .../databricks/sensors/test_databricks_sql.py | 97 ++ tests/providers/mysql/hooks/test_mysql.py | 57 - .../mysql/hooks/test_mysql_connector_python.py | 86 ++ tests/providers/sqlite/hooks/test_sqlite.py | 26 +- tests/serialization/test_pydantic_models.py | 13 +- .../databricks/example_databricks_sensors.py | 86 ++ tests/task/task_runner/test_base_task_runner.py | 10 +- tests/task/task_runner/test_cgroup_task_runner.py | 11 +- .../task/task_runner/test_standard_task_runner.py | 151 ++- tests/task/task_runner/test_task_runner.py | 18 +- tests/test_utils/db.py | 6 +- tests/utils/test_helpers.py | 10 +- tests/utils/test_log_handlers.py | 8 +- tests/www/views/test_views_base.py | 40 +- 143 files changed, 4231 insertions(+), 2639 deletions(-) diff --cc airflow/jobs/scheduler_job_runner.py index e725695254,706d980253..a39b879ede --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@@ -326,9 -304,12 +329,8 @@@ class SchedulerJobRunner(BaseJobRunner starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0} # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks. - dag_active_tasks_map: DefaultDict[str, int] - task_concurrency_map: DefaultDict[tuple[str, str], int] - dag_active_tasks_map, task_concurrency_map = self.__get_concurrency_maps( - states=list(EXECUTION_STATES), session=session - ) + concurrency_map = self.__get_concurrency_maps(states=list(EXECUTION_STATES), session=session) - num_tasks_in_executor = 0 # Number of tasks that cannot be scheduled because of no open slot in pool num_starving_tasks_total = 0 diff --cc tests/jobs/test_backfill_job.py index c5f415c9b7,7001f6f1b9..164d5af1df --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@@ -362,89 -358,6 +359,88 @@@ class TestBackfillJob assert task_concurrency_limit_reached_at_least_once times_dag_concurrency_limit_reached_in_debug = self._times_called_with( + mock_log.debug, + DagConcurrencyLimitReached, + ) + + times_pool_limit_reached_in_debug = self._times_called_with( + mock_log.debug, + NoAvailablePoolSlot, + ) + + times_task_concurrency_limit_reached_in_debug = self._times_called_with( + mock_log.debug, + TaskConcurrencyLimitReached, + ) + + assert 0 == times_pool_limit_reached_in_debug + assert 0 == times_dag_concurrency_limit_reached_in_debug + assert times_task_concurrency_limit_reached_in_debug > 0 + + @pytest.mark.parametrize("with_max_active_tis_per_dag", [False, True]) - @patch("airflow.jobs.backfill_job.BackfillJobRunner.log") ++ @patch("airflow.jobs.backfill_job_runner.BackfillJobRunner.log") + def test_backfill_respect_max_active_tis_per_dagrun_limit( + self, mock_log, dag_maker, with_max_active_tis_per_dag + ): + max_active_tis_per_dag = 3 + max_active_tis_per_dagrun = 2 + kwargs = {"max_active_tis_per_dagrun": max_active_tis_per_dagrun} + if with_max_active_tis_per_dag: + kwargs["max_active_tis_per_dag"] = max_active_tis_per_dag + + with dag_maker(dag_id="test_backfill_respect_max_active_tis_per_dag_limit", schedule="@daily") as dag: + EmptyOperator.partial(task_id="task1", **kwargs).expand_kwargs([{"x": i} for i in range(10)]) + + dag_maker.create_dagrun(state=None) + + executor = MockExecutor() + - job = BaseJob( - job_runner=BackfillJobRunner( - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=7), - ), - executor=executor, ++ job = Job(executor=executor) ++ job_runner = BackfillJobRunner( ++ job=job, ++ dag=dag, ++ start_date=DEFAULT_DATE, ++ end_date=DEFAULT_DATE + datetime.timedelta(days=7), + ) + - job.run() ++ run_job(job=job, execute_callable=job_runner._execute) + + assert len(executor.history) > 0 + + task_concurrency_limit_reached_at_least_once = False + + def get_running_tis_per_dagrun(running_tis): + running_tis_per_dagrun_dict = defaultdict(int) + for running_ti in running_tis: + running_tis_per_dagrun_dict[running_ti[3].dag_run.id] += 1 + return running_tis_per_dagrun_dict + + num_running_task_instances = 0 + for running_task_instances in executor.history: + if with_max_active_tis_per_dag: + assert len(running_task_instances) <= max_active_tis_per_dag + running_tis_per_dagrun_dict = get_running_tis_per_dagrun(running_task_instances) + assert all( + [ + num_running_tis <= max_active_tis_per_dagrun + for num_running_tis in running_tis_per_dagrun_dict.values() + ] + ) + num_running_task_instances += len(running_task_instances) + task_concurrency_limit_reached_at_least_once = ( + task_concurrency_limit_reached_at_least_once + or any( + [ + num_running_tis == max_active_tis_per_dagrun + for num_running_tis in running_tis_per_dagrun_dict.values() + ] + ) + ) + + assert 80 == num_running_task_instances # (7 backfill run + 1 manual run ) * 10 mapped task per run + assert task_concurrency_limit_reached_at_least_once + + times_dag_concurrency_limit_reached_in_debug = self._times_called_with( mock_log.debug, DagConcurrencyLimitReached, ) diff --cc tests/jobs/test_scheduler_job.py index c1f9f37d6e,4870a2aa13..fafc5ee2b4 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@@ -1238,66 -1264,6 +1264,68 @@@ class TestSchedulerJob session.rollback() + def test_find_executable_task_instances_task_concurrency_per_dagrun_for_first(self, dag_maker): - self.scheduler_job = BaseJob(job_runner=SchedulerJobRunner(subdir=os.devnull)) ++ scheduler_job = Job() ++ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) + session = settings.Session() + + dag_id = "SchedulerJobTest.test_find_executable_task_instances_task_concurrency_per_dagrun_for_first" + + with dag_maker(dag_id=dag_id): + op1a = EmptyOperator(task_id="dummy1-a", priority_weight=2, max_active_tis_per_dagrun=1) + op1b = EmptyOperator(task_id="dummy1-b", priority_weight=1) + dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED) + + ti1a = dr1.get_task_instance(op1a.task_id, session) + ti1b = dr1.get_task_instance(op1b.task_id, session) + ti2a = dr2.get_task_instance(op1a.task_id, session) + ti1a.state = State.RUNNING + ti1b.state = State.SCHEDULED + ti2a.state = State.SCHEDULED + session.flush() + + # Schedule ti with higher priority, + # because it's running in a different DAG run with 0 active tis - res = self.scheduler_job.job_runner._executable_task_instances_to_queued(max_tis=1, session=session) ++ res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session) + assert 1 == len(res) + assert res[0].key == ti2a.key + + session.rollback() + + def test_find_executable_task_instances_not_enough_task_concurrency_per_dagrun_for_first(self, dag_maker): - self.scheduler_job = BaseJob(job_runner=SchedulerJobRunner(subdir=os.devnull)) ++ scheduler_job = Job() ++ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) + session = settings.Session() + + dag_id = ( + "SchedulerJobTest" + ".test_find_executable_task_instances_not_enough_task_concurrency_per_dagrun_for_first" + ) + + with dag_maker(dag_id=dag_id): + op1a = EmptyOperator.partial( + task_id="dummy1-a", priority_weight=2, max_active_tis_per_dagrun=1 + ).expand_kwargs([{"inputs": 1}, {"inputs": 2}]) + op1b = EmptyOperator(task_id="dummy1-b", priority_weight=1) + dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + + ti1a0 = dr.get_task_instance(op1a.task_id, session, map_index=0) + ti1a1 = dr.get_task_instance(op1a.task_id, session, map_index=1) + ti1b = dr.get_task_instance(op1b.task_id, session) + ti1a0.state = State.RUNNING + ti1a1.state = State.SCHEDULED + ti1b.state = State.SCHEDULED + session.flush() + + # Schedule ti with lower priority, + # because the one with higher priority is limited by a concurrency limit - res = self.scheduler_job.job_runner._executable_task_instances_to_queued(max_tis=1, session=session) ++ res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session) + assert 1 == len(res) + assert res[0].key == ti1b.key + + session.rollback() + def test_find_executable_task_instances_negative_open_pool_slots(self, dag_maker): """ Pools with negative open slots should not block other pools. @@@ -4090,52 -4117,6 +4181,54 @@@ with create_session() as session: ti = dr.get_task_instances(session=session)[0] + ti.state = state + ti.start_date = start_date + ti.end_date = end_date + - self.scheduler_job.job_runner._schedule_dag_run(dr, session) ++ self.job_runner._schedule_dag_run(dr, session) + assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1 + + session.refresh(ti) + assert ti.state == State.SCHEDULED + + @pytest.mark.parametrize( + "state,start_date,end_date", + [ + [State.NONE, None, None], + [ + State.UP_FOR_RETRY, + timezone.utcnow() - datetime.timedelta(minutes=30), + timezone.utcnow() - datetime.timedelta(minutes=15), + ], + [ + State.UP_FOR_RESCHEDULE, + timezone.utcnow() - datetime.timedelta(minutes=30), + timezone.utcnow() - datetime.timedelta(minutes=15), + ], + ], + ) + def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dagrun( + self, state, start_date, end_date, dag_maker + ): + """ + Test if _process_task_instances puts the right task instances into the + mock_list. + """ + with dag_maker(dag_id="test_scheduler_process_execute_task_with_max_active_tis_per_dagrun"): + BashOperator(task_id="dummy", max_active_tis_per_dagrun=2, bash_command="echo Hi") + - self.scheduler_job = BaseJob(job_runner=SchedulerJobRunner(subdir=os.devnull)) - self.scheduler_job.processor_agent = mock.MagicMock() ++ scheduler_job = Job() ++ self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) ++ ++ self.job_runner.processor_agent = mock.MagicMock() + + dr = dag_maker.create_dagrun( + run_type=DagRunType.SCHEDULED, + ) + assert dr is not None + + with create_session() as session: + ti = dr.get_task_instances(session=session)[0] ti.state = state ti.start_date = start_date ti.end_date = end_date