[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time

2020-02-26 Thread GitBox
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script 
to benchmark scheduler dag-run time
URL: https://github.com/apache/airflow/pull/7537#discussion_r384442867
 
 

 ##
 File path: scripts/perf/scheduler_dag_execution_timing.py
 ##
 @@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import gc
+import os
+import statistics
+import time
+
+import click
+
+
+class ShortCircutExecutorMixin:
+def __init__(self, stop_when_these_completed):
+super().__init__()
+self.reset(stop_when_these_completed)
+
+def reset(self, stop_when_these_completed):
+self.stop_when_these_completed = {
+# Store the date as a timestamp, as sometimes this is a Pendulum
+# object, others it is a datetime object.
+(run.dag_id, run.execution_date.timestamp()): run for run in 
stop_when_these_completed
+}
+
+def change_state(self, key, state):
+from airflow.utils.state import State
+super().change_state(key, state)
+
+dag_id, task_id, execution_date, __ = key
+run_key = (dag_id, execution_date.timestamp())
+run = self.stop_when_these_completed.get(run_key, None)
+if run and all(t.state == State.SUCCESS for t in 
run.get_task_instances()):
+self.stop_when_these_completed.pop(run_key)
+
+if not self.stop_when_these_completed:
+self.log.warning("STOPPING SCHEDULER -- all runs complete")
+self.scheduler_job.processor_agent._done = True
+else:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+elif state == State.SUCCESS:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+
+
+def get_executor_under_test():
+try:
+# Run against master and 1.10.x releases
+from tests.test_utils.mock_executor import MockExecutor
+except ImportError:
+from tests.executors.test_executor import TestExecutor as MockExecutor
+
+# from airflow.executors.local_executor import LocalExecutor
+
+# Change this to try other executors
+Executor = MockExecutor
+
+class ShortCircutExecutor(ShortCircutExecutorMixin, Executor):
+pass
+
+return ShortCircutExecutor
+
+
+def reset_dag(dag, num_runs, session):
+import airflow.models
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+DR = airflow.models.DagRun
+DM = airflow.models.DagModel
+TI = airflow.models.TaskInstance
+TF = airflow.models.TaskFail
+dag_id = dag.dag_id
+
+session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False})
+session.query(DR).filter(DR.dag_id == dag_id).delete()
+session.query(TI).filter(TI.dag_id == dag_id).delete()
+session.query(TF).filter(TF.dag_id == dag_id).delete()
+
+next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date 
for t in dag.tasks))
+
+for _ in range(num_runs):
+next_run = dag.create_dagrun(
+run_id=DR.ID_PREFIX + next_run_date.isoformat(),
+execution_date=next_run_date,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+)
+next_run_date = dag.following_schedule(next_run_date)
+return next_run
+
+
+def pause_all_dags(session):
+from airflow.models.dag import DagModel
+session.query(DagModel).update({'is_paused': True})
+
+
+@click.command()
+@click.option('--num-runs', default=1, help='number of DagRun, to run for each 
DAG')
+@click.option('--repeat', default=3, help='number of times to run test, to 
reduce variance')
+@click.argument('dag_ids', required=True, nargs=-1)
+def main(num_runs, repeat, dag_ids):
+"""
+This script will run the SchedulerJob for the specified dags "to completion".
+
+That is, it creates a fixed number of DAG runs for the specified DAGs (from
+the configured dag path/example dags etc.), disables the scheduler from
+creating more, and then monitors them for completion. When the file task of
+the final dag run is comple

[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time

2020-02-26 Thread GitBox
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script 
to benchmark scheduler dag-run time
URL: https://github.com/apache/airflow/pull/7537#discussion_r384442175
 
 

 ##
 File path: scripts/perf/scheduler_dag_execution_timing.py
 ##
 @@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import gc
+import os
+import statistics
+import time
+
+import click
+
+
+class ShortCircutExecutorMixin:
+def __init__(self, stop_when_these_completed):
+super().__init__()
+self.reset(stop_when_these_completed)
+
+def reset(self, stop_when_these_completed):
+self.stop_when_these_completed = {
+# Store the date as a timestamp, as sometimes this is a Pendulum
+# object, others it is a datetime object.
+(run.dag_id, run.execution_date.timestamp()): run for run in 
stop_when_these_completed
+}
+
+def change_state(self, key, state):
+from airflow.utils.state import State
+super().change_state(key, state)
+
+dag_id, task_id, execution_date, __ = key
+run_key = (dag_id, execution_date.timestamp())
+run = self.stop_when_these_completed.get(run_key, None)
+if run and all(t.state == State.SUCCESS for t in 
run.get_task_instances()):
+self.stop_when_these_completed.pop(run_key)
+
+if not self.stop_when_these_completed:
+self.log.warning("STOPPING SCHEDULER -- all runs complete")
+self.scheduler_job.processor_agent._done = True
+else:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+elif state == State.SUCCESS:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+
+
+def get_executor_under_test():
+try:
+# Run against master and 1.10.x releases
+from tests.test_utils.mock_executor import MockExecutor
+except ImportError:
+from tests.executors.test_executor import TestExecutor as MockExecutor
+
+# from airflow.executors.local_executor import LocalExecutor
+
+# Change this to try other executors
+Executor = MockExecutor
+
+class ShortCircutExecutor(ShortCircutExecutorMixin, Executor):
+pass
+
+return ShortCircutExecutor
+
+
+def reset_dag(dag, num_runs, session):
+import airflow.models
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+DR = airflow.models.DagRun
+DM = airflow.models.DagModel
+TI = airflow.models.TaskInstance
+TF = airflow.models.TaskFail
+dag_id = dag.dag_id
+
+session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False})
+session.query(DR).filter(DR.dag_id == dag_id).delete()
+session.query(TI).filter(TI.dag_id == dag_id).delete()
+session.query(TF).filter(TF.dag_id == dag_id).delete()
+
+next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date 
for t in dag.tasks))
+
+for _ in range(num_runs):
+next_run = dag.create_dagrun(
+run_id=DR.ID_PREFIX + next_run_date.isoformat(),
+execution_date=next_run_date,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+)
+next_run_date = dag.following_schedule(next_run_date)
+return next_run
+
+
+def pause_all_dags(session):
+from airflow.models.dag import DagModel
+session.query(DagModel).update({'is_paused': True})
+
+
+@click.command()
+@click.option('--num-runs', default=1, help='number of DagRun, to run for each 
DAG')
+@click.option('--repeat', default=3, help='number of times to run test, to 
reduce variance')
+@click.argument('dag_ids', required=True, nargs=-1)
+def main(num_runs, repeat, dag_ids):
+"""
+This script will run the SchedulerJob for the specified dags "to completion".
+
+That is, it creates a fixed number of DAG runs for the specified DAGs (from
+the configured dag path/example dags etc.), disables the scheduler from
+creating more, and then monitors them for completion. When the file task of
+the final dag run is comple

[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time

2020-02-26 Thread GitBox
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script 
to benchmark scheduler dag-run time
URL: https://github.com/apache/airflow/pull/7537#discussion_r384440788
 
 

 ##
 File path: scripts/perf/scheduler_dag_execution_timing.py
 ##
 @@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import gc
+import os
+import statistics
+import time
+
+import click
+
+
+class ShortCircutExecutorMixin:
+def __init__(self, stop_when_these_completed):
+super().__init__()
+self.reset(stop_when_these_completed)
+
+def reset(self, stop_when_these_completed):
+self.stop_when_these_completed = {
+# Store the date as a timestamp, as sometimes this is a Pendulum
+# object, others it is a datetime object.
+(run.dag_id, run.execution_date.timestamp()): run for run in 
stop_when_these_completed
+}
+
+def change_state(self, key, state):
+from airflow.utils.state import State
+super().change_state(key, state)
+
+dag_id, task_id, execution_date, __ = key
+run_key = (dag_id, execution_date.timestamp())
+run = self.stop_when_these_completed.get(run_key, None)
+if run and all(t.state == State.SUCCESS for t in 
run.get_task_instances()):
+self.stop_when_these_completed.pop(run_key)
+
+if not self.stop_when_these_completed:
+self.log.warning("STOPPING SCHEDULER -- all runs complete")
+self.scheduler_job.processor_agent._done = True
+else:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+elif state == State.SUCCESS:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+
+
+def get_executor_under_test():
+try:
+# Run against master and 1.10.x releases
+from tests.test_utils.mock_executor import MockExecutor
+except ImportError:
+from tests.executors.test_executor import TestExecutor as MockExecutor
+
+# from airflow.executors.local_executor import LocalExecutor
+
+# Change this to try other executors
+Executor = MockExecutor
+
+class ShortCircutExecutor(ShortCircutExecutorMixin, Executor):
+pass
+
+return ShortCircutExecutor
+
+
+def reset_dag(dag, num_runs, session):
+import airflow.models
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+DR = airflow.models.DagRun
+DM = airflow.models.DagModel
+TI = airflow.models.TaskInstance
+TF = airflow.models.TaskFail
+dag_id = dag.dag_id
+
+session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False})
+session.query(DR).filter(DR.dag_id == dag_id).delete()
+session.query(TI).filter(TI.dag_id == dag_id).delete()
+session.query(TF).filter(TF.dag_id == dag_id).delete()
+
+next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date 
for t in dag.tasks))
+
+for _ in range(num_runs):
+next_run = dag.create_dagrun(
+run_id=DR.ID_PREFIX + next_run_date.isoformat(),
+execution_date=next_run_date,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+)
+next_run_date = dag.following_schedule(next_run_date)
+return next_run
+
+
+def pause_all_dags(session):
+from airflow.models.dag import DagModel
+session.query(DagModel).update({'is_paused': True})
+
+
+@click.command()
+@click.option('--num-runs', default=1, help='number of DagRun, to run for each 
DAG')
+@click.option('--repeat', default=3, help='number of times to run test, to 
reduce variance')
+@click.argument('dag_ids', required=True, nargs=-1)
+def main(num_runs, repeat, dag_ids):
+"""
+This script will run the SchedulerJob for the specified dags "to completion".
+
+That is, it creates a fixed number of DAG runs for the specified DAGs (from
+the configured dag path/example dags etc.), disables the scheduler from
+creating more, and then monitors them for completion. When the file task of
+the final dag run is comple

[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time

2020-02-26 Thread GitBox
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script 
to benchmark scheduler dag-run time
URL: https://github.com/apache/airflow/pull/7537#discussion_r384440299
 
 

 ##
 File path: scripts/perf/scheduler_dag_execution_timing.py
 ##
 @@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import gc
+import os
+import statistics
+import time
+
+import click
+
+
+class ShortCircutExecutorMixin:
+def __init__(self, stop_when_these_completed):
+super().__init__()
+self.reset(stop_when_these_completed)
+
+def reset(self, stop_when_these_completed):
+self.stop_when_these_completed = {
+# Store the date as a timestamp, as sometimes this is a Pendulum
+# object, others it is a datetime object.
+(run.dag_id, run.execution_date.timestamp()): run for run in 
stop_when_these_completed
+}
+
+def change_state(self, key, state):
+from airflow.utils.state import State
+super().change_state(key, state)
+
+dag_id, task_id, execution_date, __ = key
+run_key = (dag_id, execution_date.timestamp())
+run = self.stop_when_these_completed.get(run_key, None)
+if run and all(t.state == State.SUCCESS for t in 
run.get_task_instances()):
+self.stop_when_these_completed.pop(run_key)
+
+if not self.stop_when_these_completed:
+self.log.warning("STOPPING SCHEDULER -- all runs complete")
+self.scheduler_job.processor_agent._done = True
+else:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+elif state == State.SUCCESS:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+
+
+def get_executor_under_test():
+try:
+# Run against master and 1.10.x releases
+from tests.test_utils.mock_executor import MockExecutor
+except ImportError:
+from tests.executors.test_executor import TestExecutor as MockExecutor
+
+# from airflow.executors.local_executor import LocalExecutor
+
+# Change this to try other executors
+Executor = MockExecutor
+
+class ShortCircutExecutor(ShortCircutExecutorMixin, Executor):
+pass
+
+return ShortCircutExecutor
+
+
+def reset_dag(dag, num_runs, session):
+import airflow.models
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+DR = airflow.models.DagRun
+DM = airflow.models.DagModel
+TI = airflow.models.TaskInstance
+TF = airflow.models.TaskFail
+dag_id = dag.dag_id
+
+session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False})
+session.query(DR).filter(DR.dag_id == dag_id).delete()
+session.query(TI).filter(TI.dag_id == dag_id).delete()
+session.query(TF).filter(TF.dag_id == dag_id).delete()
+
+next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date 
for t in dag.tasks))
+
+for _ in range(num_runs):
+next_run = dag.create_dagrun(
+run_id=DR.ID_PREFIX + next_run_date.isoformat(),
+execution_date=next_run_date,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+)
+next_run_date = dag.following_schedule(next_run_date)
+return next_run
+
+
+def pause_all_dags(session):
+from airflow.models.dag import DagModel
+session.query(DagModel).update({'is_paused': True})
+
+
+@click.command()
+@click.option('--num-runs', default=1, help='number of DagRun, to run for each 
DAG')
+@click.option('--repeat', default=3, help='number of times to run test, to 
reduce variance')
+@click.argument('dag_ids', required=True, nargs=-1)
+def main(num_runs, repeat, dag_ids):
+"""
+This script will run the SchedulerJob for the specified dags "to completion".
+
+That is, it creates a fixed number of DAG runs for the specified DAGs (from
+the configured dag path/example dags etc.), disables the scheduler from
+creating more, and then monitors them for completion. When the file task of
+the final dag run is comple

[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time

2020-02-26 Thread GitBox
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script 
to benchmark scheduler dag-run time
URL: https://github.com/apache/airflow/pull/7537#discussion_r384362406
 
 

 ##
 File path: scripts/perf/scheduler_dag_execution_timing.py
 ##
 @@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import gc
+import os
+import statistics
+import time
+
+import click
+
+
+class ShortCircutExecutorMixin:
+def __init__(self, stop_when_these_completed):
+super().__init__()
+self.reset(stop_when_these_completed)
+
+def reset(self, stop_when_these_completed):
+self.stop_when_these_completed = {
+# Store the date as a timestamp, as sometimes this is a Pendulum
+# object, others it is a datetime object.
+(run.dag_id, run.execution_date.timestamp()): run for run in 
stop_when_these_completed
+}
+
+def change_state(self, key, state):
+from airflow.utils.state import State
+super().change_state(key, state)
+
+dag_id, task_id, execution_date, __ = key
+run_key = (dag_id, execution_date.timestamp())
+run = self.stop_when_these_completed.get(run_key, None)
+if run and all(t.state == State.SUCCESS for t in 
run.get_task_instances()):
+self.stop_when_these_completed.pop(run_key)
+
+if not self.stop_when_these_completed:
+self.log.warning("STOPPING SCHEDULER -- all runs complete")
+self.scheduler_job.processor_agent._done = True
+else:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+elif state == State.SUCCESS:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+
+
+def get_executor_under_test():
+try:
+# Run against master and 1.10.x releases
+from tests.test_utils.mock_executor import MockExecutor
+except ImportError:
+from tests.executors.test_executor import TestExecutor as MockExecutor
+
+# from airflow.executors.local_executor import LocalExecutor
+
+# Change this to try other executors
+Executor = MockExecutor
+
+class ShortCircutExecutor(ShortCircutExecutorMixin, Executor):
+pass
+
+return ShortCircutExecutor
+
+
+def reset_dag(dag, num_runs, session):
+import airflow.models
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+DR = airflow.models.DagRun
+DM = airflow.models.DagModel
+TI = airflow.models.TaskInstance
+TF = airflow.models.TaskFail
+dag_id = dag.dag_id
+
+session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False})
+session.query(DR).filter(DR.dag_id == dag_id).delete()
+session.query(TI).filter(TI.dag_id == dag_id).delete()
+session.query(TF).filter(TF.dag_id == dag_id).delete()
+
+next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date 
for t in dag.tasks))
+
+for _ in range(num_runs):
+next_run = dag.create_dagrun(
+run_id=DR.ID_PREFIX + next_run_date.isoformat(),
+execution_date=next_run_date,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+)
+next_run_date = dag.following_schedule(next_run_date)
+return next_run
+
+
+def pause_all_dags(session):
+from airflow.models.dag import DagModel
+session.query(DagModel).update({'is_paused': True})
+
+
+@click.command()
+@click.option('--num-runs', default=1, help='number of DagRun, to run for each 
DAG')
+@click.option('--repeat', default=3, help='number of times to run test, to 
reduce variance')
+@click.argument('dag_ids', required=True, nargs=-1)
+def main(num_runs, repeat, dag_ids):
+"""
+This script will run the SchedulerJob for the specified dags "to completion".
+
+That is, it creates a fixed number of DAG runs for the specified DAGs (from
+the configured dag path/example dags etc.), disables the scheduler from
+creating more, and then monitors them for completion. When the file task of
+the final dag run is comple

[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time

2020-02-26 Thread GitBox
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script 
to benchmark scheduler dag-run time
URL: https://github.com/apache/airflow/pull/7537#discussion_r384352337
 
 

 ##
 File path: scripts/perf/scheduler_dag_execution_timing.py
 ##
 @@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import gc
+import os
+import statistics
+import time
+
+import click
+
+
+class ShortCircutExecutorMixin:
+def __init__(self, stop_when_these_completed):
+super().__init__()
+self.reset(stop_when_these_completed)
+
+def reset(self, stop_when_these_completed):
+self.stop_when_these_completed = {
+# Store the date as a timestamp, as sometimes this is a Pendulum
+# object, others it is a datetime object.
+(run.dag_id, run.execution_date.timestamp()): run for run in 
stop_when_these_completed
+}
+
+def change_state(self, key, state):
+from airflow.utils.state import State
+super().change_state(key, state)
+
+dag_id, task_id, execution_date, __ = key
+run_key = (dag_id, execution_date.timestamp())
+run = self.stop_when_these_completed.get(run_key, None)
+if run and all(t.state == State.SUCCESS for t in 
run.get_task_instances()):
+self.stop_when_these_completed.pop(run_key)
+
+if not self.stop_when_these_completed:
+self.log.warning("STOPPING SCHEDULER -- all runs complete")
+self.scheduler_job.processor_agent._done = True
+else:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+elif state == State.SUCCESS:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+
+
+def get_executor_under_test():
+try:
+# Run against master and 1.10.x releases
+from tests.test_utils.mock_executor import MockExecutor
+except ImportError:
+from tests.executors.test_executor import TestExecutor as MockExecutor
+
+# from airflow.executors.local_executor import LocalExecutor
+
+# Change this to try other executors
+Executor = MockExecutor
+
+class ShortCircutExecutor(ShortCircutExecutorMixin, Executor):
+pass
+
+return ShortCircutExecutor
+
+
+def reset_dag(dag, num_runs, session):
+import airflow.models
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+DR = airflow.models.DagRun
+DM = airflow.models.DagModel
+TI = airflow.models.TaskInstance
+TF = airflow.models.TaskFail
+dag_id = dag.dag_id
+
+session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False})
+session.query(DR).filter(DR.dag_id == dag_id).delete()
+session.query(TI).filter(TI.dag_id == dag_id).delete()
+session.query(TF).filter(TF.dag_id == dag_id).delete()
+
+next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date 
for t in dag.tasks))
+
+for _ in range(num_runs):
+next_run = dag.create_dagrun(
+run_id=DR.ID_PREFIX + next_run_date.isoformat(),
+execution_date=next_run_date,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+)
+next_run_date = dag.following_schedule(next_run_date)
+return next_run
+
+
+def pause_all_dags(session):
+from airflow.models.dag import DagModel
+session.query(DagModel).update({'is_paused': True})
+
+
+@click.command()
+@click.option('--num-runs', default=1, help='number of DagRun, to run for each 
DAG')
+@click.option('--repeat', default=3, help='number of times to run test, to 
reduce variance')
+@click.argument('dag_ids', required=True, nargs=-1)
+def main(num_runs, repeat, dag_ids):
+"""
+This script will run the SchedulerJob for the specified dags "to completion".
+
+That is, it creates a fixed number of DAG runs for the specified DAGs (from
+the configured dag path/example dags etc.), disables the scheduler from
+creating more, and then monitors them for completion. When the file task of
+the final dag run is comple

[GitHub] [airflow] mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script to benchmark scheduler dag-run time

2020-02-25 Thread GitBox
mik-laj commented on a change in pull request #7537: [AIRFLOW-6454] And script 
to benchmark scheduler dag-run time
URL: https://github.com/apache/airflow/pull/7537#discussion_r384258420
 
 

 ##
 File path: scripts/perf/scheduler_dag_execution_timing.py
 ##
 @@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import gc
+import os
+import statistics
+import time
+
+import click
+
+
+class ShortCircutExecutorMixin:
+def __init__(self, stop_when_these_completed):
+super().__init__()
+self.reset(stop_when_these_completed)
+
+def reset(self, stop_when_these_completed):
+self.stop_when_these_completed = {
+# Store the date as a timestamp, as sometimes this is a Pendulum
+# object, others it is a datetime object.
+(run.dag_id, run.execution_date.timestamp()): run for run in 
stop_when_these_completed
+}
+
+def change_state(self, key, state):
+from airflow.utils.state import State
+super().change_state(key, state)
+
+dag_id, task_id, execution_date, __ = key
+run_key = (dag_id, execution_date.timestamp())
+run = self.stop_when_these_completed.get(run_key, None)
+if run and all(t.state == State.SUCCESS for t in 
run.get_task_instances()):
+self.stop_when_these_completed.pop(run_key)
+
+if not self.stop_when_these_completed:
+self.log.warning("STOPPING SCHEDULER -- all runs complete")
+self.scheduler_job.processor_agent._done = True
+else:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+elif state == State.SUCCESS:
+self.log.warning("WAITING ON %d RUNS", 
len(self.stop_when_these_completed))
+
+
+def get_executor_under_test():
+try:
+# Run against master and 1.10.x releases
+from tests.test_utils.mock_executor import MockExecutor
+except ImportError:
+from tests.executors.test_executor import TestExecutor as MockExecutor
+
+# from airflow.executors.local_executor import LocalExecutor
+
+# Change this to try other executors
+Executor = MockExecutor
+
+class ShortCircutExecutor(ShortCircutExecutorMixin, Executor):
+pass
+
+return ShortCircutExecutor
+
+
+def reset_dag(dag, num_runs, session):
+import airflow.models
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+DR = airflow.models.DagRun
+DM = airflow.models.DagModel
+TI = airflow.models.TaskInstance
+TF = airflow.models.TaskFail
+dag_id = dag.dag_id
+
+session.query(DM).filter(DM.dag_id == dag_id).update({'is_paused': False})
+session.query(DR).filter(DR.dag_id == dag_id).delete()
+session.query(TI).filter(TI.dag_id == dag_id).delete()
+session.query(TF).filter(TF.dag_id == dag_id).delete()
+
+next_run_date = dag.normalize_schedule(dag.start_date or min(t.start_date 
for t in dag.tasks))
+
+for _ in range(num_runs):
+next_run = dag.create_dagrun(
+run_id=DR.ID_PREFIX + next_run_date.isoformat(),
+execution_date=next_run_date,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+)
+next_run_date = dag.following_schedule(next_run_date)
+return next_run
+
+
+def pause_all_dags(session):
+from airflow.models.dag import DagModel
+session.query(DagModel).update({'is_paused': True})
+
+
+@click.command()
+@click.option('--num-runs', default=1, help='number of DagRun, to run for each 
DAG')
+@click.option('--repeat', default=3, help='number of times to run test, to 
reduce variance')
+@click.argument('dag_ids', required=True, nargs=-1)
+def main(num_runs, repeat, dag_ids):
+"""
+This script will run the SchedulerJob for the specified dags "to completion".
+
+That is, it creates a fixed number of DAG runs for the specified DAGs (from
+the configured dag path/example dags etc.), disables the scheduler from
+creating more, and then monitors them for completion. When the file task of
+the final dag run is comple