[ 
https://issues.apache.org/jira/browse/AIRFLOW-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006364#comment-17006364
 ] 

Jarek Potiuk commented on AIRFLOW-2516:
---------------------------------------

So after a closer look, the split into two separate queries was already 
implemented - but it was only used for sqlite. I made a change so that it is 
now also used for mysql.

Now the first query runs SELECT FOR UPDATE and finds all task instances with 
wrong DAG_RUN states (it will lock only the PRIMARY index). The lock will be 
held until the function exits. It will not try to change task_state, so the 
ti_dag_state and ti_state indexes will not be locked. Then - if there are 
any task instances to fix - it will fix their state individually, one by one, 
locking the ti_dag_state and ti_state indexes. But then the locks will be 
taken in the right sequence: PRIMARY first, then ti_dag_state and ti_state. 
I think that has a chance to work.
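
For illustration, the two-step approach described above could be sketched 
roughly as follows. This is only a minimal sketch, not the attached patch: it 
uses the standard-library sqlite3 module with simplified task_instance and 
dag_run tables, and the function names, the 'failed' target state and the exact 
WHERE condition are assumptions made for the example. SQLite has no 
SELECT ... FOR UPDATE, so the locking clause appears only in a comment.

```python
import sqlite3


def make_demo_db():
    """Build a tiny in-memory database (hypothetical, simplified schema)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT);
        CREATE TABLE task_instance (
            task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT,
            PRIMARY KEY (task_id, dag_id, execution_date)
        );
        INSERT INTO dag_run VALUES ('dag_a', 'd1', 'running');
        INSERT INTO dag_run VALUES ('dag_b', 'd1', 'failed');
        INSERT INTO task_instance VALUES ('t1', 'dag_a', 'd1', 'queued');
        INSERT INTO task_instance VALUES ('t2', 'dag_b', 'd1', 'queued');
        INSERT INTO task_instance VALUES ('t3', 'dag_b', 'd1', 'success');
        """
    )
    return conn


def fix_task_states_without_running_dagrun(conn, old_states, new_state):
    """Select the affected rows first, then update them one by one."""
    placeholders = ",".join("?" for _ in old_states)
    # Step 1: find task instances whose DagRun is missing or not running.
    # On MySQL this SELECT would end with FOR UPDATE; it does not touch
    # task_instance.state, only reads the rows.
    rows = conn.execute(
        f"""
        SELECT ti.task_id, ti.dag_id, ti.execution_date
        FROM task_instance AS ti
        LEFT JOIN dag_run AS dr
          ON dr.dag_id = ti.dag_id AND dr.execution_date = ti.execution_date
        WHERE ti.state IN ({placeholders})
          AND (dr.state IS NULL OR dr.state != 'running')
        ORDER BY ti.task_id, ti.dag_id, ti.execution_date
        """,
        list(old_states),
    ).fetchall()
    # Step 2: fix each row individually, addressed by its primary key.
    for task_id, dag_id, execution_date in rows:
        conn.execute(
            "UPDATE task_instance SET state = ? "
            "WHERE task_id = ? AND dag_id = ? AND execution_date = ?",
            (new_state, task_id, dag_id, execution_date),
        )
    conn.commit()
    return len(rows)
```

In the common case the first query returns no rows, so the loop in step 2 does 
not run at all.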

Since the second part (looping through the wrong task instances and 
fixing them) will only happen if there are manually updated DagRuns (which 
should be super rare), and since it runs only once per scheduler loop, I 
do not think it will have any performance impact at all in most cases (it will 
be just a little slower when there are many manually modified DagRuns, which 
should pretty much never happen).

I prepared patched versions of scheduler_job.py (for 1.10.6) and jobs.py (for 
1.9) and I have a kind request. Could you please test if the fix works? 

[~fantasylion]  - here is patched 1.10.6 version: 
[^scheduler_job_fixed_deadlock_possibly_1.10.6.py] 

[~0x4ec7] - here is patched 1.9 version: [^jobs_fixed_deadlock_possibly_1.9.py]

It should be enough to replace scheduler_job.py / jobs.py in your installed 
airflow (just in case, also remove any leftover .pyc files).
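
If you prefer to script the replacement, something like the following could 
work. It is a rough sketch under assumptions: the helper name 
install_patched_module is made up for the example, and you have to pass the 
path of the file inside your own airflow installation yourself. It keeps a 
.orig backup of the original file, copies the patched file over it, and removes 
stale bytecode both next to the module (Python 2 style) and in __pycache__ 
(Python 3 style).

```python
import shutil
from pathlib import Path


def install_patched_module(patched_file, target_file):
    """Replace target_file with patched_file and drop stale .pyc files.

    Hypothetical helper: both paths must be supplied by the caller.
    Returns the list of removed .pyc paths.
    """
    patched, target = Path(patched_file), Path(target_file)

    # Keep a one-time backup of the original module, e.g. jobs.py.orig.
    backup = target.with_name(target.name + ".orig")
    if target.exists() and not backup.exists():
        shutil.copy2(target, backup)

    shutil.copy2(patched, target)

    # Python 2 writes jobs.pyc next to jobs.py; Python 3 writes
    # __pycache__/jobs.<tag>.pyc instead.
    stale = [target.with_suffix(".pyc")]
    cache_dir = target.parent / "__pycache__"
    if cache_dir.is_dir():
        stale.extend(cache_dir.glob(target.stem + ".*.pyc"))

    removed = []
    for pyc in stale:
        if pyc.exists():
            pyc.unlink()
            removed.append(pyc)
    return removed
```

Restart the scheduler afterwards so that the patched module is actually loaded.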

 

I'd appreciate it if you could check whether the deadlocks still occur for you 
with those patched versions.

 

All the best in the New Year! If that works, it might be a good start to 2020 :).

> Deadlock found when trying to update task_instance table
> --------------------------------------------------------
>
>                 Key: AIRFLOW-2516
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2516
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: 1.8.0
>            Reporter: Jeff Liu
>            Priority: Major
>         Attachments: Screenshot 2019-12-30 at 10.42.52.png, 
> image-2019-12-30-10-48-41-313.png, image-2019-12-30-10-58-02-610.png, 
> jobs.py, jobs_fixed_deadlock_possibly.py, 
> jobs_fixed_deadlock_possibly_1.9.py, jobs_fixed_deadlock_possibly_1.9.py, 
> scheduler_job_fixed_deadlock_possibly_1.10.6.py
>
>
>  
>  
> {code:java}
> [2018-05-23 17:59:57,218] {base_task_runner.py:98} INFO - Subtask: 
> [2018-05-23 17:59:57,217] {base_executor.py:49} INFO - Adding to queue: 
> airflow run production_wipeout_wipe_manager.Carat Carat_20180227 
> 2018-05-23T17:41:18.815809 --local -sd DAGS_FOLDER/wipeout/wipeout.py
> [2018-05-23 17:59:57,231] {base_task_runner.py:98} INFO - Subtask: Traceback 
> (most recent call last):
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/bin/airflow", line 27, in <module>
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: 
> args.func(args)
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: 
> pool=args.pool,
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: result = 
> func(*args, **kwargs)
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1532, in 
> _run_raw_task
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: 
> self.handle_failure(e, test_mode, context)
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1641, in 
> handle_failure
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: 
> session.merge(self)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 
> 1920, in merge
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: 
> _resolve_conflict_map=_resolve_conflict_map)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 
> 1974, in _merge
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: merged = 
> self.query(mapper.class_).get(key[1])
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 882, 
> in get
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: ident, 
> loading.load_on_pk_identity)
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 952, 
> in _get_impl
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return 
> db_load_fn(self, primary_key_identity)
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 247, 
> in load_on_pk_identity
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return 
> q.one()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2884, 
> in one
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret = 
> self.one_or_none()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2854, 
> in one_or_none
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret = 
> list(self)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2925, 
> in __iter__
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: return 
> self._execute_and_instances(context)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2946, 
> in _execute_and_instances
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: 
> close_with_result=True)
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2955, 
> in _get_bind_ar
> s
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: **kw
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2937, 
> in _connection_from_session
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: conn = 
> self.session.connection(**kw)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 
> 1035, in connection
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: 
> execution_options=execution_options)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 
> 1040, in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: engine, 
> execution_options)
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 388, 
> in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: 
> self._assert_active()
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: File 
> "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276, 
> in _assert_active
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: % 
> self._rollback_exception
> [2018-05-23 17:59:57,244] {base_task_runner.py:98} INFO - Subtask: 
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been 
> rolled back due to a previous exception during flush. To begin a new 
> transaction with this Session, first issue Session.rollback(). Original 
> exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found 
> when trying to get lock; try restarting transaction') [SQL: u'UPDATE 
> task_instance SET state=%s WHERE task_instance.task_id = %s AND 
> task_instance.dag_id = %s AND task_instance.execution_date = %s'] 
> [parameters: (u'queued', 
> 'Carat_20180227', 'production_wipeout_wipe_manager.Carat', 
> datetime.datetime(2018, 5, 23,
> 17, 41, 18, 815809))] (Background on this error at: 
> http://sqlalche.me/e/e3q8){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
