[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] Add script to benchmark scheduler dag-run time
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384442867 ## File path: scripts/perf/scheduler_dag_execution_timing.py ## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: +def __init__(self, stop_when_these_completed): +super().__init__() +self.reset(stop_when_these_completed) + +def reset(self, stop_when_these_completed): +self.stop_when_these_completed = { +# Store the date as a timestamp, as sometimes this is a Pendulum +# object, others it is a datetime object. 
+(run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed +} + +def change_state(self, key, state): +from airflow.utils.state import State +super().change_state(key, state) + +dag_id, task_id, execution_date, __ = key +run_key = (dag_id, execution_date.timestamp()) +run = self.stop_when_these_completed.get(run_key, None) +if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): +self.stop_when_these_completed.pop(run_key) + +if not self.stop_when_these_completed: +self.log.warning("STOPPING SCHEDULER -- all runs complete") +self.scheduler_job.processor_agent._done = True +else: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) +elif state == State.SUCCESS: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): +try: +# Run against master and 1.10.x releases +from tests.test_utils.mock_executor import MockExecutor +except ImportError: +from tests.executors.test_executor import TestExecutor as MockExecutor + +# from airflow.executors.local_executor import LocalExecutor + +# Change this to try other executors +Executor = MockExecutor + +class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): +pass + +return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): +import airflow.models +from airflow.utils import timezone +from airflow.utils.state import State + +DR = airflow.models.DagRun +DM = airflow.models.DagModel +TI = airflow.models.TaskInstance +TF = airflow.models.TaskFail +dag_id = dag.dag_id + +session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) +session.query(DR).filter(DR.dag_id == dag_id).delete() +session.query(TI).filter(TI.dag_id == dag_id).delete() +session.query(TF).filter(TF.dag_id == dag_id).delete() + +next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + +for _ in range(num_runs): +next_run = dag.create_dagrun( +run_id=DR.ID_PREFIX + 
next_run_date.isoformat(), +execution_date=next_run_date, +start_date=timezone.utcnow(), +state=State.RUNNING, +external_trigger=False, +session=session, +) +next_run_date = dag.following_schedule(next_run_date) +return next_run + + +def pause_all_dags(session): +from airflow.models.dag import DagModel +session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): +""" +This script will run the SchedulerJob for the specified dags "to completion". + +That is it creates a fixed number of DAG runs for the specified DAGs (from +the configured dag path/example dags etc), disable the scheduler from +creating more, and then monitor them for completion. When the file task of +the final dag run is comple
[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] Add script to benchmark scheduler dag-run time
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384442175 ## File path: scripts/perf/scheduler_dag_execution_timing.py ## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: +def __init__(self, stop_when_these_completed): +super().__init__() +self.reset(stop_when_these_completed) + +def reset(self, stop_when_these_completed): +self.stop_when_these_completed = { +# Store the date as a timestamp, as sometimes this is a Pendulum +# object, others it is a datetime object. 
+(run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed +} + +def change_state(self, key, state): +from airflow.utils.state import State +super().change_state(key, state) + +dag_id, task_id, execution_date, __ = key +run_key = (dag_id, execution_date.timestamp()) +run = self.stop_when_these_completed.get(run_key, None) +if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): +self.stop_when_these_completed.pop(run_key) + +if not self.stop_when_these_completed: +self.log.warning("STOPPING SCHEDULER -- all runs complete") +self.scheduler_job.processor_agent._done = True +else: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) +elif state == State.SUCCESS: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): +try: +# Run against master and 1.10.x releases +from tests.test_utils.mock_executor import MockExecutor +except ImportError: +from tests.executors.test_executor import TestExecutor as MockExecutor + +# from airflow.executors.local_executor import LocalExecutor + +# Change this to try other executors +Executor = MockExecutor + +class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): +pass + +return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): +import airflow.models +from airflow.utils import timezone +from airflow.utils.state import State + +DR = airflow.models.DagRun +DM = airflow.models.DagModel +TI = airflow.models.TaskInstance +TF = airflow.models.TaskFail +dag_id = dag.dag_id + +session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) +session.query(DR).filter(DR.dag_id == dag_id).delete() +session.query(TI).filter(TI.dag_id == dag_id).delete() +session.query(TF).filter(TF.dag_id == dag_id).delete() + +next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + +for _ in range(num_runs): +next_run = dag.create_dagrun( +run_id=DR.ID_PREFIX + 
next_run_date.isoformat(), +execution_date=next_run_date, +start_date=timezone.utcnow(), +state=State.RUNNING, +external_trigger=False, +session=session, +) +next_run_date = dag.following_schedule(next_run_date) +return next_run + + +def pause_all_dags(session): +from airflow.models.dag import DagModel +session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): +""" +This script will run the SchedulerJob for the specified dags "to completion". + +That is it creates a fixed number of DAG runs for the specified DAGs (from +the configured dag path/example dags etc), disable the scheduler from +creating more, and then monitor them for completion. When the file task of +the final dag run is comple
[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] Add script to benchmark scheduler dag-run time
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384440788 ## File path: scripts/perf/scheduler_dag_execution_timing.py ## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: +def __init__(self, stop_when_these_completed): +super().__init__() +self.reset(stop_when_these_completed) + +def reset(self, stop_when_these_completed): +self.stop_when_these_completed = { +# Store the date as a timestamp, as sometimes this is a Pendulum +# object, others it is a datetime object. 
+(run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed +} + +def change_state(self, key, state): +from airflow.utils.state import State +super().change_state(key, state) + +dag_id, task_id, execution_date, __ = key +run_key = (dag_id, execution_date.timestamp()) +run = self.stop_when_these_completed.get(run_key, None) +if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): +self.stop_when_these_completed.pop(run_key) + +if not self.stop_when_these_completed: +self.log.warning("STOPPING SCHEDULER -- all runs complete") +self.scheduler_job.processor_agent._done = True +else: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) +elif state == State.SUCCESS: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): +try: +# Run against master and 1.10.x releases +from tests.test_utils.mock_executor import MockExecutor +except ImportError: +from tests.executors.test_executor import TestExecutor as MockExecutor + +# from airflow.executors.local_executor import LocalExecutor + +# Change this to try other executors +Executor = MockExecutor + +class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): +pass + +return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): +import airflow.models +from airflow.utils import timezone +from airflow.utils.state import State + +DR = airflow.models.DagRun +DM = airflow.models.DagModel +TI = airflow.models.TaskInstance +TF = airflow.models.TaskFail +dag_id = dag.dag_id + +session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) +session.query(DR).filter(DR.dag_id == dag_id).delete() +session.query(TI).filter(TI.dag_id == dag_id).delete() +session.query(TF).filter(TF.dag_id == dag_id).delete() + +next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + +for _ in range(num_runs): +next_run = dag.create_dagrun( +run_id=DR.ID_PREFIX + 
next_run_date.isoformat(), +execution_date=next_run_date, +start_date=timezone.utcnow(), +state=State.RUNNING, +external_trigger=False, +session=session, +) +next_run_date = dag.following_schedule(next_run_date) +return next_run + + +def pause_all_dags(session): +from airflow.models.dag import DagModel +session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): +""" +This script will run the SchedulerJob for the specified dags "to completion". + +That is it creates a fixed number of DAG runs for the specified DAGs (from +the configured dag path/example dags etc), disable the scheduler from +creating more, and then monitor them for completion. When the file task of +the final dag run is comple
[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] Add script to benchmark scheduler dag-run time
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384440299 ## File path: scripts/perf/scheduler_dag_execution_timing.py ## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: +def __init__(self, stop_when_these_completed): +super().__init__() +self.reset(stop_when_these_completed) + +def reset(self, stop_when_these_completed): +self.stop_when_these_completed = { +# Store the date as a timestamp, as sometimes this is a Pendulum +# object, others it is a datetime object. 
+(run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed +} + +def change_state(self, key, state): +from airflow.utils.state import State +super().change_state(key, state) + +dag_id, task_id, execution_date, __ = key +run_key = (dag_id, execution_date.timestamp()) +run = self.stop_when_these_completed.get(run_key, None) +if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): +self.stop_when_these_completed.pop(run_key) + +if not self.stop_when_these_completed: +self.log.warning("STOPPING SCHEDULER -- all runs complete") +self.scheduler_job.processor_agent._done = True +else: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) +elif state == State.SUCCESS: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): +try: +# Run against master and 1.10.x releases +from tests.test_utils.mock_executor import MockExecutor +except ImportError: +from tests.executors.test_executor import TestExecutor as MockExecutor + +# from airflow.executors.local_executor import LocalExecutor + +# Change this to try other executors +Executor = MockExecutor + +class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): +pass + +return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): +import airflow.models +from airflow.utils import timezone +from airflow.utils.state import State + +DR = airflow.models.DagRun +DM = airflow.models.DagModel +TI = airflow.models.TaskInstance +TF = airflow.models.TaskFail +dag_id = dag.dag_id + +session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) +session.query(DR).filter(DR.dag_id == dag_id).delete() +session.query(TI).filter(TI.dag_id == dag_id).delete() +session.query(TF).filter(TF.dag_id == dag_id).delete() + +next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + +for _ in range(num_runs): +next_run = dag.create_dagrun( +run_id=DR.ID_PREFIX + 
next_run_date.isoformat(), +execution_date=next_run_date, +start_date=timezone.utcnow(), +state=State.RUNNING, +external_trigger=False, +session=session, +) +next_run_date = dag.following_schedule(next_run_date) +return next_run + + +def pause_all_dags(session): +from airflow.models.dag import DagModel +session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): +""" +This script will run the SchedulerJob for the specified dags "to completion". + +That is it creates a fixed number of DAG runs for the specified DAGs (from +the configured dag path/example dags etc), disable the scheduler from +creating more, and then monitor them for completion. When the file task of +the final dag run is comple
[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] Add script to benchmark scheduler dag-run time
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384362406 ## File path: scripts/perf/scheduler_dag_execution_timing.py ## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: +def __init__(self, stop_when_these_completed): +super().__init__() +self.reset(stop_when_these_completed) + +def reset(self, stop_when_these_completed): +self.stop_when_these_completed = { +# Store the date as a timestamp, as sometimes this is a Pendulum +# object, others it is a datetime object. 
+(run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed +} + +def change_state(self, key, state): +from airflow.utils.state import State +super().change_state(key, state) + +dag_id, task_id, execution_date, __ = key +run_key = (dag_id, execution_date.timestamp()) +run = self.stop_when_these_completed.get(run_key, None) +if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): +self.stop_when_these_completed.pop(run_key) + +if not self.stop_when_these_completed: +self.log.warning("STOPPING SCHEDULER -- all runs complete") +self.scheduler_job.processor_agent._done = True +else: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) +elif state == State.SUCCESS: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): +try: +# Run against master and 1.10.x releases +from tests.test_utils.mock_executor import MockExecutor +except ImportError: +from tests.executors.test_executor import TestExecutor as MockExecutor + +# from airflow.executors.local_executor import LocalExecutor + +# Change this to try other executors +Executor = MockExecutor + +class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): +pass + +return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): +import airflow.models +from airflow.utils import timezone +from airflow.utils.state import State + +DR = airflow.models.DagRun +DM = airflow.models.DagModel +TI = airflow.models.TaskInstance +TF = airflow.models.TaskFail +dag_id = dag.dag_id + +session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) +session.query(DR).filter(DR.dag_id == dag_id).delete() +session.query(TI).filter(TI.dag_id == dag_id).delete() +session.query(TF).filter(TF.dag_id == dag_id).delete() + +next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + +for _ in range(num_runs): +next_run = dag.create_dagrun( +run_id=DR.ID_PREFIX + 
next_run_date.isoformat(), +execution_date=next_run_date, +start_date=timezone.utcnow(), +state=State.RUNNING, +external_trigger=False, +session=session, +) +next_run_date = dag.following_schedule(next_run_date) +return next_run + + +def pause_all_dags(session): +from airflow.models.dag import DagModel +session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): +""" +This script will run the SchedulerJob for the specified dags "to completion". + +That is it creates a fixed number of DAG runs for the specified DAGs (from +the configured dag path/example dags etc), disable the scheduler from +creating more, and then monitor them for completion. When the file task of +the final dag run is comple
[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] Add script to benchmark scheduler dag-run time
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384352337 ## File path: scripts/perf/scheduler_dag_execution_timing.py ## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: +def __init__(self, stop_when_these_completed): +super().__init__() +self.reset(stop_when_these_completed) + +def reset(self, stop_when_these_completed): +self.stop_when_these_completed = { +# Store the date as a timestamp, as sometimes this is a Pendulum +# object, others it is a datetime object. 
+(run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed +} + +def change_state(self, key, state): +from airflow.utils.state import State +super().change_state(key, state) + +dag_id, task_id, execution_date, __ = key +run_key = (dag_id, execution_date.timestamp()) +run = self.stop_when_these_completed.get(run_key, None) +if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): +self.stop_when_these_completed.pop(run_key) + +if not self.stop_when_these_completed: +self.log.warning("STOPPING SCHEDULER -- all runs complete") +self.scheduler_job.processor_agent._done = True +else: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) +elif state == State.SUCCESS: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): +try: +# Run against master and 1.10.x releases +from tests.test_utils.mock_executor import MockExecutor +except ImportError: +from tests.executors.test_executor import TestExecutor as MockExecutor + +# from airflow.executors.local_executor import LocalExecutor + +# Change this to try other executors +Executor = MockExecutor + +class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): +pass + +return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): +import airflow.models +from airflow.utils import timezone +from airflow.utils.state import State + +DR = airflow.models.DagRun +DM = airflow.models.DagModel +TI = airflow.models.TaskInstance +TF = airflow.models.TaskFail +dag_id = dag.dag_id + +session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) +session.query(DR).filter(DR.dag_id == dag_id).delete() +session.query(TI).filter(TI.dag_id == dag_id).delete() +session.query(TF).filter(TF.dag_id == dag_id).delete() + +next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + +for _ in range(num_runs): +next_run = dag.create_dagrun( +run_id=DR.ID_PREFIX + 
next_run_date.isoformat(), +execution_date=next_run_date, +start_date=timezone.utcnow(), +state=State.RUNNING, +external_trigger=False, +session=session, +) +next_run_date = dag.following_schedule(next_run_date) +return next_run + + +def pause_all_dags(session): +from airflow.models.dag import DagModel +session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): +""" +This script will run the SchedulerJob for the specified dags "to completion". + +That is it creates a fixed number of DAG runs for the specified DAGs (from +the configured dag path/example dags etc), disable the scheduler from +creating more, and then monitor them for completion. When the file task of +the final dag run is comple
[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] Add script to benchmark scheduler dag-run time
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384258420 ## File path: scripts/perf/scheduler_dag_execution_timing.py ## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: +def __init__(self, stop_when_these_completed): +super().__init__() +self.reset(stop_when_these_completed) + +def reset(self, stop_when_these_completed): +self.stop_when_these_completed = { +# Store the date as a timestamp, as sometimes this is a Pendulum +# object, others it is a datetime object. 
+(run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed +} + +def change_state(self, key, state): +from airflow.utils.state import State +super().change_state(key, state) + +dag_id, task_id, execution_date, __ = key +run_key = (dag_id, execution_date.timestamp()) +run = self.stop_when_these_completed.get(run_key, None) +if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): +self.stop_when_these_completed.pop(run_key) + +if not self.stop_when_these_completed: +self.log.warning("STOPPING SCHEDULER -- all runs complete") +self.scheduler_job.processor_agent._done = True +else: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) +elif state == State.SUCCESS: +self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): +try: +# Run against master and 1.10.x releases +from tests.test_utils.mock_executor import MockExecutor +except ImportError: +from tests.executors.test_executor import TestExecutor as MockExecutor + +# from airflow.executors.local_executor import LocalExecutor + +# Change this to try other executors +Executor = MockExecutor + +class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): +pass + +return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): +import airflow.models +from airflow.utils import timezone +from airflow.utils.state import State + +DR = airflow.models.DagRun +DM = airflow.models.DagModel +TI = airflow.models.TaskInstance +TF = airflow.models.TaskFail +dag_id = dag.dag_id + +session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) +session.query(DR).filter(DR.dag_id == dag_id).delete() +session.query(TI).filter(TI.dag_id == dag_id).delete() +session.query(TF).filter(TF.dag_id == dag_id).delete() + +next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + +for _ in range(num_runs): +next_run = dag.create_dagrun( +run_id=DR.ID_PREFIX + 
next_run_date.isoformat(), +execution_date=next_run_date, +start_date=timezone.utcnow(), +state=State.RUNNING, +external_trigger=False, +session=session, +) +next_run_date = dag.following_schedule(next_run_date) +return next_run + + +def pause_all_dags(session): +from airflow.models.dag import DagModel +session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): +""" +This script will run the SchedulerJob for the specified dags "to completion". + +That is it creates a fixed number of DAG runs for the specified DAGs (from +the configured dag path/example dags etc), disable the scheduler from +creating more, and then monitor them for completion. When the file task of +the final dag run is comple