[jira] [Comment Edited] (AIRFLOW-6518) Task did not retry when there was temporary metastore db connectivity loss

2020-01-15 Thread t oo (Jira)


[ https://issues.apache.org/jira/browse/AIRFLOW-6518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17016625#comment-17016625 ]

t oo edited comment on AIRFLOW-6518 at 1/16/20 7:27 AM:


Some options:
1. introduce a db retry config - https://stackoverflow.com/a/53300049/8874837
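For illustration, a minimal sketch of what such a config could enable (pool_pre_ping and pool_recycle are real SQLAlchemy engine options; wiring them into Airflow's engine setup, and the example connection URI, are assumptions here):
{code}
from sqlalchemy import create_engine

# example URI only; in Airflow this would come from [core] sql_alchemy_conn
sql_alchemy_conn = "postgresql+psycopg2://user:pass@host/airflow"

# pool_pre_ping sends a lightweight ping before each connection checkout and
# transparently replaces connections that died (e.g. during a db restart);
# pool_recycle refreshes connections older than the given number of seconds.
engine = create_engine(
    sql_alchemy_conn,
    pool_pre_ping=True,
    pool_recycle=1800,
)
{code}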
2. set conservative defaults and wrap calls to db functions in try/except with a
sleep and a second call to the db function,
e.g.
https://github.com/apache/airflow/blob/1.10.6/airflow/models/taskinstance.py#L961-L969
{code}
except AirflowException as e:
    self.refresh_from_db()
    # for case when task is marked as success/failed externally
    # current behavior doesn't hit the success callback
    if self.state in {State.SUCCESS, State.FAILED}:
        return
    else:
        self.handle_failure(e, test_mode, context)
        raise
{code}
CHANGES TO
{code}
except AirflowException as e:
    try:
        self.refresh_from_db()
        # for case when task is marked as success/failed externally
        # current behavior doesn't hit the success callback
        if self.state in {State.SUCCESS, State.FAILED}:
            return
        else:
            self.handle_failure(e, test_mode, context)
            raise
    except (Exception, KeyboardInterrupt):
        # don't rebind e here: handle_failure() should still receive the
        # original task error, not the db connectivity error
        sleep(5)  # time.sleep; give the metastore db a moment to recover
        self.refresh_from_db()
        if self.state in {State.SUCCESS, State.FAILED}:
            return
        else:
            self.handle_failure(e, test_mode, context)
            raise
{code}
3. make the db function itself retry - try/except with a sleep and a repeat of
the query, e.g. in refresh_from_db() of models/taskinstance.py
{code}
TI = TaskInstance

qry = session.query(TI).filter(
    TI.dag_id == self.dag_id,
    TI.task_id == self.task_id,
    TI.execution_date == self.execution_date)

if lock_for_update:
    ti = qry.with_for_update().first()
else:
    ti = qry.first()
{code}
CHANGES TO
{code}
try:
    TI = TaskInstance

    qry = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.task_id == self.task_id,
        TI.execution_date == self.execution_date)

    if lock_for_update:
        ti = qry.with_for_update().first()
    else:
        ti = qry.first()
except Exception:  # avoid a bare except, which would also swallow SystemExit
    # note: the session may need a rollback() before it can be reused
    # after a failed query
    sleep(5)
    qry = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.task_id == self.task_id,
        TI.execution_date == self.execution_date)

    if lock_for_update:
        ti = qry.with_for_update().first()
    else:
        ti = qry.first()
{code}
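A more reusable variant of option 3 would factor the sleep-and-repeat into a
decorator so each db function doesn't need its own try/except. A sketch, under
the assumption that transient connectivity loss surfaces as SQLAlchemy's
OperationalError (retry_db_call and its parameters are hypothetical, not
existing Airflow code):
{code}
import functools
import time

from sqlalchemy.exc import OperationalError


def retry_db_call(retries=3, delay=5):
    """Retry a db-touching function on transient connectivity errors."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries):
                try:
                    return func(*args, **kwargs)
                except OperationalError:
                    if attempt == retries - 1:
                        raise  # out of attempts; let the error propagate
                    time.sleep(delay)
        return wrapper
    return decorator
{code}
refresh_from_db() could then simply be decorated with @retry_db_call(). Note the
session it uses may still need a rollback() before the retried query, which this
sketch does not handle.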


> Task did not retry when there was temporary metastore db connectivity loss
> --
>
> Key: AIRFLOW-6518
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6518
> Project: Apache Airflow
> Issue Type: Bug
> Components: database, scheduler
> Affects Versions: 1.10.6
> Reporter: t oo
> Priority: Major
>
> My DAG has retries configured at the task level. I started a dagrun; while a 
> task was running, the metastore db crashed and the task failed. The db 
> recovered 3 seconds after the task failed, but the dagrun did not attempt to 
> retry the task (even though task retries are configured!) and instead went to 
> the FAILED state.
> *Last part of log of TaskInstance:*
> [2020-01-08 17:34:46,301] {base_task_runner.py:115} INFO - Job 34662: Subtask 
> mytsk Traceback (most recent call last):
> [2020-01-08 17:34:46,301] {base_task_runner.py:115} INFO - Job 34662: Subtask 
> mytsk   File "/home/ec2-user/venv/bin/airflow", line 37, in <module>
> [2020-01-08 17:34:46,302] {base_task_runner.py:115} INFO - Job 34662: Subtask 
> mytsk args.func(args)
> [2020-01-08 17:34:46,302] {base_task_runner.py:115} INFO - Job 34662: Subtask 
> mytsk   File 
> "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/cli.py",