This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 4ad3bb5 Fix _process_executor_events method to use in-memory try_number (#9692)
4ad3bb5 is described below
commit 4ad3bb53ffe0a830461b2c59bea3c262cbb8a23a
Author: Tomek Urbaszek <[email protected]>
AuthorDate: Tue Jul 7 16:54:43 2020 +0200
Fix _process_executor_events method to use in-memory try_number (#9692)
---
airflow/jobs/scheduler_job.py | 11 ++++++++---
tests/jobs/test_scheduler_job.py | 30 ++++++++++++++++++++++++++++++
2 files changed, 38 insertions(+), 3 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b2d3e0f8..be78ad2 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1486,6 +1486,7 @@ class SchedulerJob(BaseJob):
"""
Respond to executor events.
"""
+ ti_primary_key_to_try_number_map = {}
event_buffer = self.executor.get_event_buffer(simple_dag_bag.dag_ids)
tis_with_right_state: List[TaskInstanceKeyType] = []
@@ -1493,6 +1494,8 @@ class SchedulerJob(BaseJob):
for key, value in event_buffer.items():
state, info = value
dag_id, task_id, execution_date, try_number = key
+ # We create map (dag_id, task_id, execution_date) -> in-memory try_number
+ ti_primary_key_to_try_number_map[key[:-1]] = try_number
self.log.info(
"Executor reports execution of %s.%s execution_date=%s "
"exited with status %s for try_number %s",
@@ -1509,9 +1512,11 @@ class SchedulerJob(BaseJob):
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
tis = session.query(TI).filter(filter_for_tis).all()
for ti in tis:
- key = ti.key
- dag_id, task_id, execution_date, try_number = key
- state, info = event_buffer.pop(key)
+ # Recreate ti_key (dag_id, task_id, execution_date, try_number) using in-memory try_number
+ dag_id, task_id, execution_date, _ = ti.key
+ try_number = ti_primary_key_to_try_number_map[(dag_id, task_id, execution_date)]
+ buffer_key = (dag_id, task_id, execution_date, try_number)
+ state, info = event_buffer.pop(buffer_key)
# TODO: should we fail RUNNING as well, as we do in Backfills?
if ti.try_number == try_number and ti.state == State.QUEUED:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index e78f07d..e45e2cc 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1492,6 +1492,36 @@ class TestSchedulerJob(unittest.TestCase):
mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally')
+ def test_process_executor_events_uses_inmemory_try_number(self):
+ execution_date = DEFAULT_DATE
+ dag_id = "dag_id"
+ task_id = "task_id"
+ try_number = 42
+
+ scheduler = SchedulerJob()
+ executor = MagicMock()
+ event_buffer = {
+ (dag_id, task_id, execution_date, try_number): (State.SUCCESS, None)
+ }
+ executor.get_event_buffer.return_value = event_buffer
+ scheduler.executor = executor
+
+ processor_agent = MagicMock()
+ scheduler.processor_agent = processor_agent
+
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+ task = DummyOperator(dag=dag, task_id=task_id)
+
+ with create_session() as session:
+ ti = TaskInstance(task, DEFAULT_DATE)
+ ti.state = State.SUCCESS
+ session.merge(ti)
+
+ scheduler._process_executor_events(simple_dag_bag=MagicMock())
+ # Assert that the event_buffer is empty so the task was popped using the right
+ # task instance key
+ self.assertEqual(event_buffer, {})
+
def test_execute_task_instances_is_paused_wont_execute(self):
dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
task_id_1 = 'dummy_task'