manipatnam opened a new issue, #68263:
URL: https://github.com/apache/airflow/issues/68263

   ### Under which category would you file this issue?
   
   Airflow Core
   
   ### Apache Airflow version
   
   3.1.8
   
   ### What happened and how to reproduce it?
   
   The DAG processor crashed with a `sqlalchemy.exc.PendingRollbackError`. The 
"Original exception was:" tail of the formatted error shows the actual cause: a 
`psycopg2.errors.UniqueViolation` on `dag_tag_pkey` during `bulk_write_to_db`.
   
   ```
   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
       self.processor.run()
     File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 297, in run
       return self._run_parsing_loop()
     File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 401, in _run_parsing_loop
       self._collect_results()
     File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 882, in _collect_results
       self._file_stats[file] = process_parse_results(
     File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 1276, in process_parse_results
       update_dag_parsing_results_in_db(
     File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 400, in update_dag_parsing_results_in_db
       for attempt in run_with_db_retries(logger=log):
     File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 438, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 371, in iter
       result = action(retry_state)
     File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 393, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
     File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
     File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 410, in update_dag_parsing_results_in_db
       SerializedDAG.bulk_write_to_db(
     File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2956, in bulk_write_to_db
       orm_dags = dag_op.add_dags(session=session)
     File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 470, in add_dags
       orm_dags = self.find_orm_dags(session=session)
     File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 467, in find_orm_dags
       return {dm.dag_id: dm for dm in session.scalars(stmt).unique()}
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1778, in scalars
       return self.execute(
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1716, in execute
       conn = self._connection_for_bind(bind)
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1555, in _connection_for_bind
       return self._transaction._connection_for_bind(
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 724, in _connection_for_bind
       self._assert_active()
     File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
       raise sa_exc.PendingRollbackError(
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_tag_pkey"
   DETAIL:  Key (name, dag_id)=(is_gdpr=False, featuregen_dag_generator) already exists.
   
   [SQL: INSERT INTO dag_tag (name, dag_id) VALUES (%(name)s, %(dag_id)s)]
   [parameters: {'name': 'is_gdpr=False', 'dag_id': 'featuregen_dag_generator'}]
   (Background on this error at: https://sqlalche.me/e/14/gkpj) (Background on this error at: https://sqlalche.me/e/14/7s2a)
   ```
   
   
   
   ### What you think should happen instead?
   
   _No response_
   
   ### Operating System
   
   debian
   
   ### Deployment
   
   Astronomer
   
   ### Apache Airflow Provider(s)
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   Deployment is running in HA mode with two DAG processors.
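
A possible explanation, not confirmed against the Airflow source, is that both DAG processors parsed the same file and each tried to insert the same `(name, dag_id)` row into `dag_tag`. The sketch below only illustrates that suspected collision. It uses an in-memory SQLite database as a stand-in for PostgreSQL, the table layout is inferred from the `Key (name, dag_id)` detail and the `INSERT` statement in the error above, and the helper names (`make_db`, `insert_tag_plain`, `insert_tag_idempotent`) are hypothetical. It is not Airflow code and not a proposed patch.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a stand-in dag_tag table with the composite key seen in the error."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_tag ("
        " name TEXT NOT NULL,"
        " dag_id TEXT NOT NULL,"
        " PRIMARY KEY (name, dag_id))"
    )
    return conn


def insert_tag_plain(conn: sqlite3.Connection, name: str, dag_id: str) -> None:
    """Plain INSERT, the same shape as the statement in the traceback."""
    conn.execute(
        "INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)", (name, dag_id)
    )


def insert_tag_idempotent(conn: sqlite3.Connection, name: str, dag_id: str) -> None:
    """INSERT that silently skips a row another writer already created."""
    conn.execute(
        "INSERT INTO dag_tag (name, dag_id) VALUES (?, ?) "
        "ON CONFLICT (name, dag_id) DO NOTHING",
        (name, dag_id),
    )


conn = make_db()
tag = ("is_gdpr=False", "featuregen_dag_generator")

# The first writer (think "processor A") inserts the tag successfully.
insert_tag_plain(conn, *tag)

# A second writer (think "processor B") repeats the same plain INSERT
# and hits the primary-key constraint.
try:
    insert_tag_plain(conn, *tag)
    plain_insert_failed = False
except sqlite3.IntegrityError:
    plain_insert_failed = True

# The idempotent variant tolerates the existing row instead of raising.
insert_tag_idempotent(conn, *tag)
row_count = conn.execute("SELECT COUNT(*) FROM dag_tag").fetchone()[0]
```

The `ON CONFLICT ... DO NOTHING` form is accepted by both SQLite and PostgreSQL. Whether that, or a `Session.rollback()` before the retry, is the right fix for `bulk_write_to_db` is for the maintainers to judge.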
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


