[ https://issues.apache.org/jira/browse/AIRFLOW-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Qian Yu updated AIRFLOW-6834: ----------------------------- Description: test_scheduler_job.py has a few flaky tests. Some are marked with pytest.mark.xfail, but this one is not marked flaky. It sometimes fails in Travis. Example 1: {code:python} ============================================================ FAILURES ============================================================ _____________________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0 ______________________ a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,) @wraps(func) def standalone_func(*a): > return func(*(a + p.args), **p.kwargs) /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0> state = None, start_date = None, end_date = None @parameterized.expand([ [State.NONE, None, None], [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], ]) def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date): """ Test if _process_task_instances puts the right task instances into the mock_list. 
""" dag = DAG( dag_id='test_scheduler_process_execute_task_depends_on_past', start_date=DEFAULT_DATE, default_args={ 'depends_on_past': True, }, ) dag_task1 = DummyOperator( task_id='dummy1', dag=dag, owner='airflow') dag_task2 = DummyOperator( task_id='dummy2', dag=dag, owner='airflow') with create_session() as session: orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) self.assertIsNotNone(dr) with create_session() as session: tis = dr.get_task_instances(session=session) for ti in tis: ti.state = state ti.start_date = start_date ti.end_date = end_date ti_to_schedule = [] dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule) > assert ti_to_schedule == [ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), ] E AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1)] == [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] E At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1) E Full diff: E [ E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy2', E ? 
^ E + 'dummy1', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy1', E ? ^ E + 'dummy2', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ] tests/jobs/test_scheduler_job.py:565: AssertionError _______________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry _______________ a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,) @wraps(func) def standalone_func(*a): > return func(*(a + p.args), **p.kwargs) /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry> state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 916304, tzinfo=<Timezone [UTC]>) end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, tzinfo=<Timezone [UTC]>) @parameterized.expand([ [State.NONE, None, None], [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], ]) def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date): """ Test if _process_task_instances puts the right task instances into the mock_list. 
""" dag = DAG( dag_id='test_scheduler_process_execute_task_depends_on_past', start_date=DEFAULT_DATE, default_args={ 'depends_on_past': True, }, ) dag_task1 = DummyOperator( task_id='dummy1', dag=dag, owner='airflow') dag_task2 = DummyOperator( task_id='dummy2', dag=dag, owner='airflow') with create_session() as session: orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) self.assertIsNotNone(dr) with create_session() as session: tis = dr.get_task_instances(session=session) for ti in tis: ti.state = state ti.start_date = start_date ti.end_date = end_date ti_to_schedule = [] dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule) > assert ti_to_schedule == [ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), ] E AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1)] == [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] E At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1) E Full diff: E [ E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy2', E ? 
^ E + 'dummy1', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy1', E ? ^ E + 'dummy2', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ] tests/jobs/test_scheduler_job.py:565: AssertionError ____________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule _____________ a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,) @wraps(func) def standalone_func(*a): > return func(*(a + p.args), **p.kwargs) /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule> state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 916318, tzinfo=<Timezone [UTC]>) end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, tzinfo=<Timezone [UTC]>) @parameterized.expand([ [State.NONE, None, None], [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], ]) def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date): """ Test if _process_task_instances puts the right task instances into the mock_list. 
""" dag = DAG( dag_id='test_scheduler_process_execute_task_depends_on_past', start_date=DEFAULT_DATE, default_args={ 'depends_on_past': True, }, ) dag_task1 = DummyOperator( task_id='dummy1', dag=dag, owner='airflow') dag_task2 = DummyOperator( task_id='dummy2', dag=dag, owner='airflow') with create_session() as session: orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) self.assertIsNotNone(dr) with create_session() as session: tis = dr.get_task_instances(session=session) for ti in tis: ti.state = state ti.start_date = start_date ti.end_date = end_date ti_to_schedule = [] dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule) > assert ti_to_schedule == [ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), ] E AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1)] == [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] E At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1) E Full diff: E [ E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy2', E ? 
^ E + 'dummy1', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy1', E ? ^ E + 'dummy2', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ] tests/jobs/test_scheduler_job.py:565: AssertionError ======================================================== warnings summary ======================================================== /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19 /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19: DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and will be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import ImmutableDict' instead. from werkzeug import ImmutableDict /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5 /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5: DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will be removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' instead. from werkzeug import url_encode -- Docs: https://docs.pytest.org/en/latest/warnings.html ==================================================== short test summary info ===================================================== XFAIL tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job This test is failing! 
XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun The test is flaky with nondeterministic result FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0 FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule =========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2 warnings in 61.44s (0:01:01) ============================ {code} Example 2: Run py.test in breeze with the command shown below. {{test_start_and_terminate_run_as_user}} fails because it implicitly relies on a call to {{settings.configure_orm()}} in {{test_task_command.py}}, so when {{test_task_command.py}} is not run, the test fails. {code} py.test --with-db-init tests/task/task_runner/test_standard_task_runner.py XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun The test is flaky with nondeterministic result XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job This test is failing! ERROR tests/task/task_runner/test_standard_task_runner.py::TestStandardTaskRunner::test_start_and_terminate_run_as_user - sqlal... FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0 FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule {code} was: test_scheduler_job.py has a few flaky tests. 
Some are marked with pytest.mark.xfail, but this one is not marked flaky. It sometimes fails in Travis. For example: {code:python} ============================================================ FAILURES ============================================================ _____________________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0 ______________________ a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,) @wraps(func) def standalone_func(*a): > return func(*(a + p.args), **p.kwargs) /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0> state = None, start_date = None, end_date = None @parameterized.expand([ [State.NONE, None, None], [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], ]) def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date): """ Test if _process_task_instances puts the right task instances into the mock_list. 
""" dag = DAG( dag_id='test_scheduler_process_execute_task_depends_on_past', start_date=DEFAULT_DATE, default_args={ 'depends_on_past': True, }, ) dag_task1 = DummyOperator( task_id='dummy1', dag=dag, owner='airflow') dag_task2 = DummyOperator( task_id='dummy2', dag=dag, owner='airflow') with create_session() as session: orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) self.assertIsNotNone(dr) with create_session() as session: tis = dr.get_task_instances(session=session) for ti in tis: ti.state = state ti.start_date = start_date ti.end_date = end_date ti_to_schedule = [] dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule) > assert ti_to_schedule == [ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), ] E AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1)] == [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] E At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1) E Full diff: E [ E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy2', E ? 
^ E + 'dummy1', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy1', E ? ^ E + 'dummy2', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ] tests/jobs/test_scheduler_job.py:565: AssertionError _______________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry _______________ a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,) @wraps(func) def standalone_func(*a): > return func(*(a + p.args), **p.kwargs) /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry> state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 916304, tzinfo=<Timezone [UTC]>) end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, tzinfo=<Timezone [UTC]>) @parameterized.expand([ [State.NONE, None, None], [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], ]) def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date): """ Test if _process_task_instances puts the right task instances into the mock_list. 
""" dag = DAG( dag_id='test_scheduler_process_execute_task_depends_on_past', start_date=DEFAULT_DATE, default_args={ 'depends_on_past': True, }, ) dag_task1 = DummyOperator( task_id='dummy1', dag=dag, owner='airflow') dag_task2 = DummyOperator( task_id='dummy2', dag=dag, owner='airflow') with create_session() as session: orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) self.assertIsNotNone(dr) with create_session() as session: tis = dr.get_task_instances(session=session) for ti in tis: ti.state = state ti.start_date = start_date ti.end_date = end_date ti_to_schedule = [] dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule) > assert ti_to_schedule == [ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), ] E AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1)] == [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] E At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1) E Full diff: E [ E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy2', E ? 
^ E + 'dummy1', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy1', E ? ^ E + 'dummy2', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ] tests/jobs/test_scheduler_job.py:565: AssertionError ____________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule _____________ a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,) @wraps(func) def standalone_func(*a): > return func(*(a + p.args), **p.kwargs) /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <tests.jobs.test_scheduler_job.TestDagFileProcessor testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule> state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 916318, tzinfo=<Timezone [UTC]>) end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, tzinfo=<Timezone [UTC]>) @parameterized.expand([ [State.NONE, None, None], [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), timezone.utcnow() - datetime.timedelta(minutes=15)], ]) def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date): """ Test if _process_task_instances puts the right task instances into the mock_list. 
""" dag = DAG( dag_id='test_scheduler_process_execute_task_depends_on_past', start_date=DEFAULT_DATE, default_args={ 'depends_on_past': True, }, ) dag_task1 = DummyOperator( task_id='dummy1', dag=dag, owner='airflow') dag_task2 = DummyOperator( task_id='dummy2', dag=dag, owner='airflow') with create_session() as session: orm_dag = DagModel(dag_id=dag.dag_id) session.merge(orm_dag) dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) dag.clear() dr = dag_file_processor.create_dag_run(dag) self.assertIsNotNone(dr) with create_session() as session: tis = dr.get_task_instances(session=session) for ti in tis: ti.state = state ti.start_date = start_date ti.end_date = end_date ti_to_schedule = [] dag_file_processor._process_task_instances(dag, task_instances_list=ti_to_schedule) > assert ti_to_schedule == [ (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), ] E AssertionError: assert [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),\n 1)] == [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] E At index 0 diff: ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1) E Full diff: E [ E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy2', E ? 
^ E + 'dummy1', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ('test_scheduler_process_execute_task_depends_on_past', E - 'dummy1', E ? ^ E + 'dummy2', E ? ^ E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), E ? ---- --------------------- E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), E 1), E ] tests/jobs/test_scheduler_job.py:565: AssertionError ======================================================== warnings summary ======================================================== /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19 /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19: DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and will be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import ImmutableDict' instead. from werkzeug import ImmutableDict /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5 /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5: DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will be removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' instead. from werkzeug import url_encode -- Docs: https://docs.pytest.org/en/latest/warnings.html ==================================================== short test summary info ===================================================== XFAIL tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job This test is failing! 
XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun The test is flaky with nondeterministic result FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0 FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule =========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2 warnings in 61.44s (0:01:01) ============================ {code} > Fix flaky test > test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past > -------------------------------------------------------------------------------------------------------------------------- > > Key: AIRFLOW-6834 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6834 > Project: Apache Airflow > Issue Type: Bug > Components: tests > Affects Versions: 1.10.9 > Reporter: Qian Yu > Assignee: Qian Yu > Priority: Major > > test_scheduler_job.py has a few flaky tests. Some are marked with > pytest.mark.xfail, but this one is not marked flaky. It sometimes fails in > Travis. 
> > Example 1: > > {code:python} > ============================================================ FAILURES > ============================================================ > _____________________ > TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0 > ______________________ > a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor > testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,) > @wraps(func) > def standalone_func(*a): > > return func(*(a + p.args), **p.kwargs) > /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > self = <tests.jobs.test_scheduler_job.TestDagFileProcessor > testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0> > state = None, start_date = None, end_date = None > @parameterized.expand([ > [State.NONE, None, None], > [State.UP_FOR_RETRY, timezone.utcnow() - > datetime.timedelta(minutes=30), > timezone.utcnow() - datetime.timedelta(minutes=15)], > [State.UP_FOR_RESCHEDULE, timezone.utcnow() - > datetime.timedelta(minutes=30), > timezone.utcnow() - datetime.timedelta(minutes=15)], > ]) > def test_dag_file_processor_process_task_instances_depends_on_past(self, > state, start_date, end_date): > """ > Test if _process_task_instances puts the right task instances into the > mock_list. 
> """ > dag = DAG( > dag_id='test_scheduler_process_execute_task_depends_on_past', > start_date=DEFAULT_DATE, > default_args={ > 'depends_on_past': True, > }, > ) > dag_task1 = DummyOperator( > task_id='dummy1', > dag=dag, > owner='airflow') > dag_task2 = DummyOperator( > task_id='dummy2', > dag=dag, > owner='airflow') > with create_session() as session: > orm_dag = DagModel(dag_id=dag.dag_id) > session.merge(orm_dag) > dag_file_processor = DagFileProcessor(dag_ids=[], > log=mock.MagicMock()) > dag.clear() > dr = dag_file_processor.create_dag_run(dag) > self.assertIsNotNone(dr) > with create_session() as session: > tis = dr.get_task_instances(session=session) > for ti in tis: > ti.state = state > ti.start_date = start_date > ti.end_date = end_date > ti_to_schedule = [] > dag_file_processor._process_task_instances(dag, > task_instances_list=ti_to_schedule) > > assert ti_to_schedule == [ > (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), > (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), > ] > E AssertionError: assert > [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>),\n 1),\n > ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>),\n 1)] == > [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n > ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] > E At index 0 diff: > ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>), 1) != > ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone 
[UTC]>), 1) > E Full diff: > E [ > E ('test_scheduler_process_execute_task_depends_on_past', > E - 'dummy2', > E ? ^ > E + 'dummy1', > E ? ^ > E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, > GMT, +00:00:00, STD]>), > E ? ---- > --------------------- > E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), > E 1), > E ('test_scheduler_process_execute_task_depends_on_past', > E - 'dummy1', > E ? ^ > E + 'dummy2', > E ? ^ > E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, > GMT, +00:00:00, STD]>), > E ? ---- > --------------------- > E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), > E 1), > E ] > tests/jobs/test_scheduler_job.py:565: AssertionError > _______________ > TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry > _______________ > a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor > testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,) > @wraps(func) > def standalone_func(*a): > > return func(*(a + p.args), **p.kwargs) > /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > self = <tests.jobs.test_scheduler_job.TestDagFileProcessor > testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry> > state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, > 42, 916304, tzinfo=<Timezone [UTC]>) > end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, > tzinfo=<Timezone [UTC]>) > @parameterized.expand([ > [State.NONE, None, None], > [State.UP_FOR_RETRY, timezone.utcnow() - > datetime.timedelta(minutes=30), > timezone.utcnow() - datetime.timedelta(minutes=15)], > [State.UP_FOR_RESCHEDULE, timezone.utcnow() - > datetime.timedelta(minutes=30), > timezone.utcnow() - datetime.timedelta(minutes=15)], > ]) > 
def test_dag_file_processor_process_task_instances_depends_on_past(self, > state, start_date, end_date): > """ > Test if _process_task_instances puts the right task instances into the > mock_list. > """ > dag = DAG( > dag_id='test_scheduler_process_execute_task_depends_on_past', > start_date=DEFAULT_DATE, > default_args={ > 'depends_on_past': True, > }, > ) > dag_task1 = DummyOperator( > task_id='dummy1', > dag=dag, > owner='airflow') > dag_task2 = DummyOperator( > task_id='dummy2', > dag=dag, > owner='airflow') > with create_session() as session: > orm_dag = DagModel(dag_id=dag.dag_id) > session.merge(orm_dag) > dag_file_processor = DagFileProcessor(dag_ids=[], > log=mock.MagicMock()) > dag.clear() > dr = dag_file_processor.create_dag_run(dag) > self.assertIsNotNone(dr) > with create_session() as session: > tis = dr.get_task_instances(session=session) > for ti in tis: > ti.state = state > ti.start_date = start_date > ti.end_date = end_date > ti_to_schedule = [] > dag_file_processor._process_task_instances(dag, > task_instances_list=ti_to_schedule) > > assert ti_to_schedule == [ > (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), > (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), > ] > E AssertionError: assert > [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>),\n 1),\n > ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>),\n 1)] == > [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n > ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] > E At index 0 diff: > ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', > datetime.datetime(2016, 1, 
1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>), 1) != > ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1) > E Full diff: > E [ > E ('test_scheduler_process_execute_task_depends_on_past', > E - 'dummy2', > E ? ^ > E + 'dummy1', > E ? ^ > E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, > GMT, +00:00:00, STD]>), > E ? ---- > --------------------- > E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), > E 1), > E ('test_scheduler_process_execute_task_depends_on_past', > E - 'dummy1', > E ? ^ > E + 'dummy2', > E ? ^ > E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, > GMT, +00:00:00, STD]>), > E ? ---- > --------------------- > E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), > E 1), > E ] > tests/jobs/test_scheduler_job.py:565: AssertionError > ____________ > TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule > _____________ > a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor > testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,) > @wraps(func) > def standalone_func(*a): > > return func(*(a + p.args), **p.kwargs) > /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > self = <tests.jobs.test_scheduler_job.TestDagFileProcessor > testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule> > state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13, > 26, 42, 916318, tzinfo=<Timezone [UTC]>) > end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, > tzinfo=<Timezone [UTC]>) > @parameterized.expand([ > [State.NONE, None, None], > [State.UP_FOR_RETRY, timezone.utcnow() - > 
datetime.timedelta(minutes=30), > timezone.utcnow() - datetime.timedelta(minutes=15)], > [State.UP_FOR_RESCHEDULE, timezone.utcnow() - > datetime.timedelta(minutes=30), > timezone.utcnow() - datetime.timedelta(minutes=15)], > ]) > def test_dag_file_processor_process_task_instances_depends_on_past(self, > state, start_date, end_date): > """ > Test if _process_task_instances puts the right task instances into the > mock_list. > """ > dag = DAG( > dag_id='test_scheduler_process_execute_task_depends_on_past', > start_date=DEFAULT_DATE, > default_args={ > 'depends_on_past': True, > }, > ) > dag_task1 = DummyOperator( > task_id='dummy1', > dag=dag, > owner='airflow') > dag_task2 = DummyOperator( > task_id='dummy2', > dag=dag, > owner='airflow') > with create_session() as session: > orm_dag = DagModel(dag_id=dag.dag_id) > session.merge(orm_dag) > dag_file_processor = DagFileProcessor(dag_ids=[], > log=mock.MagicMock()) > dag.clear() > dr = dag_file_processor.create_dag_run(dag) > self.assertIsNotNone(dr) > with create_session() as session: > tis = dr.get_task_instances(session=session) > for ti in tis: > ti.state = state > ti.start_date = start_date > ti.end_date = end_date > ti_to_schedule = [] > dag_file_processor._process_task_instances(dag, > task_instances_list=ti_to_schedule) > > assert ti_to_schedule == [ > (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), > (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), > ] > E AssertionError: assert > [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>),\n 1),\n > ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>),\n 1)] == > [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n > 
('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)] > E At index 0 diff: > ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, > +00:00:00, STD]>), 1) != > ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', > datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1) > E Full diff: > E [ > E ('test_scheduler_process_execute_task_depends_on_past', > E - 'dummy2', > E ? ^ > E + 'dummy1', > E ? ^ > E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, > GMT, +00:00:00, STD]>), > E ? ---- > --------------------- > E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), > E 1), > E ('test_scheduler_process_execute_task_depends_on_past', > E - 'dummy1', > E ? ^ > E + 'dummy2', > E ? ^ > E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, > GMT, +00:00:00, STD]>), > E ? ---- > --------------------- > E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), > E 1), > E ] > tests/jobs/test_scheduler_job.py:565: AssertionError > ======================================================== warnings summary > ======================================================== > /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19 > /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19: > DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and > will be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import > ImmutableDict' instead. > from werkzeug import ImmutableDict > /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5 > /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5: > DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will > be removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' > instead. 
> from werkzeug import url_encode > -- Docs: https://docs.pytest.org/en/latest/warnings.html > ==================================================== short test summary info > ===================================================== > XFAIL > tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job > This test is failing! > XPASS > tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun > The test is flaky with nondeterministic result > FAILED > tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0 > FAILED > tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry > FAILED > tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule > =========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2 > warnings in 61.44s (0:01:01) ============================ > {code} > Example 2: > Run py.test like this in breeze. {{test_start_and_terminate_run_as_user}} > fails. This happens because {{test_start_and_terminate_run_as_user}} is > secretly relying on a call to {{settings.configure_orm()}} in > {{test_task_command.py}}. When {{test_task_command.py}} is not run, > {{test_start_and_terminate_run_as_user}} fails. > {code} > py.test --with-db-init tests/task/task_runner/test_standard_task_runner.py > XPASS > tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun > The test is flaky with nondeterministic result > XPASS > tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job > This test is failing! > ERROR > tests/task/task_runner/test_standard_task_runner.py::TestStandardTaskRunner::test_start_and_terminate_run_as_user > - sqlal... 
> FAILED > tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0 > FAILED > tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry > FAILED > tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)