potiuk opened a new issue, #39858: URL: https://github.com/apache/airflow/issues/39858
### Body

The test `test_backfill_failed_dag_with_upstream_failed_task` in tests/jobs/test_backfill_job.py often fails because a database deadlock is detected. Here is a typical stack trace/error:

```python
______ TestBackfillJob.test_backfill_failed_dag_with_upstream_failed_task ______

self = <tests.jobs.test_backfill_job.TestBackfillJob object at 0x7f0f6532ef10>
dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7f0f3bfbe9d0>

    def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
        self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
        dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")

        # We have to use the "fake" version of perform_heartbeat due to the 'is_unit_test' check in
        # the original one. However, instead of using the original version of perform_heartbeat,
        # we can simply wait for a LocalExecutor's worker cycle. The approach with sleep works well now,
        # but it can be replaced with checking the state of the LocalTaskJob.
        def fake_perform_heartbeat(*args, **kwargs):
            import time

            time.sleep(1)

        with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
            job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
            job_runner = BackfillJobRunner(
                job=job,
                dag=dag,
                start_date=DEFAULT_DATE,
                end_date=DEFAULT_DATE,
                rerun_failed_tasks=True,
            )
            with pytest.raises(BackfillUnfinished):
>               run_job(job=job, execute_callable=job_runner._execute)

tests/jobs/test_backfill_job.py:2142:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/utils/session.py:84: in wrapper
    return func(*args, session=session, **kwargs)
airflow/jobs/job.py:410: in run_job
    return execute_job(job, execute_callable=execute_callable)
airflow/jobs/job.py:439: in execute_job
    ret = execute_callable()
airflow/utils/session.py:84: in wrapper
    return func(*args, session=session, **kwargs)
airflow/jobs/backfill_job_runner.py:980: in _execute
    self._execute_dagruns(
airflow/utils/session.py:81: in wrapper
    return func(*args, **kwargs)
airflow/jobs/backfill_job_runner.py:869: in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
airflow/jobs/backfill_job_runner.py:698: in _process_backfill_task_instances
    _per_task_process(key, ti, session)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = TaskInstanceKey(dag_id='test_backfill_with_upstream_failed_task', task_id='downstream_task', run_id='backfill__2016-01-01T00:00:00+00:00', try_number=0, map_index=-1)
ti = <TaskInstance: test_backfill_with_upstream_failed_task.downstream_task backfill__2016-01-01T00:00:00+00:00 [upstream_failed]>
session = <sqlalchemy.orm.session.Session object at 0x7f0f3bd93c10>

    def _per_task_process(key, ti: TaskInstance, session):
        ti.refresh_from_db(lock_for_update=True, session=session)

        task = self.dag.get_task(ti.task_id, include_subdags=True)
        ti.task = task

        self.log.debug("Task instance to run %s state %s", ti, ti.state)

        # The task was already marked successful or skipped by a
        # different Job. Don't rerun it.
        if ti.state == TaskInstanceState.SUCCESS:
            ti_status.succeeded.add(key)
            self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
            ti_status.to_run.pop(key)
            if key in ti_status.running:
                ti_status.running.pop(key)
            return
        elif ti.state == TaskInstanceState.SKIPPED:
            ti_status.skipped.add(key)
            self.log.debug("Task instance %s skipped. Don't rerun.", ti)
            ti_status.to_run.pop(key)
            if key in ti_status.running:
                ti_status.running.pop(key)
            return

        if self.rerun_failed_tasks:
            # Rerun failed tasks or upstreamed failed tasks
            if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
                self.log.error("Task instance %s with state %s", ti, ti.state)
                if key in ti_status.running:
                    ti_status.running.pop(key)
                # Reset the failed task in backfill to scheduled state
                ti.try_number += 1
                ti.set_state(TaskInstanceState.SCHEDULED, session=session)
                if ti.dag_run not in ti_status.active_runs:
                    ti_status.active_runs.add(ti.dag_run)
        else:
            # Default behaviour which works for subdag.
            if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
                self.log.error("Task instance %s with state %s", ti, ti.state)
                ti_status.failed.add(key)
                ti_status.to_run.pop(key)
                if key in ti_status.running:
                    ti_status.running.pop(key)
                return

        if self.ignore_first_depends_on_past:
            dagrun = ti.get_dagrun(session=session)
            ignore_depends_on_past = dagrun.execution_date == (start_date or ti.start_date)
        else:
            ignore_depends_on_past = False

        backfill_context = DepContext(
            deps=BACKFILL_QUEUED_DEPS,
            ignore_depends_on_past=ignore_depends_on_past,
            ignore_task_deps=self.ignore_task_deps,
            wait_for_past_depends_before_skipping=False,
            flag_upstream_failed=True,
        )

        # Is the task runnable? -- then run it
        # the dependency checker can change states of tis
        if ti.are_dependencies_met(
            dep_context=backfill_context, session=session, verbose=self.verbose
        ):
            if executor.has_task(ti):
                self.log.debug("Task Instance %s already in executor waiting for queue to clear", ti)
            else:
                self.log.debug("Sending %s to executor", ti)
                # Skip scheduled state, we are executing immediately
                if ti.state in (TaskInstanceState.UP_FOR_RETRY, None):
                    # i am not sure why this is necessary.
                    # seemingly a quirk of backfill runner.
                    # it should be handled elsewhere i think.
                    # seems the leaf tasks are set SCHEDULED but others not.
                    # but i am not going to look too closely since we need
                    # to nuke the current backfill approach anyway.
                    ti.try_number += 1
                ti.state = TaskInstanceState.QUEUED
                ti.queued_by_job_id = self.job.id
                ti.queued_dttm = timezone.utcnow()
                session.merge(ti)
                try:
                    session.commit()
                except OperationalError:
                    self.log.exception("Failed to commit task state change due to operational error")
                    session.rollback()
                    # early exit so the outer loop can retry
                    return

                cfg_path = None
                if executor.is_local:
                    cfg_path = tmp_configuration_copy()

                executor.queue_task_instance(
                    ti,
                    mark_success=self.mark_success,
                    pickle_id=pickle_id,
                    ignore_task_deps=self.ignore_task_deps,
                    ignore_depends_on_past=ignore_depends_on_past,
                    wait_for_past_depends_before_skipping=False,
                    pool=self.pool,
                    cfg_path=cfg_path,
                )
                ti_status.running[key] = ti
                ti_status.to_run.pop(key)
            return

        if ti.state == TaskInstanceState.UPSTREAM_FAILED:
            self.log.error("Task instance %s upstream failed", ti)
            ti_status.failed.add(key)
>           ti_status.to_run.pop(key)
E           KeyError: TaskInstanceKey(dag_id='test_backfill_with_upstream_failed_task', task_id='downstream_task', run_id='backfill__2016-01-01T00:00:00+00:00', try_number=0, map_index=-1)

airflow/jobs/backfill_job_runner.py:609: KeyError
---------------------------- Captured stderr setup -----------------------------
INFO [airflow.models.dagbag.DagBag] Filling up the DagBag from /dev/null
------------------------------ Captured log setup ------------------------------
INFO airflow.models.dagbag.DagBag:dagbag.py:574 Filling up the DagBag from /dev/null
----------------------------- Captured stderr call -----------------------------
INFO [airflow.executors.local_executor.LocalExecutor] Adding to queue: ['airflow', 'tasks', 'run', 'test_backfill_with_upstream_failed_task', 'failing_task', 'backfill__2016-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_backfill_with_upstream_failed_task.py', '--cfg-path', '/tmp/tmpbhrfampg']
INFO [airflow.executors.local_executor.QueuedLocalWorker] QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_backfill_with_upstream_failed_task', 'failing_task', 'backfill__2016-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_backfill_with_upstream_failed_task.py', '--cfg-path', '/tmp/tmpbhrfampg']
INFO [airflow.jobs.backfill_job_runner.BackfillJobRunner] [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
INFO [airflow.models.dagbag.DagBag] Filling up the DagBag from /opt/airflow/tests/dags/test_backfill_with_upstream_failed_task.py
INFO [airflow.cli.commands.task_command] Running <TaskInstance: test_backfill_with_upstream_failed_task.failing_task backfill__2016-01-01T00:00:00+00:00 [queued]> on host baa783c46d36
Traceback (most recent call last):
  File "/opt/airflow/airflow/jobs/backfill_job_runner.py", line 700, in _process_backfill_task_instances
    session.commit()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
        ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.11/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE dag_run SET last_scheduling_decision=%s, updated_at=%s WHERE dag_run.id = %s]
[parameters: (None, datetime.datetime(2024, 5, 26, 18, 4, 3, 111505), 150)]
```

### Committer

- [X] I acknowledge that I am a maintainer/committer of the Apache Airflow project.