[ https://issues.apache.org/jira/browse/AIRFLOW-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001975#comment-17001975 ]

Jarek Potiuk edited comment on AIRFLOW-2516 at 12/25/19 10:46 AM:
------------------------------------------------------------------

I think I have found the reason for the deadlock (and a probable solution).

I would love to hear your opinions on my hypothesis and the proposed solution, 
[~kamil.bregula], [~ash], [~kaxilnaik].

The analysis is based on the 1.9.0 source code, but the same problem occurs in 
1.10.x and 2.0.0 of Airflow.

It looks like the deadlock is caused by two competing UPDATE queries run 
against the task_instance table.

NOTE: those queries are not trying to update the same row in the table! They 
merely acquire locks on the indexes in opposite orders.
h2. QUERY ONE:

jobs.py, line 1653 in 1.9.0; scheduler_job.py, line 1624 in master:

{code:java}
                # If a task instance is scheduled or queued, but the corresponding
                # DAG run isn't running, set the state to NONE so we don't try to
                # re-run it.
                self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                                          [State.QUEUED,
                                                           State.SCHEDULED],
                                                          State.NONE)
{code}
 

This results in the following SQLAlchemy statement:

jobs.py, line 986 in 1.9.0; scheduler_job.py, line 971 in master:

{code:java}
            tis_changed = (
                session
                    .query(models.TaskInstance)
                    .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
                    .filter(models.TaskInstance.state.in_(old_states))
                    .filter(and_(
                        models.DagRun.dag_id == models.TaskInstance.dag_id,
                        models.DagRun.execution_date == models.TaskInstance.execution_date,
                        models.DagRun.state != State.RUNNING))
                    .update({models.TaskInstance.state: new_state},
                            synchronize_session=False)
            )
            session.commit()
{code}
 

 

This query updates the state field of TaskInstance rows that match selection 
criteria which include the state column (so during the update it likely needs 
to keep the ti_state index consistent and update it afterwards). Internally 
MySQL appears to do this in two steps: it first finds all the matching rows 
(keeping the ti_state index locked while doing so), and only after filtering 
them does it read the matching rows via the primary key (task_id, dag_id, 
execution_date). Because this is an update query, it also locks the primary 
key index (even though the key itself is not modified by the update), most 
likely to keep consistency. In most cases this query updates no rows, since it 
only exists to fix manually modified DAG runs; in any case those task 
instances should not be scheduled, as they have no RUNNING DagRuns.
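
For illustration, with synchronize_session=False the whole operation is a 
single UPDATE statement. Assuming SQLAlchemy renders it as a MySQL multi-table 
update (this is only a sketch - the dag_id list and literals are placeholders, 
not the exact generated SQL), it looks roughly like this:

{code:sql}
-- Approximate shape of QUERY ONE (one statement, synchronize_session=False).
-- Per the hypothesis above, MySQL drives the row lookup from the ti_state
-- index and therefore locks ti_state entries before the matching primary
-- key entries.
UPDATE task_instance, dag_run
SET task_instance.state = NULL
WHERE task_instance.dag_id IN ('example_dag_1', 'example_dag_2')
  AND task_instance.state IN ('queued', 'scheduled')
  AND dag_run.dag_id = task_instance.dag_id
  AND dag_run.execution_date = task_instance.execution_date
  AND dag_run.state != 'running';
{code}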

 

This is done in the main scheduler loop, execute_helper:

jobs.py, line 1604 in 1.9.0; scheduler_job.py, line 1525 in master:

{code:java}
        # For the execute duration, parse and schedule DAGs
        while (datetime.utcnow() - execute_start_time).total_seconds() < \
                self.run_duration or self.run_duration < 0:
            self.log.debug("Starting Loop..."
            loop_start_time = time.time()
 
{code}
 

The second query simply sets the state of a task instance to RUNNING.

 
h2. QUERY TWO:

The second query is run in parallel by _launch_process for one of the tasks 
being scheduled. It sets SOME task instance to RUNNING. This happens in this 
place:

models.py, line 1412 in 1.9.0; task_instance.py, line 845 in master:

 

 
{code:java}
        self.state = State.RUNNING
        self.pid = os.getpid()
        self.end_date = None
        if not test_mode:
            session.merge(self)
        session.commit()
{code}
 

This query sets the state on the TaskInstance. It first locks the primary key 
of the task instance row, and only then locks the ti_state index in order to 
update the state.
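
For reference, the single-row UPDATE issued here has the same shape as the 
statement quoted in the deadlock error at the bottom of this issue (the values 
below are placeholders):

{code:sql}
-- Approximate shape of QUERY TWO: a single row addressed by the primary key
-- (task_id, dag_id, execution_date). The primary key entry is locked first,
-- and the ti_state secondary index entry is updated afterwards.
UPDATE task_instance
SET state = 'running'
WHERE task_instance.task_id = 'some_task'
  AND task_instance.dag_id = 'some_dag'
  AND task_instance.execution_date = '2018-05-23 17:41:18.815809';
{code}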

This reversed locking order (primary key -> ti_state in one transaction, 
ti_state -> primary key in the other) causes the deadlock.

 

I believe the root of the problem is in those two lines (specifically the 
synchronize_session):

 
{code:java}
                    .update({models.TaskInstance.state: new_state},
                            synchronize_session=False)
{code}
 

*synchronize_session=False* is used here to speed up the query - with 
synchronize_session=False this is the most efficient way to execute the 
update, because just one SQL statement is executed to find the matching rows 
and update them (source: 
[https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update]
 ).

If we change it to *synchronize_session='fetch'*, the update operation will be 
performed in two steps:

1) first a SELECT query will be executed to find the matching rows, and then

2) the primary keys of those matching rows will be used to update the state of 
the selected rows.

 

In this case the first query returns the primary key values of the matching 
rows, and the UPDATE then sets the state using those primary keys in its WHERE 
clause. In effect, the first query will not lock either of the indexes, and 
the second will lock the primary key first and ti_state second - thus avoiding 
the deadlock.
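
If the fetch strategy indeed issues the final UPDATE against the fetched 
primary keys (as assumed above), the two statements would look roughly like 
this - again only a sketch with placeholder values, since the exact SQL 
depends on the SQLAlchemy version and dialect:

{code:sql}
-- Step 1: a plain SELECT that finds the primary keys of the matching rows.
SELECT task_instance.task_id, task_instance.dag_id, task_instance.execution_date
FROM task_instance, dag_run
WHERE task_instance.dag_id IN ('example_dag_1', 'example_dag_2')
  AND task_instance.state IN ('queued', 'scheduled')
  AND dag_run.dag_id = task_instance.dag_id
  AND dag_run.execution_date = task_instance.execution_date
  AND dag_run.state != 'running';

-- Step 2: update only the selected rows, addressed by primary key, so the
-- locks are taken in the same order as in QUERY TWO (primary key first,
-- ti_state second).
UPDATE task_instance
SET state = NULL
WHERE (task_instance.task_id, task_instance.dag_id, task_instance.execution_date)
   IN (('some_task', 'some_dag', '2018-05-23 17:41:18.815809'));
{code}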

I think this query is executed rarely enough that it is worth paying the price 
of a slightly less efficient query in order to avoid the deadlock.

 

 


> Deadlock found when trying to update task_instance table
> --------------------------------------------------------
>
>                 Key: AIRFLOW-2516
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2516
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: 1.8.0
>            Reporter: Jeff Liu
>            Priority: Major
>
>  
>  
> {code:java}
> [2018-05-23 17:59:57,218] {base_task_runner.py:98} INFO - Subtask: 
> [2018-05-23 17:59:57,217] {base_executor.py:49} INFO - Adding to queue: 
> airflow run production_wipeout_wipe_manager.Carat Carat_20180227 
> 2018-05-23T17:41:18.815809 --local -sd DAGS_FOLDER/wipeout/wipeout.py
> [2018-05-23 17:59:57,231] {base_task_runner.py:98} INFO - Subtask: Traceback 
> (most recent call last):
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/bin/airflow", line 27, in <module>
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: 
> args.func(args)
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: 
> pool=args.pool,
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: result = 
> func(*args, **kwargs)
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1532, in 
> _run_raw_task
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: 
> self.handle_failure(e, test_mode, context)
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1641, in 
> handle_failure
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: 
> session.merge(self)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 
> 1920, in merge
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: 
> _resolve_conflict_map=_resolve_conflict_map)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 
> 1974, in _merge
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: merged = 
> self.query(mapper.class_).get(key[1])
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 882, 
> in get
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: ident, 
> loading.load_on_pk_identity)
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 952, 
> in _get_impl
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return 
> db_load_fn(self, primary_key_identity)
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 247, 
> in load_on_pk_i
> dentity
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return 
> q.one()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2884, 
> in one
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret = 
> self.one_or_none()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2854, 
> in one_or_none
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret = 
> list(self)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2925, 
> in __iter__
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: return 
> self._execute_and_instances(context)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2946, 
> in _execute_and_instances
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: 
> close_with_result=True)
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2955, 
> in _get_bind_ar
> s
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: **kw
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2937, 
> in _connection_f
> rom_session
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: conn = 
> self.session.connection(**kw)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 
> 1035, in connection
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: 
> execution_options=execution_options)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 
> 1040, in _connection
> _for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: engine, 
> execution_options)
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 388, 
> in _connection_
> for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: 
> self._assert_active()
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276, 
> in _assert_acti
> ve
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: % 
> self._rollback_exception
> [2018-05-23 17:59:57,244] {base_task_runner.py:98} INFO - Subtask: 
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been 
> rolled back due to a previous exception during flush. To begin a new 
> transaction with this Session, first issue Session.rollback(). Original 
> exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found 
> when trying to get lock; try restarting transaction') 
> [SQL: u'UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s 
> AND task_instance.dag_id = %s AND task_instance.execution_date = %s'] 
> [parameters: (u'queued', 'Carat_20180227', 
> 'production_wipeout_wipe_manager.Carat', 
> datetime.datetime(2018, 5, 23, 17, 41, 18, 815809))] 
> (Background on this error at: http://sqlalche.me/e/e3q8){code}



