dhatch-niv opened a new issue, #27509:
URL: https://github.com/apache/airflow/issues/27509

   ### Apache Airflow version
   
   2.4.2
   
   ### What happened
   
   I have a DAG that is triggered by three datasets. When I remove or rename one or more of these datasets in the DAG's `schedule`, the web server fails to update the DAG, and `airflow dags reserialize` fails with an `AssertionError` raised inside SQLAlchemy. Full stack trace below:
   
   ```
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
   docker-airflow-scheduler-1  |     return func(*args, session=session, **kwargs)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/dag_processing/processor.py", line 781, in process_file
   docker-airflow-scheduler-1  |     dagbag.sync_to_db(processor_subdir=self._dag_directory, session=session)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
   docker-airflow-scheduler-1  |     return func(*args, **kwargs)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 644, in sync_to_db
   docker-airflow-scheduler-1  |     for attempt in run_with_db_retries(logger=self.log):
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/tenacity/__init__.py", line 382, in __iter__
   docker-airflow-scheduler-1  |     do = self.iter(retry_state=retry_state)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/tenacity/__init__.py", line 349, in iter
   docker-airflow-scheduler-1  |     return fut.result()
   docker-airflow-scheduler-1  |   File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
   docker-airflow-scheduler-1  |     return self.__get_result()
   docker-airflow-scheduler-1  |   File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
   docker-airflow-scheduler-1  |     raise self._exception
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 658, in sync_to_db
   docker-airflow-scheduler-1  |     DAG.bulk_write_to_db(
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
   docker-airflow-scheduler-1  |     return func(*args, **kwargs)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2781, in bulk_write_to_db
   docker-airflow-scheduler-1  |     session.flush()
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
   docker-airflow-scheduler-1  |     self._flush(objects)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
   docker-airflow-scheduler-1  |     transaction.rollback(_capture_exception=True)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
   docker-airflow-scheduler-1  |     compat.raise_(
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
   docker-airflow-scheduler-1  |     raise exception
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
   docker-airflow-scheduler-1  |     flush_context.execute()
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
   docker-airflow-scheduler-1  |     rec.execute(self)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 577, in execute
   docker-airflow-scheduler-1  |     self.dependency_processor.process_deletes(uow, states)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/dependency.py", line 552, in process_deletes
   docker-airflow-scheduler-1  |     self._synchronize(
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/dependency.py", line 610, in _synchronize
   docker-airflow-scheduler-1  |     sync.clear(dest, self.mapper, self.prop.synchronize_pairs)
   docker-airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/sync.py", line 86, in clear
   docker-airflow-scheduler-1  |     raise AssertionError(
   docker-airflow-scheduler-1  | AssertionError: Dependency rule tried to blank-out primary key column 'dataset_dag_run_queue.dataset_id' on instance '<DatasetDagRunQueue at 0xffff5d213d00>'
   ```
   
   ### What you think should happen instead
   
   Currently, the DAG does not load properly in the UI, and no error is surfaced there. Instead, the datasets that were removed from the schedule should be dropped as dependencies, and the DAG should be updated to depend only on the new datasets.
   
   ### How to reproduce
   
   Initial DAG:
   
   ```python
   import pendulum

   from airflow.datasets import Dataset
   from airflow.decorators import dag, task


   def foo():
       pass
   
   @dag(
       dag_id="test",
       start_date=pendulum.datetime(2022, 1, 1),
       catchup=False,
       schedule=[
           Dataset('test/1'),
           Dataset('test/2'),
           Dataset('test/3'),
       ]
   )
   def test_dag():
       @task
       def test_task():
           foo()
   
       test_task()
   
   test_dag()
   ```
   
   At least one of the datasets must be in the 'ready' state, i.e., a task that lists it as an outlet has run successfully (a sketch of such a producer DAG follows the table below). `dataset_dag_run_queue` will then look something like this:
   
   ```
   airflow=# SELECT * FROM dataset_dag_run_queue;
    dataset_id | target_dag_id |          created_at
   ------------+---------------+--------------------------------
            16 | test          | 2022-11-02 19:47:53.938748+00
   (1 row)
   ```
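
   For reference, a dataset reaches the 'ready' state when a task that lists it in `outlets` succeeds. A minimal producer sketch (this helper DAG is my own illustration for reproducing the issue, not part of the original deployment):

   ```python
   import pendulum

   from airflow.datasets import Dataset
   from airflow.decorators import dag, task


   @dag(
       dag_id="test_producer",  # hypothetical helper DAG, used only to reproduce
       start_date=pendulum.datetime(2022, 1, 1),
       catchup=False,
       schedule=None,
   )
   def producer_dag():
       @task(outlets=[Dataset("test/1")])
       def update_dataset():
           # A successful run of this task marks Dataset('test/1') as updated,
           # which queues the consumer DAG in dataset_dag_run_queue.
           pass

       update_dataset()


   producer_dag()
   ```

   Triggering this DAG once and letting the task succeed is enough to create the queue row shown above.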
   
   Then, update the DAG with new datasets:
   
   ```python
   import pendulum

   from airflow.datasets import Dataset
   from airflow.decorators import dag, task


   def foo():
       pass
   
   @dag(
       dag_id="test",
       start_date=pendulum.datetime(2022, 1, 1),
       catchup=False,
       schedule=[
           Dataset('test/new/1'),      # <--- updated 
           Dataset('test/new/2'),
           Dataset('test/new/3'),
       ]
   )
   def test_dag():
       @task
       def test_task():
           foo()
   
       test_task()
   
   test_dag()
   ```
   
   Now you will observe the error in the web server logs or when running 
`airflow dags reserialize`.
   
   I suspect this issue is related to the handling of cascading deletes on the `dataset_id` foreign key of the `dataset_dag_run_queue` table; dataset `id = 16` is one of the datasets that was renamed.
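
   This `AssertionError` is SQLAlchemy's generic behaviour when a parent row is deleted and the child's foreign key is also part of the child's primary key: with no delete cascade configured, the ORM tries to null out the key instead of deleting the child row. A standalone sketch of that failure mode (simplified stand-in models, not Airflow's actual ones):

   ```python
   from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
   from sqlalchemy.orm import Session, declarative_base, relationship

   Base = declarative_base()


   class Parent(Base):
       __tablename__ = "dataset"
       id = Column(Integer, primary_key=True)
       # No delete cascade: on parent delete, the ORM tries to set
       # queue_entry.dataset_id = NULL rather than deleting the row.
       queue = relationship("QueueEntry")


   class QueueEntry(Base):
       __tablename__ = "queue_entry"
       dataset_id = Column(ForeignKey("dataset.id"), primary_key=True)
       target_dag_id = Column(String, primary_key=True)


   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)

   with Session(engine) as session:
       session.add(Parent(id=16))
       session.add(QueueEntry(dataset_id=16, target_dag_id="test"))
       session.commit()

       session.delete(session.get(Parent, 16))
       # AssertionError: Dependency rule tried to blank-out primary key
       # column 'queue_entry.dataset_id' on instance '<QueueEntry ...>'
       session.flush()
   ```

   Configuring the relationship with `cascade="all, delete-orphan"` (or `ondelete="CASCADE"` on the foreign key plus `passive_deletes=True`) deletes the queue row instead of blanking it out.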
   
   ### Operating System
   
   docker image - apache/airflow:2.4.2-python3.9
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==6.0.0
   apache-airflow-providers-celery==3.0.0
   apache-airflow-providers-cncf-kubernetes==4.4.0
   apache-airflow-providers-common-sql==1.2.0
   apache-airflow-providers-docker==3.2.0
   apache-airflow-providers-elasticsearch==4.2.1
   apache-airflow-providers-ftp==3.1.0
   apache-airflow-providers-google==8.4.0
   apache-airflow-providers-grpc==3.0.0
   apache-airflow-providers-hashicorp==3.1.0
   apache-airflow-providers-http==4.0.0
   apache-airflow-providers-imap==3.0.0
   apache-airflow-providers-microsoft-azure==4.3.0
   apache-airflow-providers-mysql==3.2.1
   apache-airflow-providers-odbc==3.1.2
   apache-airflow-providers-postgres==5.2.2
   apache-airflow-providers-redis==3.0.0
   apache-airflow-providers-sendgrid==3.0.0
   apache-airflow-providers-sftp==4.1.0
   apache-airflow-providers-slack==6.0.0
   apache-airflow-providers-sqlite==3.2.1
   apache-airflow-providers-ssh==3.2.0
   ```
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Running using docker-compose locally.
   
   ### Anything else
   
   To trigger this problem, the dataset being removed must be in the "ready" state, so that there is an entry for it in `dataset_dag_run_queue`.
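
   A possible manual workaround (untested, and it assumes direct access to the metadata database is acceptable) is to clear the stale queue row before the renamed DAG file is reprocessed:

   ```python
   # Untested workaround sketch: remove the stale queue entry for this DAG
   # so that removing/renaming its datasets no longer hits the orphaned row.
   from airflow.models.dataset import DatasetDagRunQueue
   from airflow.utils.session import create_session

   with create_session() as session:
       session.query(DatasetDagRunQueue).filter(
           DatasetDagRunQueue.target_dag_id == "test"
       ).delete()
   ```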
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

