manipatnam opened a new issue, #68263:
URL: https://github.com/apache/airflow/issues/68263
### Under which category would you file this issue?
Airflow Core
### Apache Airflow version
3.1.8
### What happened and how to reproduce it?
The DAG processor crashed with a `sqlalchemy.exc.PendingRollbackError`. The
"Original exception was:" tail of the formatted error shows the actual cause: a
`psycopg2.errors.UniqueViolation` on `dag_tag_pkey` during `bulk_write_to_db`.
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 297, in run
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 401, in _run_parsing_loop
    self._collect_results()
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 882, in _collect_results
    self._file_stats[file] = process_parse_results(
  File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/manager.py", line 1276, in process_parse_results
    update_dag_parsing_results_in_db(
  File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 400, in update_dag_parsing_results_in_db
    for attempt in run_with_db_retries(logger=log):
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 438, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 371, in iter
    result = action(retry_state)
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 393, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 410, in update_dag_parsing_results_in_db
    SerializedDAG.bulk_write_to_db(
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/airflow/serialization/serialized_objects.py", line 2956, in bulk_write_to_db
    orm_dags = dag_op.add_dags(session=session)
  File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 470, in add_dags
    orm_dags = self.find_orm_dags(session=session)
  File "/usr/local/lib/python3.11/site-packages/airflow/dag_processing/collection.py", line 467, in find_orm_dags
    return {dm.dag_id: dm for dm in session.scalars(stmt).unique()}
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1778, in scalars
    return self.execute(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1716, in execute
    conn = self._connection_for_bind(bind)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1555, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 724, in _connection_for_bind
    self._assert_active()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_tag_pkey"
DETAIL: Key (name, dag_id)=(is_gdpr=False, featuregen_dag_generator) already exists.
[SQL: INSERT INTO dag_tag (name, dag_id) VALUES (%(name)s, %(dag_id)s)]
[parameters: {'name': 'is_gdpr=False', 'dag_id': 'featuregen_dag_generator'}]
(Background on this error at: https://sqlalche.me/e/14/gkpj) (Background on this error at: https://sqlalche.me/e/14/7s2a)
```
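To illustrate how a `PendingRollbackError` can mask an earlier unique-key failure, here is a minimal sketch in plain SQLAlchemy. It is not Airflow code: the `DagTag` class is a stand-in that only mirrors the `(name, dag_id)` primary key from the error above, `reproduce` is a hypothetical helper, and it uses in-memory SQLite, so the first error surfaces as SQLAlchemy's generic `IntegrityError` rather than `psycopg2.errors.UniqueViolation`.

```python
from sqlalchemy import Column, String, create_engine, select
from sqlalchemy.exc import IntegrityError, PendingRollbackError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagTag(Base):
    """Stand-in table (assumption): composite primary key (name, dag_id)."""

    __tablename__ = "dag_tag"
    name = Column(String, primary_key=True)
    dag_id = Column(String, primary_key=True)


def reproduce():
    """Return (errors seen in order, whether the cause was embedded, row count)."""
    engine = create_engine("sqlite://")  # in-memory SQLite, not PostgreSQL
    Base.metadata.create_all(engine)

    # Another writer has already committed the tag row.
    with Session(engine) as other:
        other.add(DagTag(name="is_gdpr=False", dag_id="featuregen_dag_generator"))
        other.commit()

    session = Session(engine)
    seen = []
    masked = False

    # First attempt: the flush hits the primary key and fails.
    session.add(DagTag(name="is_gdpr=False", dag_id="featuregen_dag_generator"))
    try:
        session.flush()
    except IntegrityError:
        seen.append("IntegrityError")  # deliberately no session.rollback() here

    # Second attempt on the same session: any statement now fails early.
    try:
        session.execute(select(DagTag)).all()
    except PendingRollbackError as exc:
        seen.append("PendingRollbackError")
        masked = "Original exception was" in str(exc)

    # After an explicit rollback the session is usable again.
    session.rollback()
    rows = session.execute(select(DagTag)).all()
    session.close()
    return seen, masked, len(rows)


seen, masked, row_count = reproduce()
print(seen, masked, row_count)
```

In this sketch the second statement never reaches the database. The session refuses to run it until `Session.rollback()` is called, which matches the shape of the traceback above, where the failing frame is a `SELECT` in `find_orm_dags` rather than the `INSERT` itself.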
### What you think should happen instead?
_No response_
### Operating System
debian
### Deployment
Astronomer
### Apache Airflow Provider(s)
_No response_
### Versions of Apache Airflow Providers
_No response_
### Official Helm Chart version
Not Applicable
### Kubernetes Version
_No response_
### Helm Chart configuration
_No response_
### Docker Image customizations
_No response_
### Anything else?
Deployment is running in HA mode with two DAG processors.
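
One plausible reading, which I have not confirmed, is that the two DAG processors race on the same tag: each checks for the row, sees nothing, and then both try to insert it. The sketch below only illustrates that check-then-insert pattern with two `sqlite3` connections from the standard library. The table definition, the helper names `tag_exists` and `simulate_race`, and the interleaving are assumptions made for illustration, not Airflow's actual code path.

```python
import os
import sqlite3
import tempfile

TAG = ("is_gdpr=False", "featuregen_dag_generator")
INSERT_SQL = "INSERT INTO dag_tag (name, dag_id) VALUES (?, ?)"


def tag_exists(conn):
    """Check whether the tag row is visible on this connection."""
    rows = conn.execute(
        "SELECT 1 FROM dag_tag WHERE name = ? AND dag_id = ?", TAG
    ).fetchall()
    return bool(rows)


def simulate_race(db_path):
    """Interleave two writers: both check first, then both insert."""
    proc_a = sqlite3.connect(db_path)  # stands in for DAG processor 1
    proc_b = sqlite3.connect(db_path)  # stands in for DAG processor 2
    try:
        proc_a.execute(
            "CREATE TABLE dag_tag ("
            "name TEXT NOT NULL, dag_id TEXT NOT NULL, "
            "PRIMARY KEY (name, dag_id))"
        )
        proc_a.commit()

        # Both writers look before either has written anything.
        a_sees = tag_exists(proc_a)
        b_sees = tag_exists(proc_b)

        # Writer A inserts and commits first.
        proc_a.execute(INSERT_SQL, TAG)
        proc_a.commit()

        # Writer B acts on its stale check and collides with A's row.
        error = None
        try:
            proc_b.execute(INSERT_SQL, TAG)
            proc_b.commit()
        except sqlite3.IntegrityError as exc:
            proc_b.rollback()
            error = str(exc)
        return a_sees, b_sees, error
    finally:
        proc_a.close()
        proc_b.close()


with tempfile.TemporaryDirectory() as tmp:
    a_sees, b_sees, error = simulate_race(os.path.join(tmp, "race.db"))
print(a_sees, b_sees, error)
```

If this is what happens, the unique violation itself would be an expected outcome of running two processors concurrently, and the crash would come from how the failed session is reused afterwards.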
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)