ashb commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time URL: https://github.com/apache/airflow/pull/7537#discussion_r384446219
########## File path: scripts/perf/scheduler_dag_execution_timing.py ########## @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import gc +import os +import statistics +import time + +import click + + +class ShortCircutExecutorMixin: + def __init__(self, stop_when_these_completed): + super().__init__() + self.reset(stop_when_these_completed) + + def reset(self, stop_when_these_completed): + self.stop_when_these_completed = { + # Store the date as a timestamp, as sometimes this is a Pendulum + # object, others it is a datetime object. 
+ (run.dag_id, run.execution_date.timestamp()): run for run in stop_when_these_completed + } + + def change_state(self, key, state): + from airflow.utils.state import State + super().change_state(key, state) + + dag_id, task_id, execution_date, __ = key + run_key = (dag_id, execution_date.timestamp()) + run = self.stop_when_these_completed.get(run_key, None) + if run and all(t.state == State.SUCCESS for t in run.get_task_instances()): + self.stop_when_these_completed.pop(run_key) + + if not self.stop_when_these_completed: + self.log.warning("STOPPING SCHEDULER -- all runs complete") + self.scheduler_job.processor_agent._done = True + else: + self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + elif state == State.SUCCESS: + self.log.warning("WAITING ON %d RUNS", len(self.stop_when_these_completed)) + + +def get_executor_under_test(): + try: + # Run against master and 1.10.x releases + from tests.test_utils.mock_executor import MockExecutor + except ImportError: + from tests.executors.test_executor import TestExecutor as MockExecutor + + # from airflow.executors.local_executor import LocalExecutor + + # Change this to try other executors + Executor = MockExecutor + + class ShortCircutExecutor(ShortCircutExecutorMixin, Executor): + pass + + return ShortCircutExecutor + + +def reset_dag(dag, num_runs, session): + import airflow.models + from airflow.utils import timezone + from airflow.utils.state import State + + DR = airflow.models.DagRun + DM = airflow.models.DagModel + TI = airflow.models.TaskInstance + TF = airflow.models.TaskFail + dag_id = dag.dag_id + + session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False}) + session.query(DR).filter(DR.dag_id == dag_id).delete() + session.query(TI).filter(TI.dag_id == dag_id).delete() + session.query(TF).filter(TF.dag_id == dag_id).delete() + + next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date for t in dag.tasks)) + + for _ in range(num_runs): + next_run = 
dag.create_dagrun( + run_id=DR.ID_PREFIX + next_run_date.isoformat(), + execution_date=next_run_date, + start_date=timezone.utcnow(), + state=State.RUNNING, + external_trigger=False, + session=session, + ) + next_run_date = dag.following_schedule(next_run_date) + return next_run + + +def pause_all_dags(session): + from airflow.models.dag import DagModel + session.query(DagModel).update({'is_paused': True}) + + +@click.command() +@click.option('--num-runs', default=1, help='number of DagRun, to run for each DAG') +@click.option('--repeat', default=3, help='number of times to run test, to reduce variance') +@click.argument('dag_ids', required=True, nargs=-1) +def main(num_runs, repeat, dag_ids): + """ + This script will run the SchedulerJob for the specified dags "to completion". + + That is it creates a fixed number of DAG runs for the specified DAGs (from + the configured dag path/example dags etc), disable the scheduler from + creating more, and then monitor them for completion. When the file task of + the final dag run is completed the scheduler will be terminated. + + The aim of this script is to have a benchmark for real-world scheduler + performance -- i.e. total time take to run N dag runs to completion. + + Care should be taken that other limits (DAG concurrency, pool size etc) are + not the bottleneck. This script doesn't help you in that regard. + + It is recommended to repeat the test at least 3 times (`--repeat=3`) so + that you can get somewhat-accurate variance on the reported timing numbers, + but this can be disabled for longer runs if needed. + """ + # Disallow the scheduler to create new dag_runs - we've already created all + # the ones we want it to process. Easier than editing end date + os.environ['AIRFLOW__SCHEDULER__USE_JOB_SCHEDULER'] = 'False' Review comment: > I think these tests will never be reliable unless we test it in isolation. 
Yes, there is value in testing the parts in isolation, but the purpose of this test is to have a way of measuring "Total Scheduler Overhead" -- how quickly the scheduler and all its parts can get out of the way and run the actual tasks. Without measuring the performance of the whole scheduler, we could make one part 1000x faster, but if that part is not where the scheduler is waiting, it won't make tasks start any quicker. The reason not to edit the dag file is so that this can be run against _any_ dag, not just a specially prepared one. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services