[ https://issues.apache.org/jira/browse/AIRFLOW-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ash Berlin-Taylor closed AIRFLOW-1039. -------------------------------------- Resolution: Won't Fix I think this is a Won't Fix - now that run times have millisecond (or is it microsecond?) accuracy, the likelihood of a clash is smaller anyway. Anyone feel free to re-open if you think I'm wrong and we should handle this. > Airflow is raising IntegrityError when during parallel DAG trigger > ------------------------------------------------------------------ > > Key: AIRFLOW-1039 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1039 > Project: Apache Airflow > Issue Type: Bug > Components: DagRun > Affects Versions: 1.8.0 > Reporter: Matus Valo > Priority: Minor > > When Two concurrent processes are trying to trigger the same dag with the > same execution date at the same time, the IntegrityError is thrown by > SQLAlchemy: > uwsgi[15887]: [2017-03-24 12:51:38,074] {app.py:1587} ERROR - Exception on / > [POST] > uwsgi[15887]: Traceback (most recent call last): > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1988, in wsgi_app > uwsgi[15887]: response = self.full_dispatch_request() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1641, in full_dispatch_request > uwsgi[15887]: rv = self.handle_user_exception(e) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1544, in handle_user_exception > uwsgi[15887]: reraise(exc_type, exc_value, tb) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1639, in full_dispatch_request > uwsgi[15887]: rv = self.dispatch_request() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1625, in dispatch_request > uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args) > uwsgi[15887]: File "./ws.py", line 21, in hello > uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), > 
conf=json.dumps({'input_files': input_files}), execution_date=datetime.now()) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", > line 56, in trigger_dag > uwsgi[15887]: external_trigger=True > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/utils/db.py", > line 53, in wrapper > uwsgi[15887]: result = func(*args, **kwargs) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", > line 3377, in create_dagrun > uwsgi[15887]: session.commit() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 874, in commit > uwsgi[15887]: self.transaction.commit() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 461, in commit > uwsgi[15887]: self._prepare_impl() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 441, in _prepare_impl > uwsgi[15887]: self.session.flush() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 2139, in flush > uwsgi[15887]: self._flush(objects) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 2259, in _flush > uwsgi[15887]: transaction.rollback(_capture_exception=True) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", > line 60, in __exit__ > uwsgi[15887]: compat.reraise(exc_type, exc_value, exc_tb) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 2223, in _flush > uwsgi[15887]: flush_context.execute() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", > line 389, in execute > uwsgi[15887]: rec.execute(self) > uwsgi[15887]: 
File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", > line 548, in execute > uwsgi[15887]: uow > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", > line 181, in save_obj > uwsgi[15887]: mapper, table, insert) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", > line 835, in _emit_insert_statements > uwsgi[15887]: execute(statement, params) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 945, in execute > uwsgi[15887]: return meth(self, multiparams, params) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", > line 263, in _execute_on_connection > uwsgi[15887]: return connection._execute_clauseelement(self, multiparams, > params) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1053, in _execute_clauseelement > uwsgi[15887]: compiled_sql, distilled_params > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1189, in _execute_context > uwsgi[15887]: context) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 2139, in flush > uwsgi[15887]: self._flush(objects) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 2259, in _flush > uwsgi[15887]: transaction.rollback(_capture_exception=True) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", > line 60, in __exit__ > uwsgi[15887]: compat.reraise(exc_type, exc_value, exc_tb) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 2223, in _flush > uwsgi[15887]: flush_context.execute() > uwsgi[15887]: 
File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", > line 389, in execute > uwsgi[15887]: rec.execute(self) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", > line 548, in execute > uwsgi[15887]: uow > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", > line 181, in save_obj > uwsgi[15887]: mapper, table, insert) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", > line 835, in _emit_insert_statements > uwsgi[15887]: execute(statement, params) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 945, in execute > uwsgi[15887]: return meth(self, multiparams, params) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", > line 263, in _execute_on_connection > uwsgi[15887]: return connection._execute_clauseelement(self, multiparams, > params) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1053, in _execute_clauseelement > uwsgi[15887]: compiled_sql, distilled_params > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1189, in _execute_context > uwsgi[15887]: context) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1393, in _handle_dbapi_exception > uwsgi[15887]: exc_info > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/compat.py", > line 203, in raise_from_cause > uwsgi[15887]: reraise(type(exception), exception, tb=exc_tb, cause=cause) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1182, in _execute_context > uwsgi[15887]: context) > uwsgi[15887]: File > 
"/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/default.py", > line 470, in do_execute > uwsgi[15887]: cursor.execute(statement, parameters) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/cursors.py", > line 205, in execute > uwsgi[15887]: self.errorhandler(self, exc, value) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/connections.py", > line 36, in defaulterrorhandler > uwsgi[15887]: raise errorclass, errorvalue > uwsgi[15887]: IntegrityError: (_mysql_exceptions.IntegrityError) (1062, > "Duplicate entry 'poc_dag2-2017-03-24 12:51:37.000000' for key 'dag_id'") > [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, > state, run_id, external_trigger, conf) VALUES (%s, %s, now(), %s, %s, %s, %s, > %s)'] [parameters: ('poc_dag2', datetime.datetime(2017, 3, 24, 12, 51, 37), > None, u'running', '4ac49276-10cb-11e7-8197-005056bc55dd', 1, > '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/matus/dev/airflowtest/input2/data20:51:30.789572200.gzq\x03as.')] > This is not consistent with AirflowException returned by trigger_dag() > function. 
Moreover, the session is not rolled back, hence also another > exception is occurring: > uwsgi[15887]: [2017-03-24 12:55:54,105] ERROR in app: Exception on / [POST] > uwsgi[15887]: Traceback (most recent call last): > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1988, in wsgi_app > uwsgi[15887]: response = self.full_dispatch_request() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1641, in full_dispatch_request > uwsgi[15887]: rv = self.handle_user_exception(e) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1544, in handle_user_exception > uwsgi[15887]: reraise(exc_type, exc_value, tb) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1639, in full_dispatch_request > uwsgi[15887]: rv = self.dispatch_request() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1625, in dispatch_request > uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args) > uwsgi[15887]: File "./ws.py", line 21, in hello > uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), > conf=json.dumps({'input_files': input_files}), execution_date=datetime.now()) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", > line 29, in trigger_dag > uwsgi[15887]: dag = dagbag.get_dag(dag_id) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", > line 200, in get_dag > uwsgi[15887]: orm_dag = DagModel.get_current(root_dag_id) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", > line 2549, in get_current > uwsgi[15887]: obj = session.query(cls).filter(cls.dag_id == dag_id).first() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > 
line 2731, in first > uwsgi[15887]: ret = list(self[0:1]) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > line 2523, in __getitem__ > uwsgi[15887]: return list(res) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > line 2831, in __iter__ > uwsgi[15887]: [2017-03-24 12:55:54,105] {app.py:1587} ERROR - Exception on / > [POST] > uwsgi[15887]: Traceback (most recent call last): > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1988, in wsgi_app > uwsgi[15887]: response = self.full_dispatch_request() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1641, in full_dispatch_request > uwsgi[15887]: rv = self.handle_user_exception(e) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1544, in handle_user_exception > uwsgi[15887]: reraise(exc_type, exc_value, tb) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1639, in full_dispatch_request > uwsgi[15887]: rv = self.dispatch_request() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py", line > 1625, in dispatch_request > uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args) > uwsgi[15887]: File "./ws.py", line 21, in hello > uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), > conf=json.dumps({'input_files': input_files}), execution_date=datetime.now()) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", > line 29, in trigger_dag > uwsgi[15887]: dag = dagbag.get_dag(dag_id) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", > line 200, in get_dag > uwsgi[15887]: orm_dag = DagModel.get_current(root_dag_id) > uwsgi[15887]: File > 
"/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py", > line 2549, in get_current > uwsgi[15887]: obj = session.query(cls).filter(cls.dag_id == dag_id).first() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > line 2731, in first > uwsgi[15887]: ret = list(self[0:1]) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > line 2523, in __getitem__ > uwsgi[15887]: return list(res) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > line 2831, in __iter__ > uwsgi[15887]: return self._execute_and_instances(context) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > line 2852, in _execute_and_instances > uwsgi[15887]: close_with_result=True) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > line 2861, in _get_bind_args > uwsgi[15887]: **kw > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", > line 2843, in _connection_from_session > uwsgi[15887]: conn = self.session.connection(**kw) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 966, in connection > uwsgi[15887]: execution_options=execution_options) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 971, in _connection_for_bind > uwsgi[15887]: engine, execution_options) > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 382, in _connection_for_bind > uwsgi[15887]: self._assert_active() > uwsgi[15887]: File > "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 276, in _assert_active > uwsgi[15887]: % self._rollback_exception > uwsgi[15887]: InvalidRequestError: This 
Session's transaction has been rolled > back due to a previous exception during flush. To begin a new transaction > with this Session, first issue Session.rollback(). Original exception was: > (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry > 'poc_dag2-2017-03-24 12:55:51.000000' for key 'dag_id'") [SQL: u'INSERT INTO > dag_run (dag_id, execution_date, start_date, end_date, state, run_id, > external_trigger, conf) VALUES (%s, %s, now(), %s, %s, %s, %s, %s)'] > [parameters: ('poc_dag2', datetime.datetime(2017, 3, 24, 12, 55, 51), None, > u'running', 'e1c78296-10cb-11e7-9e34-005056bc55dd', 1, > '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/home/matus/dev/airflowtest/input2/data20:55:49.589767900.gzq\x03as.')] > As an example, here is a simple web service that causes these exceptions when > multiple parallel clients try to process a file: > from uuid import uuid1 > import json > from os.path import join > from datetime import datetime > from flask import Flask > from flask import request > app = Flask(__name__) > @app.route("/", methods=['POST']) > def hello(): > input_files = list() > for f in request.files.values(): > fname = join('/home/matus/dev/airflowtest/input', f.filename) > f.save(fname) > input_files.append(fname) > from airflow.api.common.experimental.trigger_dag import trigger_dag > trigger_dag('poc_dag2', run_id=str(uuid1()), > conf=json.dumps({'input_files': input_files}), execution_date=datetime.now()) > return '{"status": "OK"}' > if __name__ == "__main__": > app.run() -- This message was sent by Atlassian JIRA (v7.6.3#76005)