This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bc953609029dff0e42656f4e938ecea2009745a2 Author: Vishnu <46651469+vishnucod...@users.noreply.github.com> AuthorDate: Tue Nov 28 07:14:07 2023 +0100 Relax mandatory requirement for start_date when schedule=None (#35356) * Relax mandatory requirement for start_date when schedule=None * Updated run_type in unit tests * Added check for empty start_date and non empty schedule * Fix the build failures * Fix the build failures * Update based on review comments (cherry picked from commit 930f165db11e611887275dce17f10eab102f0910) --- airflow/models/dag.py | 11 +++-- airflow/models/dagrun.py | 2 +- tests/models/test_dag.py | 52 +++++++++++++++--------- tests/providers/google/cloud/sensors/test_gcs.py | 4 +- 4 files changed, 45 insertions(+), 24 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 5daa7bb805..e93d5a55a0 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -548,6 +548,13 @@ class DAG(LoggingMixin): # sort out DAG's scheduling behavior scheduling_args = [schedule_interval, timetable, schedule] + + has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args) + has_empty_start_date = not ("start_date" in self.default_args or self.start_date) + + if has_scheduling_args and has_empty_start_date: + raise ValueError("DAG is missing the start_date parameter") + if not at_most_one(*scheduling_args): raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.") if schedule_interval is not NOTSET: @@ -2618,10 +2625,8 @@ class DAG(LoggingMixin): from airflow.utils.task_group import TaskGroupContext - if not self.start_date and not task.start_date: - raise AirflowException("DAG is missing the start_date parameter") # if the task has no start date, assign it the same as the DAG - elif not task.start_date: + if not task.start_date: task.start_date = self.start_date # otherwise, the task will start on the later of its own start date and # the DAG's start date diff --git 
a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 898d266390..b2e70b37a5 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1116,7 +1116,7 @@ class DagRun(Base, LoggingMixin): def task_filter(task: Operator) -> bool: return task.task_id not in task_ids and ( self.is_backfill - or task.start_date <= self.execution_date + or (task.start_date is None or task.start_date <= self.execution_date) and (task.end_date is None or self.execution_date <= task.end_date) ) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index f4b872109f..f7bf1ad6d0 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -765,7 +765,7 @@ class TestDag: """ dag_id = "test_schedule_dag_relativedelta" delta = relativedelta(hours=+1) - dag = DAG(dag_id=dag_id, schedule=delta) + dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) _next = dag.following_schedule(TEST_DATE) @@ -780,7 +780,7 @@ class TestDag: """ dag_id = "test_schedule_dag_relativedelta" delta = relativedelta(hours=+1) - dag = DAG(dag_id=dag_id, schedule_interval=delta) + dag = DAG(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) _next = dag.following_schedule(TEST_DATE) @@ -799,7 +799,7 @@ class TestDag: dag_id = "test_schedule_dag_relativedelta" delta = relativedelta(hours=+1) - @dag(dag_id=dag_id, schedule_interval=delta) + @dag(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE) def mydag(): BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE) @@ -827,6 +827,20 @@ class TestDag: when = dag.following_schedule(start) assert when.isoformat() == "2018-03-25T03:00:00+00:00" + def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self): + # Check that we don't get an AttributeError 'start_date' for self.start_date when schedule is 
none + dag = DAG("dag_with_none_schedule_and_empty_start_date") + dag.add_task(BaseOperator(task_id="task_without_start_date")) + dagrun = dag.create_dagrun( + state=State.RUNNING, run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE + ) + assert dagrun is not None + + def test_fail_dag_when_schedule_is_non_none_and_empty_start_date(self): + # Check that we get a ValueError 'start_date' for self.start_date when schedule is non-none + with pytest.raises(ValueError, match="DAG is missing the start_date parameter"): + DAG(dag_id="dag_with_non_none_schedule_and_empty_start_date", schedule="@hourly") + def test_following_schedule_datetime_timezone_utc0530(self): # Check that we don't get an AttributeError 'name' for self.timezone class UTC0530(datetime.tzinfo): @@ -942,8 +956,8 @@ class TestDag: mock_active_runs_of_dags = mock.MagicMock(side_effect=DagRun.active_runs_of_dags) with mock.patch.object(DagRun, "active_runs_of_dags", mock_active_runs_of_dags): dags_null_timetable = [ - DAG("dag-interval-None", schedule_interval=None), - DAG("dag-interval-test", schedule_interval=interval), + DAG("dag-interval-None", schedule_interval=None, start_date=TEST_DATE), + DAG("dag-interval-test", schedule_interval=interval, start_date=TEST_DATE), ] DAG.bulk_write_to_db(dags_null_timetable, session=settings.Session()) if interval: @@ -1530,7 +1544,7 @@ class TestDag: it is called, and not scheduled the second. 
""" dag_id = "test_schedule_dag_once" - dag = DAG(dag_id=dag_id, schedule="@once") + dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE) assert isinstance(dag.timetable, OnceTimetable) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) @@ -1553,7 +1567,7 @@ class TestDag: Tests if fractional seconds are stored in the database """ dag_id = "test_fractional_seconds" - dag = DAG(dag_id=dag_id, schedule="@once") + dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) start_date = timezone.utcnow() @@ -1658,25 +1672,25 @@ class TestDag: def test_timetable_and_description_from_schedule_interval_arg( self, schedule_interval_arg, expected_timetable, interval_description ): - dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg) + dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg, start_date=TEST_DATE) assert dag.timetable == expected_timetable assert dag.schedule_interval == schedule_interval_arg assert dag.timetable.description == interval_description def test_timetable_and_description_from_dataset(self): - dag = DAG("test_schedule_interval_arg", schedule=[Dataset(uri="hello")]) + dag = DAG("test_schedule_interval_arg", schedule=[Dataset(uri="hello")], start_date=TEST_DATE) assert dag.timetable == DatasetTriggeredTimetable() assert dag.schedule_interval == "Dataset" assert dag.timetable.description == "Triggered by datasets" def test_schedule_interval_still_works(self): - dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * *") + dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * *", start_date=TEST_DATE) assert dag.timetable == cron_timetable("*/5 * * * *") assert dag.schedule_interval == "*/5 * * * *" assert dag.timetable.description == "Every 5 minutes" def test_timetable_still_works(self): - dag = DAG("test_schedule_interval_arg", 
timetable=cron_timetable("*/6 * * * *")) + dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 * * * *"), start_date=TEST_DATE) assert dag.timetable == cron_timetable("*/6 * * * *") assert dag.schedule_interval == "*/6 * * * *" assert dag.timetable.description == "Every 6 minutes" @@ -1702,7 +1716,7 @@ class TestDag: ], ) def test_description_from_timetable(self, timetable, expected_description): - dag = DAG("test_schedule_interval_description", timetable=timetable) + dag = DAG("test_schedule_interval_description", timetable=timetable, start_date=TEST_DATE) assert dag.timetable == timetable assert dag.timetable.description == expected_description @@ -2449,7 +2463,7 @@ my_postgres_conn: start_date = TEST_DATE delta = timedelta(days=1) - dag = DAG("dummy-dag", schedule=delta) + dag = DAG("dummy-dag", schedule=delta, start_date=start_date) dag_dates = dag.date_range(start_date=start_date, num=3) assert dag_dates == [ @@ -2502,10 +2516,10 @@ my_postgres_conn: ) def test_schedule_dag_param(self, kwargs): with pytest.raises(ValueError, match="At most one"): - with DAG(dag_id="hello", **kwargs): + with DAG(dag_id="hello", start_date=TEST_DATE, **kwargs): pass - def test_continuous_schedule_interval_limits_max_active_runs(self): dag = DAG("continuous", start_date=DEFAULT_DATE, schedule_interval="@continuous", max_active_runs=1) assert isinstance(dag.timetable, ContinuousTimetable) assert dag.max_active_runs == 1 @@ -3010,19 +3024,19 @@ class TestDagDecorator: @pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()]) def test_dag_timetable_match_schedule_interval(timetable): - dag = DAG("my-dag", timetable=timetable) + dag = DAG("my-dag", timetable=timetable, start_date=DEFAULT_DATE) assert dag._check_schedule_interval_matches_timetable() @pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)]) def
test_dag_schedule_interval_match_timetable(schedule_interval): - dag = DAG("my-dag", schedule=schedule_interval) + dag = DAG("my-dag", schedule=schedule_interval, start_date=DEFAULT_DATE) assert dag._check_schedule_interval_matches_timetable() @pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)]) def test_dag_schedule_interval_change_after_init(schedule_interval): - dag = DAG("my-dag", timetable=OnceTimetable()) + dag = DAG("my-dag", timetable=OnceTimetable(), start_date=DEFAULT_DATE) dag.schedule_interval = schedule_interval assert not dag._check_schedule_interval_matches_timetable() @@ -3391,7 +3405,7 @@ def test_get_next_data_interval( data_interval_end, expected_data_interval, ): - dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily") + dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily", start_date=DEFAULT_DATE) dag_model = DagModel( dag_id="test_get_next_data_interval", next_dagrun=logical_date, diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py index 422cd8f71a..1d4bbcec87 100644 --- a/tests/providers/google/cloud/sensors/test_gcs.py +++ b/tests/providers/google/cloud/sensors/test_gcs.py @@ -227,7 +227,9 @@ class TestGoogleCloudStorageObjectAsyncSensor: class TestTsFunction: def test_should_support_datetime(self): context = { - "dag": DAG(dag_id=TEST_DAG_ID, schedule=timedelta(days=5)), + "dag": DAG( + dag_id=TEST_DAG_ID, schedule=timedelta(days=5), start_date=datetime(2019, 2, 14, 0, 0) + ), "execution_date": datetime(2019, 2, 14, 0, 0), } result = ts_function(context)