This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c31069d23a Only orphan non-orphaned Datasets (#40806)
c31069d23a is described below

commit c31069d23ac7e7407c95e348bc88ef83e8145de5
Author: Stijn De Haes <stijndeh...@gmail.com>
AuthorDate: Fri Aug 2 23:00:33 2024 +0200

    Only orphan non-orphaned Datasets (#40806)
---
 airflow/jobs/scheduler_job_runner.py |  1 +
 tests/jobs/test_scheduler_job.py     | 42 ++++++++++++++++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 9e1f0121ac..163bf5b714 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -2070,6 +2070,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 isouter=True,
             )
             .group_by(DatasetModel.id)
+            .where(~DatasetModel.is_orphaned)
             .having(
                 and_(
                     func.count(DagScheduleDatasetReference.dag_id) == 0,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index eb6064be04..981e210dff 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5796,6 +5796,48 @@ class TestSchedulerJob:
         ]
         assert orphaned_datasets == ["ds2", "ds4"]
 
+    def test_dataset_orphaning_ignore_orphaned_datasets(self, dag_maker, session):
+        dataset1 = Dataset(uri="ds1")
+
+        with dag_maker(dag_id="datasets-1", schedule=[dataset1], session=session):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        non_orphaned_dataset_count = session.query(DatasetModel).filter(~DatasetModel.is_orphaned).count()
+        assert non_orphaned_dataset_count == 1
+        orphaned_dataset_count = session.query(DatasetModel).filter(DatasetModel.is_orphaned).count()
+        assert orphaned_dataset_count == 0
+
+        # now remove dataset1 reference
+        with dag_maker(dag_id="datasets-1", schedule=None, session=session):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+
+        self.job_runner._orphan_unreferenced_datasets(session=session)
+        session.flush()
+
+        orphaned_datasets_before_rerun = (
+            session.query(DatasetModel.updated_at, DatasetModel.uri)
+            .filter(DatasetModel.is_orphaned)
+            .order_by(DatasetModel.uri)
+        )
+        assert [dataset.uri for dataset in orphaned_datasets_before_rerun] == ["ds1"]
+        updated_at_timestamps = [dataset.updated_at for dataset in orphaned_datasets_before_rerun]
+
+        # when rerunning we should ignore the already orphaned datasets and thus the updated_at timestamp
+        # should remain the same
+        self.job_runner._orphan_unreferenced_datasets(session=session)
+        session.flush()
+
+        orphaned_datasets_after_rerun = (
+            session.query(DatasetModel.updated_at, DatasetModel.uri)
+            .filter(DatasetModel.is_orphaned)
+            .order_by(DatasetModel.uri)
+        )
+        assert [dataset.uri for dataset in orphaned_datasets_after_rerun] == ["ds1"]
+        assert updated_at_timestamps == [dataset.updated_at for dataset in orphaned_datasets_after_rerun]
+
     def test_misconfigured_dags_doesnt_crash_scheduler(self, session, dag_maker, caplog):
         """Test that if dagrun creation throws an exception, the scheduler doesn't crash"""
 

Reply via email to