[jira] [Updated] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded

2017-04-26 Thread Chris Riccomini (JIRA)

 [ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated AIRFLOW-492:

Fix Version/s: 1.8.1

> Insert into dag_stats table results into failed task while task itself 
> succeeded
> 
>
>         Key: AIRFLOW-492
>         URL: https://issues.apache.org/jira/browse/AIRFLOW-492
>     Project: Apache Airflow
>  Issue Type: Bug
>    Reporter: Bolke de Bruin
>    Assignee: Siddharth Anand
>    Priority: Critical
>     Fix For: 1.8.1
>
> Attachments: subdag_test.py
>
>
> On some occasions a duplicate key seems to be inserted into dag_stats, which
> results in a task/DAG run being marked failed while the task itself has
> succeeded.
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
>     executor=self.executor)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
>     job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
>     models.DagStat.clean_dirty([run.dag_id], session=session)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
>     reraise(type(exception), exception, tb
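
The failing INSERT comes from models.DagStat.clean_dirty() (the models.py:3695 frame above), which appears to refresh per-state run counts in dag_stats at the end of the backfill job. The "Duplicate entry ... for key 'PRIMARY'" error suggests the table keys rows on (dag_id, state), so two jobs refreshing stats for the same dag_id at the same time (for example, nested subdag backfills) can both try to insert the same ('...', 'success') row, and the loser of that race fails even though the task work itself finished. Below is a minimal sketch of that pattern and one possible mitigation; the DagStat model, column sizes, and the refresh_stats_* helpers are illustrative stand-ins, not the actual Airflow code.

    # Minimal, self-contained sketch (hypothetical model, not airflow.models.DagStat).
    from sqlalchemy import Boolean, Column, Integer, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()


    class DagStat(Base):
        # Composite primary key on (dag_id, state), matching the
        # "Duplicate entry ... for key 'PRIMARY'" error in the log above.
        __tablename__ = 'dag_stats'
        dag_id = Column(String(250), primary_key=True)
        state = Column(String(50), primary_key=True)
        count = Column(Integer, default=0)
        dirty = Column(Boolean, default=False)


    def refresh_stats_naive(session, dag_id, counts):
        # Delete-then-insert: if another session inserts the same (dag_id, state)
        # row between our delete and our commit, the commit raises a duplicate-key
        # IntegrityError -- the failure mode seen in the traceback.
        session.query(DagStat).filter(DagStat.dag_id == dag_id).delete()
        for state, count in counts.items():
            session.add(DagStat(dag_id=dag_id, state=state, count=count, dirty=False))
        session.commit()


    def refresh_stats_merge(session, dag_id, counts):
        # One possible mitigation: session.merge() emits an UPDATE when the row
        # already exists instead of a second INSERT, so concurrent refreshes tend
        # to converge on the same row rather than collide on the primary key.
        for state, count in counts.items():
            session.merge(DagStat(dag_id=dag_id, state=state, count=count, dirty=False))
        session.commit()


    if __name__ == '__main__':
        engine = create_engine('sqlite://')   # in-memory stand-in for the MySQL metadata DB
        Base.metadata.create_all(engine)
        session = sessionmaker(bind=engine)()
        refresh_stats_merge(session, 'hanging_subdags_n16_sqe.level_2_14',
                            {'success': 3, 'running': 0})

Note that session.merge() only narrows the race window (it still reads and then writes in two steps); closing it completely would need a database-side upsert or an explicit row lock around the refresh.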

[jira] [Updated] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded

2016-09-08 Thread Bolke de Bruin (JIRA)

 [ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin updated AIRFLOW-492:
---
Attachment: subdag_test.py

> Insert into dag_stats table results into failed task while task itself 
> succeeded
> 
>
>         Key: AIRFLOW-492
>         URL: https://issues.apache.org/jira/browse/AIRFLOW-492
>     Project: Apache Airflow
>  Issue Type: Bug
>    Reporter: Bolke de Bruin
>    Assignee: Siddharth Anand
>    Priority: Critical
>
> Attachments: subdag_test.py
>