fuechter opened a new issue, #30636:
URL: https://github.com/apache/airflow/issues/30636

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Version Airflow: 2.5.1
   
   I have a Python file that generates dynamic DAGs. Every time that file gets 
new code, I have to run `./airflow.sh dags reserialize`, but sometimes the 
command fails with an error like the one below:
   
   ```
   [2023-04-13 19:46:54,669] {dag.py:2690} INFO - Sync 35 DAGs
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 
39, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", 
line 52, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 75, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 
108, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/dag_command.py",
 line 492, in dag_reserialize
       dagbag.sync_to_db(session=session)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 72, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", 
line 645, in sync_to_db
       for attempt in run_with_db_retries(logger=self.log):
     File 
"/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 
384, in __iter__
       do = self.iter(retry_state=retry_state)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line 
351, in iter
       return fut.result()
     File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in 
result
       return self.__get_result()
     File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in 
__get_result
       raise self._exception
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", 
line 660, in sync_to_db
       self.dags.values(), processor_subdir=processor_subdir, session=session
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 72, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 
2701, in bulk_write_to_db
       orm_dags: list[DagModel] = with_row_locks(query, of=DagModel, 
session=session).all()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", 
line 2773, in all
       return self._iter().all()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", 
line 2919, in _iter
       execution_options={"_sa_orm_load_options": self.load_options},
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 1713, in execute
       conn = self._connection_for_bind(bind)
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 1553, in _connection_for_bind              2-tetris-linkedin-hourly-days', 
'fileloc': '/opt/airflow/dags/register_dynamic_dag.py', 'fileloc_hash': 
51105027638417678, 'data': '{"__version": 1, "dag": {"_dag_id": 
"96ddcc3b-900a-44a7-bda9-81b9eeserialized DAG: 
/opt/airflow/dags/register_alerts_resport_dag.py
       engine, execution_options       
python3.7/site-packages/airflow/models/dagbag.py", line 631, in 
_serialize_dag_capturing_errors
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 721, in _connection_for_bind
       self._assert_active()
     File 
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 
line 608, in _assert_active
       code="7s2a",
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been 
rolled back due to a previous exception during flush. To begin a new 
transaction with this Session, first issue Session.rollback(). Original 
exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates 
unique constraint "serialized_dag_pkey"
   DETAIL:  Key 
(dag_id)=(96ddcc3b-900a-44a7-bda9-81b9eefde4d2-dynamic-dag-hourly-days) already 
exists.
   ```
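
My reading of the traceback (an assumption, I have not confirmed it in the 
Airflow code) is that something else writes the same `serialized_dag` row 
between the moment `reserialize` checks for it and the moment it inserts it. 
The sketch below reproduces only that pattern, using the standard library's 
`sqlite3` instead of Airflow and PostgreSQL. The table is simplified, and 
`check_then_insert`, `upsert` and `run_demo` are hypothetical names, not 
Airflow functions.

```python
import sqlite3


def make_db():
    # Simplified stand-in for the serialized_dag table (assumption: only two columns).
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)")
    return conn


def check_then_insert(conn, dag_id, data, concurrent_writer=None):
    # Check whether the row exists, then insert it if it looked absent.
    exists = conn.execute(
        "SELECT 1 FROM serialized_dag WHERE dag_id = ?", (dag_id,)
    ).fetchone()
    if concurrent_writer is not None:
        # Simulates another process writing the same key after our check.
        concurrent_writer()
    if exists is None:
        conn.execute("INSERT INTO serialized_dag VALUES (?, ?)", (dag_id, data))


def upsert(conn, dag_id, data):
    # Insert-or-replace in one statement, so there is no gap between check and write.
    conn.execute("INSERT OR REPLACE INTO serialized_dag VALUES (?, ?)", (dag_id, data))


def run_demo():
    conn = make_db()

    def other_writer():
        conn.execute("INSERT INTO serialized_dag VALUES ('dag-a', 'from-scheduler')")

    try:
        check_then_insert(conn, "dag-a", "from-cli", concurrent_writer=other_writer)
        outcome = "ok"
    except sqlite3.IntegrityError:
        # Same class of failure as the UniqueViolation in the traceback.
        outcome = "IntegrityError"

    upsert(conn, "dag-a", "from-cli")
    final = conn.execute(
        "SELECT data FROM serialized_dag WHERE dag_id = 'dag-a'"
    ).fetchone()[0]
    return outcome, final


if __name__ == "__main__":
    print(run_demo())
```

If this is what happens, it would also explain why the error only shows up 
sometimes: it depends on timing.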
   
   ### What you think should happen instead
   
   I think Airflow should reserialize the DAGs without failing on a duplicate 
key.
   
   ### How to reproduce
   
   The Python file that registers the dynamic DAGs (I shortened the code to 
focus on the problem):
   
   Here the data is hardcoded as a Python dict, but in the real setup it is a 
JSON file that another DAG updates from time to time.
   
   ```
   import pytz
   from datetime import datetime, timedelta
   from airflow.decorators import dag, task
   
   data = {
       'companies': [{
           'id': '1233',
           'slug': 'company-1',
           'timezone': {
               'id': 'America/Sao_Paulo'
           },
           'connections': [{
               'id': 'facebook',
           }, {
               'id': 'instagram',
           }]
       }, {
           'id': '1234',
           'slug': 'company-2',
           'timezone': {
               'id': 'America/Sao_Paulo'
           },
           'connections': [{
               'id': 'facebook',   
           }, {
               'id': 'instagram',
           }, {
               'id': 'twitter',
           }]
       }]
   }
   
   def create_dynamic_dag(dag_id, company):
       schedule_interval = "* 5 * * * *"
       local_tz = pytz.timezone(company['timezone']['id'])
       start_date = datetime.now(local_tz) - timedelta(days=7, hours=0)
   
       @dag(
           dag_id=dag_id,
           schedule=schedule_interval,
           start_date=start_date,
           catchup=False,
           max_active_runs=1,
       )
       def dynamic_dag():
           @task.branch
           def ping():
               # ...code to ping
               # return 'done' or 'notify_error'
               pass
   
           @task
           def notify_error():
               # ...code to notify error
               pass
   
           @task
           def done():
               # ... code to done
               pass
           
           ping_instance = ping()
   
           notify_error_instance = notify_error()
           done_instance = done()
   
           ping_instance >> [done_instance, notify_error_instance]
       
       dagD = dynamic_dag()
   
       globals()[dag_id] = dagD
   
       return dagD
   
   for company in data['companies']:
       for connection in company['connections']:
           dag_id = f"{company['id']}-{company['slug']}-{connection['id']}"
           dagD = create_dynamic_dag(dag_id, company)
   ```
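
Since the real data comes from a JSON file that gets rewritten, one thing 
worth ruling out is the file itself yielding the same `dag_id` twice. Below is 
a small sanity check, assuming the JSON has the same shape as the dict above. 
`load_companies`, `build_dag_ids` and `find_duplicate_ids` are hypothetical 
helper names, and the file path passed to `load_companies` is up to the caller.

```python
import json
from collections import Counter


def load_companies(path):
    # Read the JSON file that holds the same structure as the `data` dict above.
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def build_dag_ids(data):
    # Same naming scheme as the loop at the bottom of the DAG file.
    return [
        f"{company['id']}-{company['slug']}-{connection['id']}"
        for company in data["companies"]
        for connection in company["connections"]
    ]


def find_duplicate_ids(dag_ids):
    # Return every dag_id that appears more than once, sorted for stable output.
    counts = Counter(dag_ids)
    return sorted(dag_id for dag_id, n in counts.items() if n > 1)


if __name__ == "__main__":
    sample = {
        "companies": [
            {
                "id": "1233",
                "slug": "company-1",
                "connections": [{"id": "facebook"}, {"id": "facebook"}],
            }
        ]
    }
    print(find_duplicate_ids(build_dag_ids(sample)))
```

An empty list means the input is not producing duplicate ids, which would 
point away from the data file as the cause.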
   
   ### Operating System
   
   macOS, but I'm running Airflow in Docker
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   I only followed the documentation for [Airflow in 
Docker](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html).
   
   ### Anything else
   
   Sometimes this error doesn't happen, but it is happening more and more often.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
