[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16358: Only create dagruns when max_active_runs is not reached

2021-06-10 Thread GitBox


ephraimbuddy commented on a change in pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#discussion_r649364776



##
File path: airflow/jobs/scheduler_job.py
##
@@ -1593,7 +1592,9 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
 # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
 # create a new one. This is so that in the next Scheduling loop we try to create new runs
 # instead of falling in a loop of Integrity Error.
-if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
+if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns and len(
+active_dagruns
+) < dag.max_active_runs:

Review comment:
   I have corrected it. I'm surprised it was working before. Thanks
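The condition in the diff gates dagrun creation on `max_active_runs`. Below is a minimal sketch of that gating rule. It assumes, based on the membership test in the diff, that `active_dagruns` is a set of `(dag_id, execution_date)` pairs. `should_create_dagrun` is a hypothetical helper, not Airflow's API, and it is not necessarily the correction the author made. It counts active runs per DAG. That distinction matters if the set holds runs for several DAGs, because `len(active_dagruns)` would then also count other DAGs' runs.

```python
from datetime import datetime, timedelta
from typing import Set, Tuple

# Assumption: each active run is a (dag_id, execution_date) pair, as the
# membership test in the diff suggests.
ActiveRun = Tuple[str, datetime]


def should_create_dagrun(
    dag_id: str,
    next_dagrun: datetime,
    max_active_runs: int,
    active_dagruns: Set[ActiveRun],
) -> bool:
    """Hypothetical helper: may the scheduler create dag_id's run at next_dagrun?"""
    if (dag_id, next_dagrun) in active_dagruns:
        # The run already exists; creating it again would hit an IntegrityError.
        return False
    # Count only this DAG's active runs, not the runs of every DAG in the set.
    active_for_dag = sum(1 for run_dag_id, _ in active_dagruns if run_dag_id == dag_id)
    return active_for_dag < max_active_runs


start = datetime(2016, 1, 1)
active = {("dag_1", start), ("dag_2", start)}
later = start + timedelta(seconds=1)

print(should_create_dagrun("dag_2", later, 2, active))  # True: 1 of 2 slots used
print(should_create_dagrun("dag_1", later, 1, active))  # False: 1 of 1 slots used
print(len(active) < 2)  # False: a global count also sees dag_1's run
```

The last line shows the pitfall of a global count. `dag_2` has a free slot, but `len(active) < 2` would still refuse to create its run.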




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





ephraimbuddy commented on a change in pull request #16358:
URL: https://github.com/apache/airflow/pull/16358#discussion_r649047629



##
File path: tests/jobs/test_scheduler_job.py
##
@@ -3864,6 +3864,102 @@ def test_scheduler_create_dag_runs_check_existing_run(self):
 assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
 session.rollback()
 
+def test_scheduler_loop_dont_create_dagruns_when_max_active_runs_is_reached(self):
+"""
+Test that if max_active_runs is reached, the scheduler loop does not create extra dagruns.
+
+With max_active_runs=1, the scheduler loop won't create an extra dagrun if there's a running
+dagrun.
+"""
+dag = DAG(
+dag_id='test_scheduler_max_active_runs_1',
+start_date=DEFAULT_DATE,
+schedule_interval=timedelta(seconds=1),
+max_active_runs=1,
+)
+
+BashOperator(
+task_id='dummy',
+bash_command="sleep 10",
+dag=dag,
+)
+dag2 = DAG(
+dag_id='test_scheduler_max_active_runs_2',
+start_date=DEFAULT_DATE,
+schedule_interval=timedelta(seconds=1),
+max_active_runs=2,
+)
+
+BashOperator(
+task_id='dummy',
+bash_command="sleep 10",
+dag=dag2,
+)
+
+session = settings.Session()
+assert dag.get_last_dagrun(session) is None
+
+dagbag = DagBag(
+dag_folder=os.devnull,
+include_examples=False,
+read_dags_from_db=False,
+)
+dagbag.bag_dag(dag=dag, root_dag=dag)
+dagbag.bag_dag(dag=dag2, root_dag=dag2)
+
+# Create DagModel
+DAG.bulk_write_to_db(dagbag.dags.values())
+dag_model = DagModel.get_dagmodel(dag.dag_id)
+
+# Assert dag_model.next_dagrun is set correctly
+assert dag_model.next_dagrun == DEFAULT_DATE
+
+dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+dag2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag2))
+
+dagrun = dag.create_dagrun(
+run_type=DagRunType.SCHEDULED,
+execution_date=dag_model.next_dagrun,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+creating_job_id=2,
+)
+dagrun2 = dag2.create_dagrun(
+run_type=DagRunType.SCHEDULED,
+execution_date=dag_model.next_dagrun,
+start_date=timezone.utcnow(),
+state=State.RUNNING,
+external_trigger=False,
+session=session,
+creating_job_id=2,
+)
+session.flush()
+
+assert dag.get_last_dagrun(session) == dagrun
+assert dag2.get_last_dagrun(session) == dagrun2
+
+# This poll interval is large, but the scheduler doesn't sleep that
+# long, instead we hit the update_dagrun_state_for_paused_dag_interval instead

Review comment:
   ```suggestion
   ```
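The test quoted above sets up two DAGs on a one-second schedule, one with `max_active_runs=1` and one with `max_active_runs=2`, each with a single RUNNING run. Below is a toy model of repeated scheduler passes over that setup, intended to make the expected invariant concrete. It is a sketch under simplifying assumptions. `FakeDag` and `scheduler_loop_once` are hypothetical, not Airflow classes. The model counts runs per DAG and advances `next_dagrun` past a run that already exists, following the comment in `_create_dag_runs`. When the limit is reached it leaves `next_dagrun` unchanged, which may not match how Airflow itself updates it.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List


@dataclass
class FakeDag:
    """Hypothetical stand-in for a DAG, holding only what the gating rule needs."""

    dag_id: str
    max_active_runs: int
    interval: timedelta
    next_dagrun: datetime
    running: List[datetime] = field(default_factory=list)


def scheduler_loop_once(dags: List[FakeDag]) -> None:
    """One simplified pass of dagrun creation, gated per DAG by max_active_runs."""
    for dag in dags:
        if dag.next_dagrun in dag.running:
            # The run already exists: only move next_dagrun forward.
            dag.next_dagrun += dag.interval
        elif len(dag.running) < dag.max_active_runs:
            # Below the limit: create the run, then move next_dagrun forward.
            dag.running.append(dag.next_dagrun)
            dag.next_dagrun += dag.interval
        # Otherwise the limit is reached and no run is created.


DEFAULT_DATE = datetime(2016, 1, 1)  # hypothetical value for this sketch
one_second = timedelta(seconds=1)
dags = [
    FakeDag("test_scheduler_max_active_runs_1", 1, one_second, DEFAULT_DATE, [DEFAULT_DATE]),
    FakeDag("test_scheduler_max_active_runs_2", 2, one_second, DEFAULT_DATE, [DEFAULT_DATE]),
]
for _ in range(5):
    scheduler_loop_once(dags)

for dag in dags:
    print(dag.dag_id, len(dag.running))
# test_scheduler_max_active_runs_1 1
# test_scheduler_max_active_runs_2 2
```

In this model, the DAG capped at one run never gains a second run, however many passes occur. The DAG capped at two fills its remaining slot once and then stops.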



