[ https://issues.apache.org/jira/browse/AIRFLOW-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor closed AIRFLOW-1039.
--------------------------------------
    Resolution: Won't Fix

I think this is a won't fix: now that run times have millisecond (or is it
microsecond?) accuracy, the likelihood of a clash is smaller anyway.

Anyone, feel free to re-open if you think I'm wrong and we should handle this.
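A minimal sketch of why finer timestamps make a clash less likely. It assumes the stored execution_date is cut to whole seconds, which is what the failing INSERT in the report below shows (it binds datetime.datetime(2017, 3, 24, 12, 51, 37), with no fractional part). stored_key is a hypothetical helper that models only the (dag_id, execution_date) pair the unique key compares; it is not Airflow code.

```python
from datetime import datetime, timedelta


def stored_key(dag_id, execution_date, keep_fraction):
    """Hypothetical model of the (dag_id, execution_date) pair compared by
    the dag_run unique key. Not Airflow code."""
    if not keep_fraction:
        # Whole-second precision: drop the fractional part, matching the
        # 12:51:37.000000 value in the reported duplicate entry.
        execution_date = execution_date.replace(microsecond=0)
    return (dag_id, execution_date)


first = datetime(2017, 3, 24, 12, 51, 37, 100000)
second = first + timedelta(milliseconds=200)  # 200 ms later, same second

# Whole-second keys are equal, so the second INSERT would hit the key.
print(stored_key('poc_dag2', first, False) == stored_key('poc_dag2', second, False))  # True
# Sub-second keys differ, so these two triggers no longer clash.
print(stored_key('poc_dag2', first, True) == stored_key('poc_dag2', second, True))  # False
```

Two triggers that land on exactly the same instant would still collide at any precision, so finer timestamps make a clash less likely rather than impossible.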

> Airflow is raising IntegrityError when during parallel DAG trigger
> ------------------------------------------------------------------
>
>                 Key: AIRFLOW-1039
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1039
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: 1.8.0
>            Reporter: Matus Valo
>            Priority: Minor
>
> When two concurrent processes try to trigger the same DAG with the same
> execution date at the same time, SQLAlchemy raises an IntegrityError:
> uwsgi[15887]: [2017-03-24 12:51:38,074] {app.py:1587} ERROR - Exception on / [POST]
> uwsgi[15887]: Traceback (most recent call last):
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1988, in wsgi_app
> uwsgi[15887]: response = self.full_dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1641, in full_dispatch_request
> uwsgi[15887]: rv = self.handle_user_exception(e)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1544, in handle_user_exception
> uwsgi[15887]: reraise(exc_type, exc_value, tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1639, in full_dispatch_request
> uwsgi[15887]: rv = self.dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1625, in dispatch_request
> uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args)
> uwsgi[15887]: File "./ws.py", line 21, in hello
> uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files': input_files}), execution_date=datetime.now())
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 56, in trigger_dag
> uwsgi[15887]: external_trigger=True
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
> uwsgi[15887]: result = func(*args, **kwargs)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", line 3377, in create_dagrun
> uwsgi[15887]: session.commit()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 874, in commit
> uwsgi[15887]: self.transaction.commit()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 461, in commit
> uwsgi[15887]: self._prepare_impl()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 441, in _prepare_impl
> uwsgi[15887]: self.session.flush()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2139, in flush
> uwsgi[15887]: self._flush(objects)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2259, in _flush
> uwsgi[15887]: transaction.rollback(_capture_exception=True)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
> uwsgi[15887]: compat.reraise(exc_type, exc_value, exc_tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2223, in _flush
> uwsgi[15887]: flush_context.execute()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 389, in execute
> uwsgi[15887]: rec.execute(self)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 548, in execute
> uwsgi[15887]: uow
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj
> uwsgi[15887]: mapper, table, insert)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 835, in _emit_insert_statements
> uwsgi[15887]: execute(statement, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
> uwsgi[15887]: return meth(self, multiparams, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
> uwsgi[15887]: return connection._execute_clauseelement(self, multiparams, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
> uwsgi[15887]: compiled_sql, distilled_params
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
> uwsgi[15887]: context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
> uwsgi[15887]: exc_info
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
> uwsgi[15887]: reraise(type(exception), exception, tb=exc_tb, cause=cause)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
> uwsgi[15887]: context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
> uwsgi[15887]: cursor.execute(statement, parameters)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/cursors.py", line 205, in execute
> uwsgi[15887]: self.errorhandler(self, exc, value)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
> uwsgi[15887]: raise errorclass, errorvalue
> uwsgi[15887]: IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'poc_dag2-2017-03-24 12:51:37.000000' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, now(), %s, %s, %s, %s, %s)'] [parameters: ('poc_dag2', datetime.datetime(2017, 3, 24, 12, 51, 37), None, u'running', '4ac49276-10cb-11e7-8197-005056bc55dd', 1, '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/matus/dev/airflowtest/input2/data20:51:30.789572200.gzq\x03as.')]
> This is inconsistent with the AirflowException raised by the trigger_dag()
> function. Moreover, the session is not rolled back, so a later request fails
> with another exception:
> uwsgi[15887]: [2017-03-24 12:55:54,105] ERROR in app: Exception on / [POST]
> uwsgi[15887]: Traceback (most recent call last):
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1988, in wsgi_app
> uwsgi[15887]: response = self.full_dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1641, in full_dispatch_request
> uwsgi[15887]: rv = self.handle_user_exception(e)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1544, in handle_user_exception
> uwsgi[15887]: reraise(exc_type, exc_value, tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1639, in full_dispatch_request
> uwsgi[15887]: rv = self.dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line 1625, in dispatch_request
> uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args)
> uwsgi[15887]: File "./ws.py", line 21, in hello
> uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files': input_files}), execution_date=datetime.now())
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 29, in trigger_dag
> uwsgi[15887]: dag = dagbag.get_dag(dag_id)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", line 200, in get_dag
> uwsgi[15887]: orm_dag = DagModel.get_current(root_dag_id)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", line 2549, in get_current
> uwsgi[15887]: obj = session.query(cls).filter(cls.dag_id == dag_id).first()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2731, in first
> uwsgi[15887]: ret = list(self[0:1])
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2523, in __getitem__
> uwsgi[15887]: return list(res)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2831, in __iter__
> uwsgi[15887]: return self._execute_and_instances(context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2852, in _execute_and_instances
> uwsgi[15887]: close_with_result=True)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2861, in _get_bind_args
> uwsgi[15887]: **kw
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2843, in _connection_from_session
> uwsgi[15887]: conn = self.session.connection(**kw)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 966, in connection
> uwsgi[15887]: execution_options=execution_options)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 971, in _connection_for_bind
> uwsgi[15887]: engine, execution_options)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 382, in _connection_for_bind
> uwsgi[15887]: self._assert_active()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276, in _assert_active
> uwsgi[15887]: % self._rollback_exception
> uwsgi[15887]: InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'poc_dag2-2017-03-24 12:55:51.000000' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s, now(), %s, %s, %s, %s, %s)'] [parameters: ('poc_dag2', datetime.datetime(2017, 3, 24, 12, 55, 51), None, u'running', 'e1c78296-10cb-11e7-9e34-005056bc55dd', 1, '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/home/matus/dev/airflowtest/input2/data20:55:49.589767900.gzq\x03as.')]
> For example, here is a simple web service that raises these exceptions when
> multiple clients upload files to it in parallel:
> from uuid import uuid1
> import json
> from os.path import join
> from datetime import datetime
>
> from flask import Flask, request
>
> app = Flask(__name__)
>
>
> @app.route("/", methods=['POST'])
> def hello():
>     # Save every uploaded file and collect the paths for the DAG's conf.
>     input_files = []
>     for f in request.files.values():
>         fname = join('/home/matus/dev/airflowtest/input', f.filename)
>         f.save(fname)
>         input_files.append(fname)
>
>     # run_id is unique per request, but dag_run's unique key is on
>     # (dag_id, execution_date), so parallel requests can still clash.
>     from airflow.api.common.experimental.trigger_dag import trigger_dag
>     trigger_dag('poc_dag2', run_id=str(uuid1()),
>                 conf=json.dumps({'input_files': input_files}),
>                 execution_date=datetime.now())
>     return '{"status": "OK"}'
>
>
> if __name__ == "__main__":
>     app.run()
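The report asks for two things: a duplicate (dag_id, execution_date) should surface as the AirflowException callers of trigger_dag() expect, and the failed transaction should be rolled back so the next request does not hit InvalidRequestError. Below is a minimal sketch of that pattern. It uses the standard-library sqlite3 module instead of SQLAlchemy and MySQL so it runs without an Airflow install; DagRunExists (a stand-in for AirflowException), create_dag_run and the trimmed-down dag_run table are all hypothetical. It also shows that a fresh run_id per request, as uuid1() gives in the web service, does not avoid the clash, because the unique key covers dag_id and execution_date.

```python
import sqlite3
from datetime import datetime


class DagRunExists(Exception):
    """Hypothetical stand-in for AirflowException."""


def create_dag_run(conn, dag_id, execution_date, run_id):
    """Hypothetical helper: insert a dag_run row; on a duplicate key, roll
    back and raise DagRunExists instead of leaking the driver's error."""
    try:
        conn.execute(
            'INSERT INTO dag_run (dag_id, execution_date, run_id) VALUES (?, ?, ?)',
            (dag_id, execution_date.isoformat(), run_id))
        conn.commit()
    except sqlite3.IntegrityError:
        # Discard the failed transaction so the connection stays usable,
        # the sqlite3 counterpart of calling Session.rollback().
        conn.rollback()
        raise DagRunExists('DagRun for %s at %s already exists'
                           % (dag_id, execution_date.isoformat()))


conn = sqlite3.connect(':memory:')
# Trimmed-down table; the UNIQUE constraint mirrors the reported key,
# whose duplicate entry combines dag_id and execution_date.
conn.execute('CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, '
             'run_id TEXT, UNIQUE (dag_id, execution_date))')

ts = datetime(2017, 3, 24, 12, 51, 37)
create_dag_run(conn, 'poc_dag2', ts, 'run-a')
try:
    # Different run_id, same (dag_id, execution_date): still a clash.
    create_dag_run(conn, 'poc_dag2', ts, 'run-b')
except DagRunExists as exc:
    print(exc)

# After the rollback, the same connection accepts the next trigger.
create_dag_run(conn, 'poc_dag2', datetime(2017, 3, 24, 12, 51, 38), 'run-c')
print(conn.execute('SELECT COUNT(*) FROM dag_run').fetchone()[0])  # 2
```

Catching the error at the point of insertion keeps the rollback and the exception translation in one place, so callers only ever see one exception type.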



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
