This is an automated email from the ASF dual-hosted git repository. shahar pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new c31069d23a Only orphan non-orphaned Datasets (#40806)
c31069d23a is described below

commit c31069d23ac7e7407c95e348bc88ef83e8145de5
Author: Stijn De Haes <stijndeh...@gmail.com>
AuthorDate: Fri Aug 2 23:00:33 2024 +0200

    Only orphan non-orphaned Datasets (#40806)
---
 airflow/jobs/scheduler_job_runner.py |  1 +
 tests/jobs/test_scheduler_job.py     | 42 ++++++++++++++++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 9e1f0121ac..163bf5b714 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -2070,6 +2070,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     isouter=True,
                 )
                 .group_by(DatasetModel.id)
+                .where(~DatasetModel.is_orphaned)
                 .having(
                     and_(
                         func.count(DagScheduleDatasetReference.dag_id) == 0,
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index eb6064be04..981e210dff 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5796,6 +5796,48 @@ class TestSchedulerJob:
         ]
         assert orphaned_datasets == ["ds2", "ds4"]
 
+    def test_dataset_orphaning_ignore_orphaned_datasets(self, dag_maker, session):
+        dataset1 = Dataset(uri="ds1")
+
+        with dag_maker(dag_id="datasets-1", schedule=[dataset1], session=session):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        non_orphaned_dataset_count = session.query(DatasetModel).filter(~DatasetModel.is_orphaned).count()
+        assert non_orphaned_dataset_count == 1
+        orphaned_dataset_count = session.query(DatasetModel).filter(DatasetModel.is_orphaned).count()
+        assert orphaned_dataset_count == 0
+
+        # now remove dataset1 reference
+        with dag_maker(dag_id="datasets-1", schedule=None, session=session):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+
+        self.job_runner._orphan_unreferenced_datasets(session=session)
+        session.flush()
+
+        orphaned_datasets_before_rerun = (
+            session.query(DatasetModel.updated_at, DatasetModel.uri)
+            .filter(DatasetModel.is_orphaned)
+            .order_by(DatasetModel.uri)
+        )
+        assert [dataset.uri for dataset in orphaned_datasets_before_rerun] == ["ds1"]
+        updated_at_timestamps = [dataset.updated_at for dataset in orphaned_datasets_before_rerun]
+
+        # when rerunning we should ignore the already orphaned datasets and thus the updated_at timestamp
+        # should remain the same
+        self.job_runner._orphan_unreferenced_datasets(session=session)
+        session.flush()
+
+        orphaned_datasets_after_rerun = (
+            session.query(DatasetModel.updated_at, DatasetModel.uri)
+            .filter(DatasetModel.is_orphaned)
+            .order_by(DatasetModel.uri)
+        )
+        assert [dataset.uri for dataset in orphaned_datasets_after_rerun] == ["ds1"]
+        assert updated_at_timestamps == [dataset.updated_at for dataset in orphaned_datasets_after_rerun]
+
     def test_misconfigured_dags_doesnt_crash_scheduler(self, session, dag_maker, caplog):
         """Test that if dagrun creation throws an exception, the scheduler doesn't crash"""