[ https://issues.apache.org/jira/browse/AIRFLOW-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001975#comment-17001975 ]
Jarek Potiuk edited comment on AIRFLOW-2516 at 12/25/19 10:46 AM:
------------------------------------------------------------------

I think I have found the reason for the deadlock (and a probable solution). I would love to hear your opinion about the hypothesis and the proposed solution, [~kamil.bregula], [~ash], [~kaxilnaik]. It is based on the 1.9.0 source code, but the same problem occurs in 1.10.x and 2.0.0 of Airflow.

It looks like the deadlock is caused by two competing UPDATE queries that run on the TaskInstance table. NOTE that those queries are not trying to update the same row of the table! They merely try to lock indexes in reverse order of each other.

h2. QUERY ONE:

jobs.py, line 1653 in 1.9.0; scheduler_job.py, line 1624 in master:

{code:java}
# If a task instance is scheduled or queued, but the corresponding
# DAG run isn't running, set the state to NONE so we don't try to
# re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                          [State.QUEUED, State.SCHEDULED],
                                          State.NONE)
{code}

This results in calling this SQLAlchemy instruction: jobs.py, line 986 in 1.9.0; scheduler_job.py, line 971 in master:

{code:java}
tis_changed = (
    session
    .query(models.TaskInstance)
    .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
    .filter(models.TaskInstance.state.in_(old_states))
    .filter(and_(
        models.DagRun.dag_id == models.TaskInstance.dag_id,
        models.DagRun.execution_date == models.TaskInstance.execution_date,
        models.DagRun.state != State.RUNNING))
    .update({models.TaskInstance.state: new_state}, synchronize_session=False)
)
session.commit()
{code}

This query updates the state field of every TaskInstance matching selection criteria that include the state itself (so during the update it most likely wants to keep the ti_state index consistent and update it afterwards). It looks like MySQL does this internally in two steps: it first finds all the matching rows (keeping the ti_state index locked while it does so), and only after filtering does it access the matching rows via the primary key (task_id, dag_id, execution_date). Because it is an UPDATE query, it also locks the primary key index (even though the key is not modified by the update), most likely to keep consistency. In most cases this query updates no rows, since it only exists to fix manually modified DAG runs; in any case those tasks should not be scheduled because they have no RUNNING DagRuns. It is executed in the main scheduler loop, execute_helper: jobs.py, line 1604 in 1.9.0; scheduler_job.py, line 1525 in master:

{code:java}
# For the execute duration, parse and schedule DAGs
while (datetime.utcnow() - execute_start_time).total_seconds() < \
        self.run_duration or self.run_duration < 0:
    self.log.debug("Starting Loop...")
    loop_start_time = time.time()
{code}

h2. QUERY TWO:

The second query simply sets the state of a task instance to RUNNING. It is run in parallel by _launch_process for one of the tasks being scheduled, and happens here: models.py, line 1412 in 1.9.0; task_instance.py, line 845 in master:

{code:java}
self.state = State.RUNNING
self.pid = os.getpid()
self.end_date = None
if not test_mode:
    session.merge(self)
    session.commit()
{code}

This query sets the state on a single TaskInstance. It first locks the primary key of the task instance and only then locks the ti_state index in order to update the state. This reversed locking sequence (primary -> ti_state versus ti_state -> primary) causes the deadlock.
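To make the two lock orders concrete, here is a rough, standalone sketch (not Airflow code) of the two competing updates as they could hit a MySQL copy of the 1.9/1.10 schema. The connection string, the exact SQL text and the threading harness are assumptions for illustration only; the lock-order comments restate the hypothesis above rather than verified InnoDB behaviour.

{code:python}
# Minimal sketch: two concurrent transactions touching task_instance in the
# opposite index order described above. The DSN is a placeholder.
import threading

from sqlalchemy import create_engine, text

engine = create_engine("mysql+mysqldb://airflow:airflow@localhost/airflow")  # placeholder DSN


def bulk_reset_tis_without_dagrun():
    # "QUERY ONE" shape: criteria-based UPDATE filtered on ti.state.
    # Per the hypothesis above, InnoDB walks (and locks) the ti_state
    # secondary index first, then locks the matching primary-key records.
    with engine.begin() as conn:
        conn.execute(text(
            "UPDATE task_instance ti "
            "JOIN dag_run dr ON dr.dag_id = ti.dag_id "
            "AND dr.execution_date = ti.execution_date "
            "SET ti.state = NULL "
            "WHERE ti.state IN ('queued', 'scheduled') AND dr.state != 'running'"
        ))


def set_single_ti_running(task_id, dag_id, execution_date):
    # "QUERY TWO" shape: single-row UPDATE by primary key.
    # Per the hypothesis above, InnoDB locks the primary-key record first,
    # then the ti_state index entry it has to move.
    with engine.begin() as conn:
        conn.execute(
            text(
                "UPDATE task_instance SET state = 'running' "
                "WHERE task_id = :task_id AND dag_id = :dag_id "
                "AND execution_date = :execution_date"
            ),
            {"task_id": task_id, "dag_id": dag_id, "execution_date": execution_date},
        )


if __name__ == "__main__":
    # The deadlock is timing-dependent: it only appears when both transactions
    # touch overlapping index ranges at the same time, at which point MySQL
    # aborts one of them with error 1213 (as in the traceback on this issue).
    t1 = threading.Thread(target=bulk_reset_tis_without_dagrun)
    t2 = threading.Thread(
        target=set_single_ti_running,
        args=("some_task", "some_dag", "2018-05-23 17:41:18.815809"),
    )
    t1.start()
    t2.start()
    t1.join()
    t2.join()
{code}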
I believe the root of the problem is in these two lines (specifically the synchronize_session):

{code:java}
.update({models.TaskInstance.state: new_state}, synchronize_session=False)
{code}

The *synchronize_session=False* is there to speed up the query: with synchronize_session=False this is the most efficient way to execute it, because a single SQL query both finds the matching rows and updates them (source: [https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update]). If we change it to *synchronize_session='fetch'*, the update is performed in two steps: 1) first a query is executed to find the matching rows, and then 2) the primary keys of those matching rows are used to update the selected rows. In this case the first query only reads the matching primary key values, and the update then uses those primary keys in its WHERE clause. In effect the first query locks neither of the indexes, and the second locks the primary key first and ti_state second, thus avoiding the deadlock. I think this query is executed rarely enough that the price of the less efficient code is worth paying to avoid the deadlock.
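For clarity, the proposed change is just the one keyword argument in the query shown above. A sketch (the 1.9.0 code unchanged apart from synchronize_session):

{code:python}
tis_changed = (
    session
    .query(models.TaskInstance)
    .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
    .filter(models.TaskInstance.state.in_(old_states))
    .filter(and_(
        models.DagRun.dag_id == models.TaskInstance.dag_id,
        models.DagRun.execution_date == models.TaskInstance.execution_date,
        models.DagRun.state != State.RUNNING))
    # 'fetch' makes SQLAlchemy select the matching rows before updating them,
    # instead of issuing a single criteria-based UPDATE (see the discussion above).
    .update({models.TaskInstance.state: new_state}, synchronize_session='fetch')
)
session.commit()
{code}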
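If anyone wants to double-check what SQL actually reaches MySQL before and after such a change, SQLAlchemy's standard engine logging can be switched on (a sketch, not part of the proposed patch):

{code:python}
# Log every SQL statement SQLAlchemy emits, so the WHERE clauses and statement
# ordering of the scheduler's bulk UPDATE can be compared for
# synchronize_session=False versus synchronize_session='fetch'.
import logging

logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
{code}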
> Deadlock found when trying to update task_instance table
> ---------------------------------------------------------
>
>                 Key: AIRFLOW-2516
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2516
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: 1.8.0
>            Reporter: Jeff Liu
>            Priority: Major
>
> {code:java}
> [2018-05-23 17:59:57,218] {base_task_runner.py:98} INFO - Subtask: [2018-05-23 17:59:57,217] {base_executor.py:49} INFO - Adding to queue: airflow run production_wipeout_wipe_manager.Carat Carat_20180227 2018-05-23T17:41:18.815809 --local -sd DAGS_FOLDER/wipeout/wipeout.py
> [2018-05-23 17:59:57,231] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: args.func(args)
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1532, in _run_raw_task
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: self.handle_failure(e, test_mode, context)
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1641, in handle_failure
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: session.merge(self)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1920, in merge
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: _resolve_conflict_map=_resolve_conflict_map)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1974, in _merge
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: merged = self.query(mapper.class_).get(key[1])
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 882, in get
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: ident, loading.load_on_pk_identity)
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 952, in _get_impl
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return db_load_fn(self, primary_key_identity)
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 247, in load_on_pk_identity
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return q.one()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2884, in one
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret = self.one_or_none()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2854, in one_or_none
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret = list(self)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2925, in __iter__
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: return self._execute_and_instances(context)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2946, in _execute_and_instances
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: close_with_result=True)
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2955, in _get_bind_args
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: **kw
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2937, in _connection_from_session
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: conn = self.session.connection(**kw)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1035, in connection
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: execution_options=execution_options)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1040, in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: engine, execution_options)
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 388, in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: self._assert_active()
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276, in _assert_active
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: % self._rollback_exception
> [2018-05-23 17:59:57,244] {base_task_runner.py:98} INFO - Subtask: sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction') [SQL: u'UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.execution_date = %s'] [parameters: (u'queued', 'Carat_20180227', 'production_wipeout_wipe_manager.Carat', datetime.datetime(2018, 5, 23, 17, 41, 18, 815809))] (Background on this error at: http://sqlalche.me/e/e3q8)
> {code}