[ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15978507#comment-15978507 ]

Bolke de Bruin commented on AIRFLOW-492:
----------------------------------------

Will have a look. Under no circumstances should updating the stats cause the task to fail.

> Insert into dag_stats table results into failed task while task itself succeeded
> --------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-492
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-492
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Bolke de Bruin
>            Assignee: Siddharth Anand
>            Priority: Critical
>         Attachments: subdag_test.py
>
> On some occasions a duplicate key is inserted into dag_stats, which results in a task/DAG run being marked failed even though the task itself succeeded.
>
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
>     executor=self.executor)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
>     job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
>     models.DagStat.clean_dirty([run.dag_id], session=session)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 450, in do_execute
>     cursor.execute(statement, parameters)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/cursors.py", line 226, in execute
>     self.errorhandler(self, exc, value)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
>     raise errorvalue
> IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> [2016-09-07 18:44:17,787] {models.py:1473} INFO - Marking task as FAILED.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
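The traceback shows DagStat.clean_dirty INSERTing a (dag_id, state) row that already exists, and the IntegrityError propagating up through the SubDagOperator until the task is marked failed. The commenter's point — stats bookkeeping must never fail the task — suggests two defensive changes: replace the row instead of blindly inserting it, and swallow any IntegrityError that still occurs. The following is only a minimal sketch of that pattern against an in-memory SQLite table; `update_dag_stats` is a hypothetical helper, not Airflow's actual DagStat API (which goes through a SQLAlchemy session):

```python
import sqlite3

def update_dag_stats(conn, dag_id, state, count):
    # Delete any existing (dag_id, state) row, then insert, in one
    # transaction, so re-running never hits the composite primary key.
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "DELETE FROM dag_stats WHERE dag_id = ? AND state = ?",
                (dag_id, state),
            )
            conn.execute(
                "INSERT INTO dag_stats (dag_id, state, count, dirty) "
                "VALUES (?, ?, ?, 0)",
                (dag_id, state, count),
            )
    except sqlite3.IntegrityError:
        # Stats are advisory bookkeeping: a failed update must never
        # propagate and fail the task itself.
        pass

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_stats ("
    "dag_id TEXT, state TEXT, count INTEGER, dirty INTEGER, "
    "PRIMARY KEY (dag_id, state))"
)
# Calling twice for the same key must not raise
# "Duplicate entry '...-success' for key 'PRIMARY'".
update_dag_stats(conn, "hanging_subdags_n16_sqe.level_2_14", "success", 3)
update_dag_stats(conn, "hanging_subdags_n16_sqe.level_2_14", "success", 4)
print(conn.execute("SELECT count FROM dag_stats").fetchone()[0])  # 4
```

On MySQL (the backend in the log above) the same effect could be had with INSERT ... ON DUPLICATE KEY UPDATE, but the delete-then-insert form is portable across the databases Airflow supports.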