mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384440299
########## File path: scripts/perf/scheduler_dag_execution_timing.py ########## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: + def __init__(self, stop_when_these_completed): + super().__init__() + self.reset(stop_when_these_completed) + + def reset(self, stop_when_these_completed): + self.stop_when_these_completed = { + # Store the date as a timestamp, as sometimes this is a Pendulum + # object, others it is a datetime object. 
+ (run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed + } + + def change_state(self, key, state): + from airflow.utils.state import State + super().change_state(key, state) + + dag_id, task_id, execution_date, __ = key + run_key = (dag_id, execution_date.timestamp()) + run = self.stop_when_these_completed.get(run_key, None) + if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): + self.stop_when_these_completed.pop(run_key) + + if not self.stop_when_these_completed: + self.log.warning("STOPPING SCHEDULER -- all runs complete") + self.scheduler_job.processor_agent._done = True + else: + self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + elif state == State.SUCCESS: + self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): + try: + # Run against master and 1.10.x releases + from tests.test_utils.mock_executor import MockExecutor + except ImportError: + from tests.executors.test_executor import TestExecutor as MockExecutor + + # from airflow.executors.local_executor import LocalExecutor + + # Change this to try other executors + Executor = MockExecutor + + class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): + pass + + return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): + import airflow.models + from airflow.utils import timezone + from airflow.utils.state import State + + DR = airflow.models.DagRun + DM = airflow.models.DagModel + TI = airflow.models.TaskInstance + TF = airflow.models.TaskFail + dag_id = dag.dag_id + + session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) + session.query(DR).filter(DR.dag_id == dag_id).delete() + session.query(TI).filter(TI.dag_id == dag_id).delete() + session.query(TF).filter(TF.dag_id == dag_id).delete() + + next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + + for _ in range(num_runs): + next_run = 
dag.create_dagrun( + run_id=DR.ID_PREFIX + next_run_date.isoformat(), + execution_date=next_run_date, + start_date=timezone.utcnow(), + state=State.RUNNING, + external_trigger=False, + session=session, + ) + next_run_date = dag.following_schedule(next_run_date) + return next_run + + +def pause_all_dags(session): + from airflow.models.dag import DagModel + session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): + """ + This script will run the SchedulerJob for the specified dags "to completion". + + That is it creates a fixed number of DAG runs for the specified DAGs (from + the configured dag path/example dags etc), disable the scheduler from + creating more, and then monitor them for completion. When the file task of + the final dag run is completed the scheduler will be terminated. + + The aim of this script is to have a benchmark for real-world scheduler + performance -- i.e. total time take to run N dag runs to completion. + + Care should be taken that other limits (DAG concurrency, pool size etc) are + not the bottleneck. This script doesn't help you in that regard. + + It is recommended to repeat the test at least 3 times (`--repeat=3`) so + that you can get somewhat-accurate variance on the reported timing numbers, + but this can be disabled for longer runs if needed. + """ + # Disallow the scheduler to create new dag_runs - we've already created all + # the ones we want it to process. Easier than editing end date + os.environ['AIRFLOW__SCHEDULER__USE_JOB_SCHEDULER'] = 'False' Review comment: I think these tests will never be reliable unless we test them in isolation. We have two loops: SchedulerJob and DagFileProcessorManager.
We should write performance tests for the two loops separately. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services