This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 6a729c5  Handle IntegrityError while creating TIs (#10136)
6a729c5 is described below

commit 6a729c506ad3807e0c2adb49bb014338b5872292
Author: Sumit Maheshwari <[email protected]>
AuthorDate: Fri Aug 7 18:25:10 2020 +0530

    Handle IntegrityError while creating TIs (#10136)
    
    While doing a trigger_dag from UI, DagRun gets created first and then
    WebServer starts creating TIs. Meanwhile, Scheduler also picks up the DagRun
    and starts creating the TIs, which results in IntegrityError as the Primary key
    constraint gets violated. This happens when a DAG has a good number of tasks.
    
    Also, changing the TIs array with a set for faster lookups for Dags with
    too many tasks.
    
    (cherry picked from commit 21021228759da8d3e98ca3f6d0922a6e9a0b5e68)
---
 airflow/models/dagrun.py    | 16 +++++++++++++---
 tests/models/test_dagrun.py | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index ec9ecc8..a908a5b 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -23,6 +23,7 @@ from sqlalchemy import (
     Column, Integer, String, Boolean, PickleType, Index, UniqueConstraint, func, DateTime, or_,
     and_
 )
+from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.orm import synonym
 from sqlalchemy.orm.session import Session
@@ -362,10 +363,10 @@ class DagRun(Base, LoggingMixin):
         tis = self.get_task_instances(session=session)
 
         # check for removed or restored tasks
-        task_ids = []
+        task_ids = set()
         for ti in tis:
             task_instance_mutation_hook(ti)
-            task_ids.append(ti.task_id)
+            task_ids.add(ti.task_id)
             task = None
             try:
                 task = dag.get_task(ti.task_id)
@@ -401,7 +402,16 @@ class DagRun(Base, LoggingMixin):
                 task_instance_mutation_hook(ti)
                 session.add(ti)
 
-        session.commit()
+        try:
+            session.commit()
+        except IntegrityError as err:
+            self.log.info(str(err))
+            self.log.info(
+                'Hit IntegrityError while creating the TIs for %s - %s',
+                dag.dag_id, self.execution_date
+            )
+            self.log.info('Doing session rollback.')
+            session.rollback()
 
     @staticmethod
     def get_run(session, dag_id, execution_date):
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index c149c00..431cf71 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -564,6 +564,25 @@ class DagRunTest(unittest.TestCase):
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
 
+    def test_already_added_task_instances_can_be_ignored(self):
+        dag = DAG('triggered_dag', start_date=DEFAULT_DATE)
+        dag.add_task(DummyOperator(task_id='first_task', owner='test'))
+
+        dagrun = self.create_dag_run(dag)
+        first_ti = dagrun.get_task_instances()[0]
+        self.assertEqual('first_task', first_ti.task_id)
+        self.assertEqual(State.NONE, first_ti.state)
+
+        # Lets assume that the above TI was added into DB by webserver, but if scheduler
+        # is running the same method at the same time it would find 0 TIs for this dag
+        # and proceeds further to create TIs. Hence mocking DagRun.get_task_instances
+        # method to return an empty list of TIs.
+        with mock.patch.object(DagRun, 'get_task_instances') as mock_gtis:
+            mock_gtis.return_value = []
+            dagrun.verify_integrity()
+            first_ti.refresh_from_db()
+            self.assertEqual(State.NONE, first_ti.state)
+
     @parameterized.expand([(state,) for state in State.task_states])
     @mock.patch('airflow.models.dagrun.task_instance_mutation_hook')
     def test_task_instance_mutation_hook(self, state, mock_hook):

Reply via email to