This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ad3bb5  Fix _process_executor_events method to use in-memory try_number (#9692)
4ad3bb5 is described below

commit 4ad3bb53ffe0a830461b2c59bea3c262cbb8a23a
Author: Tomek Urbaszek <[email protected]>
AuthorDate: Tue Jul 7 16:54:43 2020 +0200

    Fix _process_executor_events method to use in-memory try_number (#9692)
---
 airflow/jobs/scheduler_job.py    | 11 ++++++++---
 tests/jobs/test_scheduler_job.py | 30 ++++++++++++++++++++++++++++++
 2 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b2d3e0f8..be78ad2 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1486,6 +1486,7 @@ class SchedulerJob(BaseJob):
         """
         Respond to executor events.
         """
+        ti_primary_key_to_try_number_map = {}
         event_buffer = self.executor.get_event_buffer(simple_dag_bag.dag_ids)
         tis_with_right_state: List[TaskInstanceKeyType] = []
 
@@ -1493,6 +1494,8 @@ class SchedulerJob(BaseJob):
         for key, value in event_buffer.items():
             state, info = value
             dag_id, task_id, execution_date, try_number = key
+            # We create map (dag_id, task_id, execution_date) -> in-memory try_number
+            ti_primary_key_to_try_number_map[key[:-1]] = try_number
             self.log.info(
                 "Executor reports execution of %s.%s execution_date=%s "
                 "exited with status %s for try_number %s",
@@ -1509,9 +1512,11 @@ class SchedulerJob(BaseJob):
         filter_for_tis = TI.filter_for_tis(tis_with_right_state)
         tis = session.query(TI).filter(filter_for_tis).all()
         for ti in tis:
-            key = ti.key
-            dag_id, task_id, execution_date, try_number = key
-            state, info = event_buffer.pop(key)
+            # Recreate ti_key (dag_id, task_id, execution_date, try_number) using in-memory try_number
+            dag_id, task_id, execution_date, _ = ti.key
+            try_number = ti_primary_key_to_try_number_map[(dag_id, task_id, execution_date)]
+            buffer_key = (dag_id, task_id, execution_date, try_number)
+            state, info = event_buffer.pop(buffer_key)
 
             # TODO: should we fail RUNNING as well, as we do in Backfills?
             if ti.try_number == try_number and ti.state == State.QUEUED:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index e78f07d..e45e2cc 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1492,6 +1492,36 @@ class TestSchedulerJob(unittest.TestCase):
 
         mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally')
 
+    def test_process_executor_events_uses_inmemory_try_number(self):
+        execution_date = DEFAULT_DATE
+        dag_id = "dag_id"
+        task_id = "task_id"
+        try_number = 42
+
+        scheduler = SchedulerJob()
+        executor = MagicMock()
+        event_buffer = {
+            (dag_id, task_id, execution_date, try_number): (State.SUCCESS, None)
+        }
+        executor.get_event_buffer.return_value = event_buffer
+        scheduler.executor = executor
+
+        processor_agent = MagicMock()
+        scheduler.processor_agent = processor_agent
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        task = DummyOperator(dag=dag, task_id=task_id)
+
+        with create_session() as session:
+            ti = TaskInstance(task, DEFAULT_DATE)
+            ti.state = State.SUCCESS
+            session.merge(ti)
+
+        scheduler._process_executor_events(simple_dag_bag=MagicMock())
+        # Assert that the event_buffer is empty so the task was popped using the right
+        # task instance key
+        self.assertEqual(event_buffer, {})
+
     def test_execute_task_instances_is_paused_wont_execute(self):
        dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
         task_id_1 = 'dummy_task'

Reply via email to