potiuk opened a new issue, #39858:
URL: https://github.com/apache/airflow/issues/39858

   ### Body
   
   The test `test_backfill_failed_dag_with_upstream_failed_task` in `tests/jobs/test_backfill_job.py` often fails because a database deadlock is detected while the backfill job commits its changes (MySQL error 1213, see the trace below).
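   
   A quick way to reproduce it locally (assuming the test environment is already configured with a MySQL backend, since the failure comes from the MySQLdb driver) is to re-run this single test in a loop until it trips. The small driver below is only an illustration and not part of the test suite:
   
   ```python
   # Hypothetical helper: re-run the flaky test until it fails, to reproduce
   # the deadlock locally. Assumes pytest and a MySQL-backed Airflow test
   # environment are already set up.
   import subprocess
   import sys
   
   TEST_ID = (
       "tests/jobs/test_backfill_job.py::TestBackfillJob"
       "::test_backfill_failed_dag_with_upstream_failed_task"
   )
   
   for attempt in range(1, 51):
       result = subprocess.run([sys.executable, "-m", "pytest", "-x", TEST_ID])
       if result.returncode != 0:
           print(f"Test failed on attempt {attempt}")
           break
   else:
       print("No failure in 50 attempts")
   ```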
   
   Here is a typical stack trace/error:
   
   ```python
   ______ TestBackfillJob.test_backfill_failed_dag_with_upstream_failed_task ______
   
   self = <tests.jobs.test_backfill_job.TestBackfillJob object at 0x7f0f6532ef10>
   dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7f0f3bfbe9d0>
   
       def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
           self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
           dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")
       
           # We have to use the "fake" version of perform_heartbeat due to the 
'is_unit_test' check in
           # the original one. However, instead of using the original version 
of perform_heartbeat,
           # we can simply wait for a LocalExecutor's worker cycle. The 
approach with sleep works well now,
           # but it can be replaced with checking the state of the LocalTaskJob.
           def fake_perform_heartbeat(*args, **kwargs):
               import time
       
               time.sleep(1)
       
           with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
               job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
               job_runner = BackfillJobRunner(
                   job=job,
                   dag=dag,
                   start_date=DEFAULT_DATE,
                   end_date=DEFAULT_DATE,
                   rerun_failed_tasks=True,
               )
               with pytest.raises(BackfillUnfinished):
   >               run_job(job=job, execute_callable=job_runner._execute)
   
   tests/jobs/test_backfill_job.py:2142: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   airflow/utils/session.py:84: in wrapper
       return func(*args, session=session, **kwargs)
   airflow/jobs/job.py:410: in run_job
       return execute_job(job, execute_callable=execute_callable)
   airflow/jobs/job.py:439: in execute_job
       ret = execute_callable()
   airflow/utils/session.py:84: in wrapper
       return func(*args, session=session, **kwargs)
   airflow/jobs/backfill_job_runner.py:980: in _execute
       self._execute_dagruns(
   airflow/utils/session.py:81: in wrapper
       return func(*args, **kwargs)
   airflow/jobs/backfill_job_runner.py:869: in _execute_dagruns
       processed_dag_run_dates = self._process_backfill_task_instances(
   airflow/jobs/backfill_job_runner.py:698: in _process_backfill_task_instances
       _per_task_process(key, ti, session)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   key = TaskInstanceKey(dag_id='test_backfill_with_upstream_failed_task', task_id='downstream_task', run_id='backfill__2016-01-01T00:00:00+00:00', try_number=0, map_index=-1)
   ti = <TaskInstance: test_backfill_with_upstream_failed_task.downstream_task backfill__2016-01-01T00:00:00+00:00 [upstream_failed]>
   session = <sqlalchemy.orm.session.Session object at 0x7f0f3bd93c10>
   
       def _per_task_process(key, ti: TaskInstance, session):
           ti.refresh_from_db(lock_for_update=True, session=session)
       
           task = self.dag.get_task(ti.task_id, include_subdags=True)
           ti.task = task
       
           self.log.debug("Task instance to run %s state %s", ti, ti.state)
       
           # The task was already marked successful or skipped by a
           # different Job. Don't rerun it.
           if ti.state == TaskInstanceState.SUCCESS:
               ti_status.succeeded.add(key)
               self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
               ti_status.to_run.pop(key)
               if key in ti_status.running:
                   ti_status.running.pop(key)
               return
           elif ti.state == TaskInstanceState.SKIPPED:
               ti_status.skipped.add(key)
               self.log.debug("Task instance %s skipped. Don't rerun.", ti)
               ti_status.to_run.pop(key)
               if key in ti_status.running:
                   ti_status.running.pop(key)
               return
       
           if self.rerun_failed_tasks:
               # Rerun failed tasks or upstreamed failed tasks
               if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
                   self.log.error("Task instance %s with state %s", ti, ti.state)
                   if key in ti_status.running:
                       ti_status.running.pop(key)
                   # Reset the failed task in backfill to scheduled state
                   ti.try_number += 1
                   ti.set_state(TaskInstanceState.SCHEDULED, session=session)
                   if ti.dag_run not in ti_status.active_runs:
                       ti_status.active_runs.add(ti.dag_run)
           else:
               # Default behaviour which works for subdag.
               if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
                   self.log.error("Task instance %s with state %s", ti, ti.state)
                   ti_status.failed.add(key)
                   ti_status.to_run.pop(key)
                   if key in ti_status.running:
                       ti_status.running.pop(key)
                   return
       
           if self.ignore_first_depends_on_past:
               dagrun = ti.get_dagrun(session=session)
               ignore_depends_on_past = dagrun.execution_date == (start_date or ti.start_date)
           else:
               ignore_depends_on_past = False
       
           backfill_context = DepContext(
               deps=BACKFILL_QUEUED_DEPS,
               ignore_depends_on_past=ignore_depends_on_past,
               ignore_task_deps=self.ignore_task_deps,
               wait_for_past_depends_before_skipping=False,
               flag_upstream_failed=True,
           )
       
           # Is the task runnable? -- then run it
           # the dependency checker can change states of tis
           if ti.are_dependencies_met(
               dep_context=backfill_context, session=session, verbose=self.verbose
           ):
               if executor.has_task(ti):
                   self.log.debug("Task Instance %s already in executor waiting 
for queue to clear", ti)
               else:
                   self.log.debug("Sending %s to executor", ti)
                   # Skip scheduled state, we are executing immediately
                   if ti.state in (TaskInstanceState.UP_FOR_RETRY, None):
                       # i am not sure why this is necessary.
                       # seemingly a quirk of backfill runner.
                       # it should be handled elsewhere i think.
                       # seems the leaf tasks are set SCHEDULED but others not.
                       # but i am not going to look too closely since we need
                       # to nuke the current backfill approach anyway.
                       ti.try_number += 1
                   ti.state = TaskInstanceState.QUEUED
                   ti.queued_by_job_id = self.job.id
                   ti.queued_dttm = timezone.utcnow()
                   session.merge(ti)
                   try:
                       session.commit()
                   except OperationalError:
                       self.log.exception("Failed to commit task state change 
due to operational error")
                       session.rollback()
                       # early exit so the outer loop can retry
                       return
       
                   cfg_path = None
       
                   if executor.is_local:
                       cfg_path = tmp_configuration_copy()
       
                   executor.queue_task_instance(
                       ti,
                       mark_success=self.mark_success,
                       pickle_id=pickle_id,
                       ignore_task_deps=self.ignore_task_deps,
                       ignore_depends_on_past=ignore_depends_on_past,
                       wait_for_past_depends_before_skipping=False,
                       pool=self.pool,
                       cfg_path=cfg_path,
                   )
                   ti_status.running[key] = ti
                   ti_status.to_run.pop(key)
               return
       
           if ti.state == TaskInstanceState.UPSTREAM_FAILED:
               self.log.error("Task instance %s upstream failed", ti)
               ti_status.failed.add(key)
   >           ti_status.to_run.pop(key)
   E           KeyError: TaskInstanceKey(dag_id='test_backfill_with_upstream_failed_task', task_id='downstream_task', run_id='backfill__2016-01-01T00:00:00+00:00', try_number=0, map_index=-1)
   
   airflow/jobs/backfill_job_runner.py:609: KeyError
   ---------------------------- Captured stderr setup -----------------------------
   INFO  [airflow.models.dagbag.DagBag] Filling up the DagBag from /dev/null
   ------------------------------ Captured log setup ------------------------------
   INFO     airflow.models.dagbag.DagBag:dagbag.py:574 Filling up the DagBag from /dev/null
   ----------------------------- Captured stderr call -----------------------------
   INFO  [airflow.executors.local_executor.LocalExecutor] Adding to queue: ['airflow', 'tasks', 'run', 'test_backfill_with_upstream_failed_task', 'failing_task', 'backfill__2016-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_backfill_with_upstream_failed_task.py', '--cfg-path', '/tmp/tmpbhrfampg']
   INFO  [airflow.executors.local_executor.QueuedLocalWorker] QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_backfill_with_upstream_failed_task', 'failing_task', 'backfill__2016-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_backfill_with_upstream_failed_task.py', '--cfg-path', '/tmp/tmpbhrfampg']
   INFO  [airflow.jobs.backfill_job_runner.BackfillJobRunner] [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
   INFO  [airflow.models.dagbag.DagBag] Filling up the DagBag from /opt/airflow/tests/dags/test_backfill_with_upstream_failed_task.py
   INFO  [airflow.cli.commands.task_command] Running <TaskInstance: test_backfill_with_upstream_failed_task.failing_task backfill__2016-01-01T00:00:00+00:00 [queued]> on host baa783c46d36
   Traceback (most recent call last):
     File "/opt/airflow/airflow/jobs/backfill_job_runner.py", line 700, in 
_process_backfill_task_instances
       session.commit()
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 1454, in commit
       self._transaction.commit(_to_root=self.future)
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 832, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 811, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 3449, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 3588, in _flush
       with util.safe_reraise():
     File 
"/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 
70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", 
line 3549, in _flush
       flush_context.execute()
     File 
"/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 
456, in execute
       rec.execute(self)
     File 
"/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 
630, in execute
       util.preloaded.orm_persistence.save_obj(
     File 
"/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 
237, in save_obj
       _emit_update_statements(
     File 
"/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 
1001, in _emit_update_statements
       c = connection._execute_20(
           ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
       return connection._execute_clauseelement(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
       ret = self._execute_context(
             ^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", 
line 1910, in _execute_context
       self.dialect.do_execute(
     File 
"/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 
736, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
179, in execute
       res = self._query(mogrified_query)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 
330, in _query
       db.query(q)
     File "/usr/local/lib/python3.11/site-packages/MySQLdb/connections.py", 
line 261, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock 
found when trying to get lock; try restarting transaction')
   [SQL: UPDATE dag_run SET last_scheduling_decision=%s, updated_at=%s WHERE 
dag_run.id = %s]
   [parameters: (None, datetime.datetime(2024, 5, 26, 18, 4, 3, 111505), 150)]
   ```
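   
   The failing statement is the backfill runner's own `session.commit()`, which loses a lock race on the `dag_run` row (MySQL error 1213), presumably against the LocalTaskJob that the LocalExecutor spawned for `failing_task` and that updates the same run concurrently. For reference, here is a minimal sketch of the usual mitigation for this kind of transient deadlock, namely retrying the commit a few times before giving up. The helper name and retry parameters are illustrative only and not a proposed patch for `backfill_job_runner.py`:
   
   ```python
   # Illustrative sketch only: retry a session commit when MySQL reports a
   # deadlock (error 1213, "Deadlock found when trying to get lock").
   import time
   
   from sqlalchemy.exc import OperationalError
   
   MYSQL_DEADLOCK_ERROR = 1213
   
   
   def commit_with_deadlock_retry(session, max_attempts: int = 3, backoff: float = 0.5) -> None:
       """Commit the session, restarting the transaction if MySQL picks it as a deadlock victim."""
       for attempt in range(1, max_attempts + 1):
           try:
               session.commit()
               return
           except OperationalError as err:
               session.rollback()
               orig = getattr(err, "orig", None)
               code = orig.args[0] if orig is not None and orig.args else None
               if code != MYSQL_DEADLOCK_ERROR or attempt == max_attempts:
                   raise
               # MySQL itself suggests "try restarting transaction", so back
               # off briefly and retry the whole commit.
               time.sleep(backoff * attempt)
   ```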
   
   
   
   
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow project.

