scastria opened a new issue, #29582:
URL: https://github.com/apache/airflow/issues/29582

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   This may be similar to #15174.
   
   When I trigger the same dag_id more than once within the same second, I get:
   
   
   Traceback (most recent call last):
   --
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
 line 1802, in _execute_context
   self.dialect.do_execute(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py",
 line 719, in do_execute
   cursor.execute(statement, parameters)
   psycopg2.errors.UniqueViolation: duplicate key value violates unique 
constraint "dag_run_dag_id_execution_date_key"
   DETAIL:  Key (dag_id, execution_date)=(shawn-shotgun3-shotgun, 2023-02-16 
22:47:21+00) already exists.
   The above exception was the direct cause of the following exception:
   Traceback (most recent call last):
   File "/usr/local/airflow/.local/bin/airflow", line 8, in <module>
   sys.exit(main())
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", 
line 39, in main
   args.func(args)
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py",
 line 52, in command
   return func(*args, **kwargs)
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", 
line 103, in wrapper
   return f(*args, **kwargs)
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py",
 line 145, in dag_trigger
   message = api_client.trigger_dag(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/api/client/local_client.py",
 line 32, in trigger_dag
   dag_run = trigger_dag.trigger_dag(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/api/common/trigger_dag.py",
 line 124, in trigger_dag
   triggers = _trigger_dag(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/api/common/trigger_dag.py",
 line 89, in _trigger_dag
   dag_run = _dag.create_dagrun(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py",
 line 75, in wrapper
   return func(*args, session=session, **kwargs)
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/models/dag.py", 
line 2540, in create_dagrun
   session.flush()
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 3345, in flush
   self._flush(objects)
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 3484, in _flush
   with util.safe_reraise():
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py",
 line 70, in __exit__
   compat.raise_(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py",
 line 207, in raise_
   raise exception
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 3445, in _flush
   flush_context.execute()
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py",
 line 456, in execute
   rec.execute(self)
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py",
 line 630, in execute
   util.preloaded.orm_persistence.save_obj(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py",
 line 244, in save_obj
   _emit_insert_statements(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py",
 line 1221, in _emit_insert_statements
   result = connection._execute_20(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
 line 1614, in _execute_20
   return meth(self, args_10style, kwargs_10style, execution_options)
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py",
 line 325, in _execute_on_connection
   return connection._execute_clauseelement(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
 line 1481, in _execute_clauseelement
   ret = self._execute_context(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
 line 1845, in _execute_context
   self._handle_dbapi_exception(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
 line 2026, in _handle_dbapi_exception
   util.raise_(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py",
 line 207, in raise_
   raise exception
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py",
 line 1802, in _execute_context
   self.dialect.do_execute(
   File 
"/usr/local/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py",
 line 719, in do_execute
   cursor.execute(statement, parameters)
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate 
key value violates unique constraint "dag_run_dag_id_execution_date_key"
   DETAIL:  Key (dag_id, execution_date)=(shawn-shotgun3-shotgun, 2023-02-16 
22:47:21+00) already exists.
   [SQL: INSERT INTO dag_run (dag_id, queued_at, execution_date, start_date, 
end_date, state, run_id, creating_job_id, external_trigger, run_type, conf, 
data_interval_start, data_interval_end, last_scheduling_decision, dag_hash, 
log_template_id) VALUES (%(dag_id)s, %(queued_at)s, %(execution_date)s, 
%(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, 
%(external_trigger)s, %(run_type)s, %(conf)s, %(data_interval_start)s, 
%(data_interval_end)s, %(last_scheduling_decision)s, %(dag_hash)s, (SELECT 
max(log_template.id) AS max_1
   FROM log_template)) RETURNING dag_run.id]
   [parameters: {'dag_id': 'shawn-shotgun3-shotgun', 'queued_at': 
datetime.datetime(2023, 2, 16, 22, 47, 22, 43099, tzinfo=Timezone('UTC')), 
'execution_date': DateTime(2023, 2, 16, 22, 47, 21, tzinfo=Timezone('UTC')), 
'start_date': None, 'end_date': None, 'state': <DagRunState.QUEUED: 'queued'>, 
'run_id': 'shotgun3_20230216224703986721', 'creating_job_id': None, 
'external_trigger': True, 'run_type': <DagRunType.MANUAL: 'manual'>, 'conf': 
<psycopg2.extensions.Binary object at 0x7fab34ddb840>, 'data_interval_start': 
DateTime(2023, 2, 16, 22, 47, 21, tzinfo=Timezone('UTC')), 'data_interval_end': 
DateTime(2023, 2, 16, 22, 47, 21, tzinfo=Timezone('UTC')), 
'last_scheduling_decision': None, 'dag_hash': 
'7ab0e818c048cc52d84ba94f0df44258'}]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   
   
   
   
   ### What you think should happen instead
   
   It should be possible to trigger the same DAG many times, no matter how quickly the triggers follow one another.  The error message above suggests that there is a unique key constraint on the columns:
   
   dag_id, execution_date
   
   This constraint prevents rapid triggering, since the execution_date appears to have a resolution of only one second.  I would have expected the unique key constraint to be on dag_id and run_id instead, leaving it up to the caller to guarantee unique run_ids.
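
The collision described above can be sketched roughly as follows. This is only an illustration using an in-memory SQLite database, not Airflow's actual schema or code: the table names `by_date` and `by_run_id`, the helper `try_insert`, and the run ids `run_a` and `run_b` are hypothetical, and it assumes the trigger timestamp is truncated to whole seconds before the insert.

```python
import sqlite3
from datetime import datetime, timezone


def try_insert(conn, table, dag_id, run_id, execution_date):
    """Insert one row; return True on success, False on a uniqueness violation."""
    try:
        conn.execute(
            f"INSERT INTO {table} (dag_id, run_id, execution_date) VALUES (?, ?, ?)",
            (dag_id, run_id, execution_date.isoformat()),
        )
        return True
    except sqlite3.IntegrityError:
        return False


conn = sqlite3.connect(":memory:")
# Hypothetical stand-in for the constraint named in the traceback.
conn.execute(
    "CREATE TABLE by_date (dag_id TEXT, run_id TEXT, execution_date TEXT, "
    "UNIQUE (dag_id, execution_date))"
)
# Hypothetical alternative keyed on run_id, as suggested above.
conn.execute(
    "CREATE TABLE by_run_id (dag_id TEXT, run_id TEXT, execution_date TEXT, "
    "UNIQUE (dag_id, run_id))"
)

# Two triggers 0.4 s apart; truncation to whole seconds (an assumption here)
# makes their execution_date values identical.
first = datetime(2023, 2, 16, 22, 47, 21, 100000, tzinfo=timezone.utc).replace(microsecond=0)
second = datetime(2023, 2, 16, 22, 47, 21, 500000, tzinfo=timezone.utc).replace(microsecond=0)

results = {
    table: [
        try_insert(conn, table, "shawn-shotgun3-shotgun", "run_a", first),
        try_insert(conn, table, "shawn-shotgun3-shotgun", "run_b", second),
    ]
    for table in ("by_date", "by_run_id")
}
print(results)
```

With the (dag_id, execution_date) key the second insert is rejected even though the run ids differ, whereas with the (dag_id, run_id) key both inserts go through.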
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   AWS MWAA 2.4.3
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   MWAA
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

