[jira] [Comment Edited] (AIRFLOW-6518) Task did not retry when there was temporary metastore db connectivity loss
[ https://issues.apache.org/jira/browse/AIRFLOW-6518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17016625#comment-17016625 ]

t oo edited comment on AIRFLOW-6518 at 1/16/20 7:27 AM:

Some options:

1. Introduce a db retry config - https://stackoverflow.com/a/53300049/8874837 (a connection-level sketch follows after this list).

2. Set conservative defaults and wrap calls to db functions in try/except with a sleep and a second call to the db function, e.g. https://github.com/apache/airflow/blob/1.10.6/airflow/models/taskinstance.py#L961-L969

{code}
except AirflowException as e:
    self.refresh_from_db()
    # for case when task is marked as success/failed externally
    # current behavior doesn't hit the success callback
    if self.state in {State.SUCCESS, State.FAILED}:
        return
    else:
        self.handle_failure(e, test_mode, context)
        raise
{code}

CHANGES TO

{code}
except AirflowException as e:
    # Retry only the db read: wrapping the whole block would also catch the
    # final `raise` and swallow the original AirflowException.
    try:
        self.refresh_from_db()
    except Exception:
        sleep(5)  # needs `from time import sleep`; give the metastore time to recover
        self.refresh_from_db()
    # for case when task is marked as success/failed externally
    # current behavior doesn't hit the success callback
    if self.state in {State.SUCCESS, State.FAILED}:
        return
    else:
        self.handle_failure(e, test_mode, context)
        raise
{code}

3. Make the db function itself retry with a sleep, e.g. in refresh_from_db() of models/taskinstance.py (a reusable wrapper sketch follows below):

{code}
TI = TaskInstance
qry = session.query(TI).filter(
    TI.dag_id == self.dag_id,
    TI.task_id == self.task_id,
    TI.execution_date == self.execution_date)
if lock_for_update:
    ti = qry.with_for_update().first()
else:
    ti = qry.first()
{code}

CHANGES TO

{code}
TI = TaskInstance
qry = session.query(TI).filter(
    TI.dag_id == self.dag_id,
    TI.task_id == self.task_id,
    TI.execution_date == self.execution_date)
try:
    # only .first() actually hits the db; building the query cannot fail
    ti = qry.with_for_update().first() if lock_for_update else qry.first()
except Exception:  # avoid a bare except: it would also trap KeyboardInterrupt/SystemExit
    session.rollback()  # likely needed so the session is usable again after the error
    sleep(5)
    ti = qry.with_for_update().first() if lock_for_update else qry.first()
{code}
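For option 1, a minimal sketch of what connection-level validation could look like, assuming SQLAlchemy (which the metastore layer uses). `pool_pre_ping` and `pool_recycle` are real `create_engine()` arguments; the URL below is a placeholder, and this is not existing Airflow config:

{code}
# Sketch only: validate pooled connections before use so a brief metastore
# outage surfaces as a reconnect rather than a failed query.
from sqlalchemy import create_engine

engine = create_engine(
    "mysql://user:pass@metastore/airflow",  # placeholder URL
    pool_pre_ping=True,   # issue a lightweight ping before each checkout
    pool_recycle=1800,    # drop connections older than 30 min to dodge server-side timeouts
)
{code}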
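Options 2 and 3 both hand-roll the same sleep-and-retry pattern. A hedged sketch of a reusable wrapper that could decorate refresh_from_db() and similar metastore calls - `retry_db` is a hypothetical helper, not an existing Airflow API - retrying only on sqlalchemy.exc.OperationalError (the connectivity-class failure) rather than on every Exception:

{code}
# Sketch of a generic retry decorator for transient metastore errors.
import functools
import time

from sqlalchemy.exc import OperationalError


def retry_db(max_attempts=3, delay=5):
    """Retry a metastore call on transient connectivity errors."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except OperationalError:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the caller see the real error
                    time.sleep(delay)
        return wrapper
    return decorator
{code}

Applying `@retry_db()` to refresh_from_db() would give option 3's behavior without duplicating the query block at every call site.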
> Task did not retry when there was temporary metastore db connectivity loss
> ---------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6518
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6518
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database, scheduler
>    Affects Versions: 1.10.6
>            Reporter: t oo
>            Priority: Major
>
> My DAG has retries configured at the task level. I started a dagrun; while a task was running, the metastore db crashed and the task failed, but the dagrun did not attempt to retry the task (even though task retries are configured!). The db recovered 3 seconds after the task failed, yet the dagrun went to the FAILED state instead.
> *Last part of log of TaskInstance:*
> [2020-01-08 17:34:46,301] {base_task_runner.py:115} INFO - Job 34662: Subtask mytsk Traceback (most recent call last):
> [2020-01-08 17:34:46,301] {base_task_runner.py:115} INFO - Job 34662: Subtask mytsk   File "/home/ec2-user/venv/bin/airflow", line 37, in <module>
> [2020-01-08 17:34:46,302] {base_task_runner.py:115} INFO - Job 34662: Subtask mytsk     args.func(args)
> [2020-01-08 17:34:46,302] {base_task_runner.py:115} INFO - Job 34662: Subtask mytsk   File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/cli.py",