[ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003139#comment-16003139 ]

ASF subversion and git services commented on AIRFLOW-492:
---------------------------------------------------------

Commit e342d0d223e47ea25f73baaa00a16df414a6e0df in incubator-airflow's branch refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e342d0d ]

[AIRFLOW-492] Make sure stat updates cannot fail a task

Previously, a failed database commit for the statistics could also fail a task, and the UI could display out-of-date statistics.

This patch reworks DagStat so that a failure to update the statistics does not propagate, and it makes sure the UI always displays the latest statistics.

Closes #2254 from bolkedebruin/AIRFLOW-492

(cherry picked from commit c2472ffa124ffc65b8762ea583554494624dbb6a)
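
For reference, here is a minimal sketch of the pattern the commit message describes: roll back and log a failed statistics commit instead of letting the exception escape into the task. It is illustrative only; the helper name and call shape are assumptions, not the actual DagStat code:

    import logging

    from sqlalchemy.exc import SQLAlchemyError

    log = logging.getLogger(__name__)


    def commit_stats_safely(session, update_fn):
        """Apply a statistics update and commit it, but never let a failure
        escape to the caller. Illustrative sketch, not the DagStat code."""
        try:
            update_fn(session)
            session.commit()
        except SQLAlchemyError:
            # Statistics are advisory: roll back and log, never re-raise,
            # so a broken dag_stats write cannot mark the task as failed.
            session.rollback()
            log.exception("Could not update dag_stats; continuing")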


> Insert into dag_stats table results in failed task while task itself succeeded
> --------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-492
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-492
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Bolke de Bruin
>            Assignee: Siddharth Anand
>            Priority: Critical
>             Fix For: 1.8.1
>
>         Attachments: subdag_test.py
>
>
> On some occasions a duplicate key seems to be inserted into dag_stats, which results in a task/dag run being marked failed while the task itself has succeeded.
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
>     executor=self.executor)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
>     job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
>     models.DagStat.clean_dirty([run.dag_id], session=session)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 450, in do_execute
>     cursor.execute(statement, parameters)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/cursors.py", line 226, in execute
>     self.errorhandler(self, exc, value)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
>     raise errorvalue
> IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> [2016-09-07 18:44:17,787] {models.py:1473} INFO - Marking task as FAILED.
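
To make the failure mode above concrete: the duplicate-entry value in the error ('hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY') indicates that dag_stats keys on (dag_id, state), so a writer that recomputes the counts and INSERTs a row another writer has already inserted hits exactly this MySQL 1062 error. Below is a hedged, self-contained reproduction of that class of error with SQLAlchemy, using SQLite in place of MySQL; the table and class names are illustrative and not the Airflow schema:

    from sqlalchemy import Boolean, Column, Integer, String, create_engine
    from sqlalchemy.exc import IntegrityError
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()


    class DagStatDemo(Base):
        # Stand-in for dag_stats: composite primary key on (dag_id, state).
        __tablename__ = "dag_stats_demo"
        dag_id = Column(String(250), primary_key=True)
        state = Column(String(50), primary_key=True)
        count = Column(Integer, default=0)
        dirty = Column(Boolean, default=False)


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    dag_id, state = "hanging_subdags_n16_sqe.level_2_14", "success"

    with Session(engine) as session:
        # First writer records the stats row.
        session.add(DagStatDemo(dag_id=dag_id, state=state, count=3))
        session.commit()

    with Session(engine) as session:
        # Second writer recomputes the same counts and inserts the same
        # primary key instead of updating the existing row.
        session.add(DagStatDemo(dag_id=dag_id, state=state, count=3))
        try:
            session.commit()
        except IntegrityError:
            session.rollback()
            print("duplicate (dag_id, state) insert, as in the traceback above")

Per the commit message above, the 1.8.1 fix makes this statistics update unable to fail the task and keeps the UI showing the latest statistics.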



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
