kaxil commented on a change in pull request #12663:
URL: https://github.com/apache/airflow/pull/12663#discussion_r531778387
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -477,6 +477,104 @@ def test_find_executable_task_instances_none(self):
states=[State.SCHEDULED],
session=session)))
+ @parameterized.expand([
+ [State.NONE, None, None],
+ [State.UP_FOR_RETRY, timezone.utcnow() -
datetime.timedelta(minutes=30),
+ timezone.utcnow() - datetime.timedelta(minutes=15)],
+ [State.UP_FOR_RESCHEDULE, timezone.utcnow() -
datetime.timedelta(minutes=30),
+ timezone.utcnow() - datetime.timedelta(minutes=15)],
+ ])
+ def test_process_task_instances_with_task_concurrency(
+ self, state, start_date, end_date,
+ ):
+ """
+ Test if _process_task_instances puts the right task instances into the
+ mock_list.
+ """
+ dag = DAG(
+ dag_id='test_scheduler_process_execute_task_with_task_concurrency',
+ start_date=DEFAULT_DATE)
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ task_concurrency=2,
+ dag=dag,
+ owner='airflow')
+
+ with create_session() as session:
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+
+ scheduler_job = SchedulerJob()
+ dag.clear()
+ dr = scheduler_job.create_dag_run(dag)
+ self.assertIsNotNone(dr)
+
+ with create_session() as session:
+ tis = dr.get_task_instances(session=session)
+ for ti in tis:
+ ti.state = state
+ ti.start_date = start_date
+ ti.end_date = end_date
+
+ ti_to_schedule = []
+ scheduler_job._process_task_instances(dag,
task_instances_list=ti_to_schedule)
+
+ assert ti_to_schedule == [
+ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+ ]
+
+ @parameterized.expand([
+ [State.NONE, None, None],
+ [State.UP_FOR_RETRY, timezone.utcnow() -
datetime.timedelta(minutes=30),
+ timezone.utcnow() - datetime.timedelta(minutes=15)],
+ [State.UP_FOR_RESCHEDULE, timezone.utcnow() -
datetime.timedelta(minutes=30),
+ timezone.utcnow() - datetime.timedelta(minutes=15)],
+ ])
+ def test_process_task_instances_depends_on_past(self, state, start_date,
end_date):
+ """
+ Test if _process_task_instances puts the right task instances into the
+ mock_list.
+ """
+ dag = DAG(
+ dag_id='test_scheduler_process_execute_task_depends_on_past',
+ start_date=DEFAULT_DATE,
+ default_args={
+ 'depends_on_past': True,
+ },
+ )
+ dag_task1 = DummyOperator(
+ task_id='dummy1',
+ dag=dag,
+ owner='airflow')
+ dag_task2 = DummyOperator(
+ task_id='dummy2',
+ dag=dag,
+ owner='airflow')
+
+ with create_session() as session:
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+
+ scheduler_job = SchedulerJob()
+ dag.clear()
+ dr = scheduler_job.create_dag_run(dag)
+ self.assertIsNotNone(dr)
+
+ with create_session() as session:
+ tis = dr.get_task_instances(session=session)
+ for ti in tis:
+ ti.state = state
+ ti.start_date = start_date
+ ti.end_date = end_date
+
+ ti_to_schedule = []
+ scheduler_job._process_task_instances(dag,
task_instances_list=ti_to_schedule)
+
+ assert ti_to_schedule == [
+ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
+ (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
+ ]
Review comment:
argghh Python 2.7 & 3.5 I guess because of the ordering
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]