This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch feat/max_active_tis_per_dagrun
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 04cb0da4b57255376c7c66276d7ab4eb8094034f
Merge: 4081117d06 058b6ebdd8
Author: Hussein Awala <houssein.awala...@gmail.com>
AuthorDate: Fri Apr 14 03:18:42 2023 +0200

    Merge branch 'main' into feat/max_active_tis_per_dagrun

 .asf.yaml                                          |    2 -
 .github/workflows/ci.yml                           |    7 +-
 .pre-commit-config.yaml                            |   10 +-
 BREEZE.rst                                         |   14 +-
 CONTRIBUTORS_QUICK_START.rst                       |   10 +-
 Dockerfile.ci                                      |   10 +-
 STATIC_CODE_CHECKS.rst                             |   34 +-
 airflow/api_connexion/endpoints/health_endpoint.py |    4 +-
 airflow/api_connexion/schemas/job_schema.py        |    4 +-
 airflow/api_internal/endpoints/rpc_api_endpoint.py |    4 +-
 airflow/cli/commands/dag_command.py                |   10 +-
 airflow/cli/commands/dag_processor_command.py      |   32 +-
 airflow/cli/commands/jobs_command.py               |   17 +-
 airflow/cli/commands/scheduler_command.py          |   22 +-
 airflow/cli/commands/standalone_command.py         |    8 +-
 airflow/cli/commands/task_command.py               |   13 +-
 airflow/cli/commands/triggerer_command.py          |   11 +-
 airflow/dag_processing/manager.py                  |   24 +-
 airflow/dag_processing/processor.py                |    2 +-
 airflow/executors/celery_executor.py               |    2 +-
 airflow/executors/kubernetes_executor.py           |   21 +-
 airflow/jobs/JOB_LIFECYCLE.md                      |  158 +++
 .../{backfill_job.py => backfill_job_runner.py}    |   22 +-
 airflow/jobs/{job_runner.py => base_job_runner.py} |    9 +-
 ...rocessor_job.py => dag_processor_job_runner.py} |   24 +-
 airflow/jobs/{base_job.py => job.py}               |  198 +--
 ...{local_task_job.py => local_task_job_runner.py} |   26 +-
 .../{scheduler_job.py => scheduler_job_runner.py}  |   45 +-
 .../{triggerer_job.py => triggerer_job_runner.py}  |   24 +-
 .../0093_2_2_0_taskinstance_keyed_to_dagrun.py     |    2 +-
 ...3_0_migrate_rtif_to_use_run_id_and_map_index.py |    2 +-
 .../0105_2_3_0_add_map_index_to_taskfail.py        |    2 +-
 airflow/models/__init__.py                         |    2 +-
 airflow/models/dag.py                              |   52 +-
 airflow/models/dagbag.py                           |   21 +-
 airflow/models/dagrun.py                           |    2 +-
 airflow/models/serialized_dag.py                   |   18 +
 airflow/models/taskinstance.py                     |    8 +-
 airflow/models/trigger.py                          |   14 +-
 airflow/operators/trigger_dagrun.py                |   11 +-
 airflow/providers/amazon/aws/hooks/batch_client.py |   87 +-
 airflow/providers/amazon/aws/operators/batch.py    |  102 +-
 airflow/providers/amazon/aws/operators/ecs.py      |   19 +-
 airflow/providers/databricks/provider.yaml         |    4 +
 .../databricks/sensors/__init__.py}                |   26 -
 .../providers/databricks/sensors/databricks_sql.py |  134 ++
 airflow/providers/google/CHANGELOG.rst             |    1 +
 airflow/providers/microsoft/azure/CHANGELOG.rst    |    2 +
 airflow/providers/mysql/hooks/mysql.py             |   18 +-
 airflow/providers/mysql/provider.yaml              |    6 +-
 airflow/providers/sqlite/hooks/sqlite.py           |   19 +-
 airflow/serialization/enums.py                     |    2 +-
 .../serialization/pydantic/{base_job.py => job.py} |    6 +-
 airflow/serialization/serialized_objects.py        |   10 +-
 airflow/task/task_runner/__init__.py               |    4 +-
 airflow/task/task_runner/base_task_runner.py       |   23 +-
 airflow/task/task_runner/cgroup_task_runner.py     |    6 +-
 airflow/task/task_runner/standard_task_runner.py   |    8 +-
 airflow/triggers/external_task.py                  |    2 +-
 airflow/utils/db.py                                |    4 +-
 airflow/utils/scheduler_health.py                  |    8 +-
 airflow/utils/task_group.py                        |    9 +-
 airflow/www/extensions/init_views.py               |   17 +
 airflow/www/package.json                           |    3 +-
 airflow/www/utils.py                               |   21 +-
 airflow/www/views.py                               |   10 +-
 airflow/www/yarn.lock                              |  372 ++++--
 chart/Chart.yaml                                   |  191 +--
 chart/RELEASE_NOTES.rst                            |   70 ++
 chart/newsfragments/30054.significant.rst          |    4 -
 chart/newsfragments/30411.significant.rst          |    3 -
 chart/templates/_helpers.yaml                      |   20 +-
 .../templates/pgbouncer/pgbouncer-deployment.yaml  |    3 +
 chart/values.schema.json                           |   25 +
 chart/values.yaml                                  |    7 +
 dev/README_RELEASE_HELM_CHART.md                   |    2 +
 dev/breeze/src/airflow_breeze/pre_commit_ids.py    |   10 +-
 dev/perf/scheduler_dag_execution_timing.py         |   24 +-
 dev/perf/sql_queries.py                            |    7 +-
 docker_tests/test_prod_image.py                    |    4 +-
 .../operators/sql.rst                              |   42 +-
 docs/apache-airflow-providers-google/commits.rst   |    2 +
 .../operators/cloud/dataform.rst                   |    4 +-
 .../operators/cloud/dataproc.rst                   |    2 +-
 .../commits.rst                                    |   10 +-
 .../connections/sqlite.rst                         |   53 +-
 .../logging-monitoring/check-health.rst            |    2 +-
 .../logging-monitoring/metrics.rst                 |    1 -
 .../dagfile-processing.rst                         |    1 +
 .../authoring-and-scheduling/datasets.rst          |    4 +-
 docs/apache-airflow/howto/operator/python.rst      |   10 +-
 docs/apache-airflow/howto/set-up-database.rst      |    8 +-
 docs/apache-airflow/img/airflow_erd.sha256         |    2 +-
 docs/apache-airflow/project.rst                    |    1 +
 generated/provider_dependencies.json               |    1 -
 images/breeze/output-commands-hash.txt             |    2 +-
 images/breeze/output_static-checks.svg             |  108 +-
 images/quick_start/mysql_connection.png            |  Bin 99933 -> 0 bytes
 images/quick_start/postgresql_connection.png       |  Bin 0 -> 240443 bytes
 newsfragments/30374.significant.rst                |    5 +
 .../pre_commit_check_pre_commit_hooks.py           |    2 +-
 scripts/docker/entrypoint_ci.sh                    |   10 +-
 scripts/in_container/verify_providers.py           |    4 +-
 .../endpoints/test_health_endpoint.py              |   30 +-
 .../endpoints/test_task_instance_endpoint.py       |   15 +-
 tests/charts/test_airflow_common.py                |   43 +-
 tests/charts/test_annotations.py                   |   14 +
 tests/cli/commands/test_dag_processor_command.py   |    4 +-
 tests/cli/commands/test_jobs_command.py            |   73 +-
 tests/cli/commands/test_task_command.py            |    9 +-
 tests/cli/commands/test_triggerer_command.py       |    2 +-
 tests/core/test_impersonation_tests.py             |    8 +-
 .../{test_manager.py => test_job_runner.py}        |  668 +++++-----
 tests/executors/test_dask_executor.py              |   21 +-
 tests/jobs/test_backfill_job.py                    |  772 ++++++------
 tests/jobs/test_base_job.py                        |   75 +-
 tests/jobs/test_local_task_job.py                  |  260 ++--
 tests/jobs/test_scheduler_job.py                   | 1296 +++++++++++---------
 tests/jobs/test_triggerer_job.py                   |  141 ++-
 tests/jobs/test_triggerer_job_logging.py           |   46 +-
 tests/listeners/test_listeners.py                  |    7 +-
 tests/models/test_dagbag.py                        |   48 +
 tests/models/test_trigger.py                       |   17 +-
 tests/operators/test_trigger_dagrun.py             |   40 +-
 .../amazon/aws/hooks/test_batch_client.py          |   74 +-
 tests/providers/amazon/aws/operators/test_batch.py |   86 +-
 tests/providers/amazon/aws/operators/test_ecs.py   |    4 +-
 tests/providers/amazon/aws/operators/test_sqs.py   |    4 +-
 .../providers/databricks/sensors/__init__.py       |   26 -
 .../databricks/sensors/test_databricks_sql.py      |   97 ++
 tests/providers/mysql/hooks/test_mysql.py          |   57 -
 .../mysql/hooks/test_mysql_connector_python.py     |   86 ++
 tests/providers/sqlite/hooks/test_sqlite.py        |   26 +-
 tests/serialization/test_pydantic_models.py        |   13 +-
 .../databricks/example_databricks_sensors.py       |   86 ++
 tests/task/task_runner/test_base_task_runner.py    |   10 +-
 tests/task/task_runner/test_cgroup_task_runner.py  |   11 +-
 .../task/task_runner/test_standard_task_runner.py  |  151 ++-
 tests/task/task_runner/test_task_runner.py         |   18 +-
 tests/test_utils/db.py                             |    6 +-
 tests/utils/test_helpers.py                        |   10 +-
 tests/utils/test_log_handlers.py                   |    8 +-
 tests/www/views/test_views_base.py                 |   40 +-
 143 files changed, 4231 insertions(+), 2639 deletions(-)
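
For context on the feature branch: max_active_tis_per_dagrun caps how many
instances of a task may run concurrently within a single DAG run, whereas the
existing max_active_tis_per_dag caps them across all runs. A minimal,
illustrative DAG (the dag_id and dates here are invented for this sketch; the
operator arguments mirror the test diffs below):

    import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_max_active_tis_per_dagrun",  # hypothetical dag_id
        start_date=datetime.datetime(2023, 1, 1),
        schedule="@daily",
    ) as dag:
        # At most 2 instances of this task run at once *within one DAG run*;
        # a second DAG run may still run 2 instances of its own.
        BashOperator(
            task_id="dummy",
            max_active_tis_per_dagrun=2,
            bash_command="echo Hi",
        )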

diff --cc airflow/jobs/scheduler_job_runner.py
index e725695254,706d980253..a39b879ede
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@@ -326,9 -304,12 +329,8 @@@ class SchedulerJobRunner(BaseJobRunner
          starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
  
          # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
 -        dag_active_tasks_map: DefaultDict[str, int]
 -        task_concurrency_map: DefaultDict[tuple[str, str], int]
 -        dag_active_tasks_map, task_concurrency_map = self.__get_concurrency_maps(
 -            states=list(EXECUTION_STATES), session=session
 -        )
 +        concurrency_map = self.__get_concurrency_maps(states=list(EXECUTION_STATES), session=session)
  
-         num_tasks_in_executor = 0
          # Number of tasks that cannot be scheduled because of no open slot in pool
          num_starving_tasks_total = 0
  
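
The removed lines above kept two independent counters; the new code gets a
single concurrency_map back from __get_concurrency_maps. A hypothetical sketch
of such a consolidated structure (field names are inferred from the removed
lines plus the new per-DAG-run axis, not confirmed against the actual Airflow
internals):

    from __future__ import annotations

    from collections import defaultdict
    from dataclasses import dataclass, field


    @dataclass
    class ConcurrencyMap:  # hypothetical name
        # running TIs per dag_id (replaces dag_active_tasks_map)
        dag_active_tasks_map: defaultdict[str, int] = field(
            default_factory=lambda: defaultdict(int)
        )
        # running TIs per (dag_id, task_id) (replaces task_concurrency_map)
        task_concurrency_map: defaultdict[tuple[str, str], int] = field(
            default_factory=lambda: defaultdict(int)
        )
        # running TIs per (dag_id, run_id, task_id): the new per-DAG-run axis
        task_dagrun_concurrency_map: defaultdict[tuple[str, str, str], int] = field(
            default_factory=lambda: defaultdict(int)
        )

One query over TIs in EXECUTION_STATES can then populate all three maps in a
single pass instead of maintaining parallel dictionaries.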
diff --cc tests/jobs/test_backfill_job.py
index c5f415c9b7,7001f6f1b9..164d5af1df
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@@ -362,89 -358,6 +359,88 @@@ class TestBackfillJob
          assert task_concurrency_limit_reached_at_least_once
  
          times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
 +            mock_log.debug,
 +            DagConcurrencyLimitReached,
 +        )
 +
 +        times_pool_limit_reached_in_debug = self._times_called_with(
 +            mock_log.debug,
 +            NoAvailablePoolSlot,
 +        )
 +
 +        times_task_concurrency_limit_reached_in_debug = self._times_called_with(
 +            mock_log.debug,
 +            TaskConcurrencyLimitReached,
 +        )
 +
 +        assert 0 == times_pool_limit_reached_in_debug
 +        assert 0 == times_dag_concurrency_limit_reached_in_debug
 +        assert times_task_concurrency_limit_reached_in_debug > 0
 +
 +    @pytest.mark.parametrize("with_max_active_tis_per_dag", [False, True])
-     @patch("airflow.jobs.backfill_job.BackfillJobRunner.log")
++    @patch("airflow.jobs.backfill_job_runner.BackfillJobRunner.log")
 +    def test_backfill_respect_max_active_tis_per_dagrun_limit(
 +        self, mock_log, dag_maker, with_max_active_tis_per_dag
 +    ):
 +        max_active_tis_per_dag = 3
 +        max_active_tis_per_dagrun = 2
 +        kwargs = {"max_active_tis_per_dagrun": max_active_tis_per_dagrun}
 +        if with_max_active_tis_per_dag:
 +            kwargs["max_active_tis_per_dag"] = max_active_tis_per_dag
 +
 +        with dag_maker(dag_id="test_backfill_respect_max_active_tis_per_dag_limit", schedule="@daily") as dag:
 +            EmptyOperator.partial(task_id="task1", **kwargs).expand_kwargs([{"x": i} for i in range(10)])
 +
 +        dag_maker.create_dagrun(state=None)
 +
 +        executor = MockExecutor()
 +
-         job = BaseJob(
-             job_runner=BackfillJobRunner(
-                 dag=dag,
-                 start_date=DEFAULT_DATE,
-                 end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-             ),
-             executor=executor,
++        job = Job(executor=executor)
++        job_runner = BackfillJobRunner(
++            job=job,
++            dag=dag,
++            start_date=DEFAULT_DATE,
++            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
 +        )
 +
-         job.run()
++        run_job(job=job, execute_callable=job_runner._execute)
 +
 +        assert len(executor.history) > 0
 +
 +        task_concurrency_limit_reached_at_least_once = False
 +
 +        def get_running_tis_per_dagrun(running_tis):
 +            running_tis_per_dagrun_dict = defaultdict(int)
 +            for running_ti in running_tis:
 +                running_tis_per_dagrun_dict[running_ti[3].dag_run.id] += 1
 +            return running_tis_per_dagrun_dict
 +
 +        num_running_task_instances = 0
 +        for running_task_instances in executor.history:
 +            if with_max_active_tis_per_dag:
 +                assert len(running_task_instances) <= max_active_tis_per_dag
 +            running_tis_per_dagrun_dict = get_running_tis_per_dagrun(running_task_instances)
 +            assert all(
 +                [
 +                    num_running_tis <= max_active_tis_per_dagrun
 +                    for num_running_tis in running_tis_per_dagrun_dict.values()
 +                ]
 +            )
 +            num_running_task_instances += len(running_task_instances)
 +            task_concurrency_limit_reached_at_least_once = (
 +                task_concurrency_limit_reached_at_least_once
 +                or any(
 +                    [
 +                        num_running_tis == max_active_tis_per_dagrun
 +                        for num_running_tis in running_tis_per_dagrun_dict.values()
 +                    ]
 +                )
 +            )
 +
 +        assert 80 == num_running_task_instances  # (7 backfill runs + 1 manual run) * 10 mapped tasks per run
 +        assert task_concurrency_limit_reached_at_least_once
 +
 +        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
              mock_log.debug,
              DagConcurrencyLimitReached,
          )
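
Beyond the new test itself, the hunk above shows the Job/JobRunner split merged
in from main: BaseJob(job_runner=...) plus job.run() becomes a plain Job wired
to a runner through run_job. Condensed from the diff (dag, executor and
DEFAULT_DATE are the test fixtures; the import paths follow the renames in the
diffstat and are an assumption):

    import datetime

    from airflow.jobs.backfill_job_runner import BackfillJobRunner
    from airflow.jobs.job import Job, run_job

    job = Job(executor=executor)  # Job is now a thin ORM/heartbeat record
    job_runner = BackfillJobRunner(  # the runner owns the backfill logic
        job=job,
        dag=dag,
        start_date=DEFAULT_DATE,
        end_date=DEFAULT_DATE + datetime.timedelta(days=7),
    )
    run_job(job=job, execute_callable=job_runner._execute)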
diff --cc tests/jobs/test_scheduler_job.py
index c1f9f37d6e,4870a2aa13..fafc5ee2b4
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@@ -1238,66 -1264,6 +1264,68 @@@ class TestSchedulerJob
  
          session.rollback()
  
 +    def test_find_executable_task_instances_task_concurrency_per_dagrun_for_first(self, dag_maker):
-         self.scheduler_job = BaseJob(job_runner=SchedulerJobRunner(subdir=os.devnull))
++        scheduler_job = Job()
++        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
 +        session = settings.Session()
 +
 +        dag_id = "SchedulerJobTest.test_find_executable_task_instances_task_concurrency_per_dagrun_for_first"
 +
 +        with dag_maker(dag_id=dag_id):
 +            op1a = EmptyOperator(task_id="dummy1-a", priority_weight=2, max_active_tis_per_dagrun=1)
 +            op1b = EmptyOperator(task_id="dummy1-b", priority_weight=1)
 +        dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
 +        dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
 +
 +        ti1a = dr1.get_task_instance(op1a.task_id, session)
 +        ti1b = dr1.get_task_instance(op1b.task_id, session)
 +        ti2a = dr2.get_task_instance(op1a.task_id, session)
 +        ti1a.state = State.RUNNING
 +        ti1b.state = State.SCHEDULED
 +        ti2a.state = State.SCHEDULED
 +        session.flush()
 +
 +        # Schedule ti with higher priority,
 +        # because it's running in a different DAG run with 0 active tis
-         res = self.scheduler_job.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
++        res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
 +        assert 1 == len(res)
 +        assert res[0].key == ti2a.key
 +
 +        session.rollback()
 +
 +    def test_find_executable_task_instances_not_enough_task_concurrency_per_dagrun_for_first(self, dag_maker):
-         self.scheduler_job = BaseJob(job_runner=SchedulerJobRunner(subdir=os.devnull))
++        scheduler_job = Job()
++        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
 +        session = settings.Session()
 +
 +        dag_id = (
 +            "SchedulerJobTest"
 +            ".test_find_executable_task_instances_not_enough_task_concurrency_per_dagrun_for_first"
 +        )
 +
 +        with dag_maker(dag_id=dag_id):
 +            op1a = EmptyOperator.partial(
 +                task_id="dummy1-a", priority_weight=2, max_active_tis_per_dagrun=1
 +            ).expand_kwargs([{"inputs": 1}, {"inputs": 2}])
 +            op1b = EmptyOperator(task_id="dummy1-b", priority_weight=1)
 +        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
 +
 +        ti1a0 = dr.get_task_instance(op1a.task_id, session, map_index=0)
 +        ti1a1 = dr.get_task_instance(op1a.task_id, session, map_index=1)
 +        ti1b = dr.get_task_instance(op1b.task_id, session)
 +        ti1a0.state = State.RUNNING
 +        ti1a1.state = State.SCHEDULED
 +        ti1b.state = State.SCHEDULED
 +        session.flush()
 +
 +        # Schedule ti with lower priority,
 +        # because the one with higher priority is capped by its per-DAG-run limit
-         res = self.scheduler_job.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
++        res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
 +        assert 1 == len(res)
 +        assert res[0].key == ti1b.key
 +
 +        session.rollback()
 +
      def test_find_executable_task_instances_negative_open_pool_slots(self, dag_maker):
          """
          Pools with negative open slots should not block other pools.
@@@ -4090,52 -4117,6 +4181,54 @@@
  
          with create_session() as session:
              ti = dr.get_task_instances(session=session)[0]
 +            ti.state = state
 +            ti.start_date = start_date
 +            ti.end_date = end_date
 +
-             self.scheduler_job.job_runner._schedule_dag_run(dr, session)
++            self.job_runner._schedule_dag_run(dr, session)
 +            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
 +
 +            session.refresh(ti)
 +            assert ti.state == State.SCHEDULED
 +
 +    @pytest.mark.parametrize(
 +        "state,start_date,end_date",
 +        [
 +            [State.NONE, None, None],
 +            [
 +                State.UP_FOR_RETRY,
 +                timezone.utcnow() - datetime.timedelta(minutes=30),
 +                timezone.utcnow() - datetime.timedelta(minutes=15),
 +            ],
 +            [
 +                State.UP_FOR_RESCHEDULE,
 +                timezone.utcnow() - datetime.timedelta(minutes=30),
 +                timezone.utcnow() - datetime.timedelta(minutes=15),
 +            ],
 +        ],
 +    )
 +    def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dagrun(
 +        self, state, start_date, end_date, dag_maker
 +    ):
 +        """
 +        Test if _process_task_instances puts the right task instances into the
 +        mock_list.
 +        """
 +        with dag_maker(dag_id="test_scheduler_process_execute_task_with_max_active_tis_per_dagrun"):
 +            BashOperator(task_id="dummy", max_active_tis_per_dagrun=2, bash_command="echo Hi")
 +
-         self.scheduler_job = BaseJob(job_runner=SchedulerJobRunner(subdir=os.devnull))
-         self.scheduler_job.processor_agent = mock.MagicMock()
++        scheduler_job = Job()
++        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
++
++        self.job_runner.processor_agent = mock.MagicMock()
 +
 +        dr = dag_maker.create_dagrun(
 +            run_type=DagRunType.SCHEDULED,
 +        )
 +        assert dr is not None
 +
 +        with create_session() as session:
 +            ti = dr.get_task_instances(session=session)[0]
              ti.state = state
              ti.start_date = start_date
              ti.end_date = end_date
