This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bc953609029dff0e42656f4e938ecea2009745a2
Author: Vishnu <46651469+vishnucod...@users.noreply.github.com>
AuthorDate: Tue Nov 28 07:14:07 2023 +0100

    Relax mandatory requirement for start_date when schedule=None (#35356)
    
    * Relax mandatory requirement for start_date when schedule=None
    
    * Updated run_type in unit tests
    
    * Added check for empty start_date and non empty schedule
    
    * Fix the build failures
    
    * Fix the build failures
    
    * Update based on review comments
    
    (cherry picked from commit 930f165db11e611887275dce17f10eab102f0910)
---
 airflow/models/dag.py                            | 11 +++--
 airflow/models/dagrun.py                         |  2 +-
 tests/models/test_dag.py                         | 52 +++++++++++++++---------
 tests/providers/google/cloud/sensors/test_gcs.py |  4 +-
 4 files changed, 45 insertions(+), 24 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 5daa7bb805..e93d5a55a0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -548,6 +548,13 @@ class DAG(LoggingMixin):
 
         # sort out DAG's scheduling behavior
         scheduling_args = [schedule_interval, timetable, schedule]
+
+        has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args)
+        has_empty_start_date = not ("start_date" in self.default_args or self.start_date)
+
+        if has_scheduling_args and has_empty_start_date:
+            raise ValueError("DAG is missing the start_date parameter")
+
         if not at_most_one(*scheduling_args):
             raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.")
         if schedule_interval is not NOTSET:
@@ -2618,10 +2625,8 @@ class DAG(LoggingMixin):
 
         from airflow.utils.task_group import TaskGroupContext
 
-        if not self.start_date and not task.start_date:
-            raise AirflowException("DAG is missing the start_date parameter")
         # if the task has no start date, assign it the same as the DAG
-        elif not task.start_date:
+        if not task.start_date:
             task.start_date = self.start_date
         # otherwise, the task will start on the later of its own start date and
         # the DAG's start date
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 898d266390..b2e70b37a5 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1116,7 +1116,7 @@ class DagRun(Base, LoggingMixin):
         def task_filter(task: Operator) -> bool:
             return task.task_id not in task_ids and (
                 self.is_backfill
-                or task.start_date <= self.execution_date
+                or (task.start_date is None or task.start_date <= self.execution_date)
                 and (task.end_date is None or self.execution_date <= task.end_date)
             )
 
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f4b872109f..f7bf1ad6d0 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -765,7 +765,7 @@ class TestDag:
         """
         dag_id = "test_schedule_dag_relativedelta"
         delta = relativedelta(hours=+1)
-        dag = DAG(dag_id=dag_id, schedule=delta)
+        dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
         dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", 
start_date=TEST_DATE))
 
         _next = dag.following_schedule(TEST_DATE)
@@ -780,7 +780,7 @@ class TestDag:
         """
         dag_id = "test_schedule_dag_relativedelta"
         delta = relativedelta(hours=+1)
-        dag = DAG(dag_id=dag_id, schedule_interval=delta)
+        dag = DAG(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE)
         dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", 
start_date=TEST_DATE))
 
         _next = dag.following_schedule(TEST_DATE)
@@ -799,7 +799,7 @@ class TestDag:
         dag_id = "test_schedule_dag_relativedelta"
         delta = relativedelta(hours=+1)
 
-        @dag(dag_id=dag_id, schedule_interval=delta)
+        @dag(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE)
         def mydag():
             BaseOperator(task_id="faketastic", owner="Also fake", 
start_date=TEST_DATE)
 
@@ -827,6 +827,20 @@ class TestDag:
         when = dag.following_schedule(start)
         assert when.isoformat() == "2018-03-25T03:00:00+00:00"
 
+    def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self):
+        # Check that we don't get an AttributeError 'start_date' for self.start_date when schedule is none
+        dag = DAG("dag_with_none_schedule_and_empty_start_date")
+        dag.add_task(BaseOperator(task_id="task_without_start_date"))
+        dagrun = dag.create_dagrun(
+            state=State.RUNNING, run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE
+        )
+        assert dagrun is not None
+
+    def test_fail_dag_when_schedule_is_non_none_and_empty_start_date(self):
+        # Check that we get a ValueError 'start_date' for self.start_date when schedule is non-none
+        with pytest.raises(ValueError, match="DAG is missing the start_date parameter"):
+            DAG(dag_id="dag_with_non_none_schedule_and_empty_start_date", schedule="@hourly")
+
     def test_following_schedule_datetime_timezone_utc0530(self):
         # Check that we don't get an AttributeError 'name' for self.timezone
         class UTC0530(datetime.tzinfo):
@@ -942,8 +956,8 @@ class TestDag:
         mock_active_runs_of_dags = 
mock.MagicMock(side_effect=DagRun.active_runs_of_dags)
         with mock.patch.object(DagRun, "active_runs_of_dags", 
mock_active_runs_of_dags):
             dags_null_timetable = [
-                DAG("dag-interval-None", schedule_interval=None),
-                DAG("dag-interval-test", schedule_interval=interval),
+                DAG("dag-interval-None", schedule_interval=None, 
start_date=TEST_DATE),
+                DAG("dag-interval-test", schedule_interval=interval, 
start_date=TEST_DATE),
             ]
             DAG.bulk_write_to_db(dags_null_timetable, 
session=settings.Session())
             if interval:
@@ -1530,7 +1544,7 @@ class TestDag:
         it is called, and not scheduled the second.
         """
         dag_id = "test_schedule_dag_once"
-        dag = DAG(dag_id=dag_id, schedule="@once")
+        dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE)
         assert isinstance(dag.timetable, OnceTimetable)
         dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", 
start_date=TEST_DATE))
 
@@ -1553,7 +1567,7 @@ class TestDag:
         Tests if fractional seconds are stored in the database
         """
         dag_id = "test_fractional_seconds"
-        dag = DAG(dag_id=dag_id, schedule="@once")
+        dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE)
         dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", 
start_date=TEST_DATE))
 
         start_date = timezone.utcnow()
@@ -1658,25 +1672,25 @@ class TestDag:
     def test_timetable_and_description_from_schedule_interval_arg(
         self, schedule_interval_arg, expected_timetable, interval_description
     ):
-        dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg)
+        dag = DAG("test_schedule_interval_arg", 
schedule=schedule_interval_arg, start_date=TEST_DATE)
         assert dag.timetable == expected_timetable
         assert dag.schedule_interval == schedule_interval_arg
         assert dag.timetable.description == interval_description
 
     def test_timetable_and_description_from_dataset(self):
-        dag = DAG("test_schedule_interval_arg", 
schedule=[Dataset(uri="hello")])
+        dag = DAG("test_schedule_interval_arg", 
schedule=[Dataset(uri="hello")], start_date=TEST_DATE)
         assert dag.timetable == DatasetTriggeredTimetable()
         assert dag.schedule_interval == "Dataset"
         assert dag.timetable.description == "Triggered by datasets"
 
     def test_schedule_interval_still_works(self):
-        dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * 
*")
+        dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * 
*", start_date=TEST_DATE)
         assert dag.timetable == cron_timetable("*/5 * * * *")
         assert dag.schedule_interval == "*/5 * * * *"
         assert dag.timetable.description == "Every 5 minutes"
 
     def test_timetable_still_works(self):
-        dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 
* * * *"))
+        dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 
* * * *"), start_date=TEST_DATE)
         assert dag.timetable == cron_timetable("*/6 * * * *")
         assert dag.schedule_interval == "*/6 * * * *"
         assert dag.timetable.description == "Every 6 minutes"
@@ -1702,7 +1716,7 @@ class TestDag:
         ],
     )
     def test_description_from_timetable(self, timetable, expected_description):
-        dag = DAG("test_schedule_interval_description", timetable=timetable)
+        dag = DAG("test_schedule_interval_description", timetable=timetable, 
start_date=TEST_DATE)
         assert dag.timetable == timetable
         assert dag.timetable.description == expected_description
 
@@ -2449,7 +2463,7 @@ my_postgres_conn:
         start_date = TEST_DATE
         delta = timedelta(days=1)
 
-        dag = DAG("dummy-dag", schedule=delta)
+        dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
         dag_dates = dag.date_range(start_date=start_date, num=3)
 
         assert dag_dates == [
@@ -2502,10 +2516,10 @@ my_postgres_conn:
     )
     def test_schedule_dag_param(self, kwargs):
         with pytest.raises(ValueError, match="At most one"):
-            with DAG(dag_id="hello", **kwargs):
+            with DAG(dag_id="hello", start_date=TEST_DATE, **kwargs):
                 pass
 
-    def test_continuous_schedule_interval_limits_max_active_runs(self):
+    def test_continuous_schedule_interval_linmits_max_active_runs(self):
         dag = DAG("continuous", start_date=DEFAULT_DATE, 
schedule_interval="@continuous", max_active_runs=1)
         assert isinstance(dag.timetable, ContinuousTimetable)
         assert dag.max_active_runs == 1
@@ -3010,19 +3024,19 @@ class TestDagDecorator:
 
 @pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
 def test_dag_timetable_match_schedule_interval(timetable):
-    dag = DAG("my-dag", timetable=timetable)
+    dag = DAG("my-dag", timetable=timetable, start_date=DEFAULT_DATE)
     assert dag._check_schedule_interval_matches_timetable()
 
 
 @pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", 
timedelta(days=1)])
 def test_dag_schedule_interval_match_timetable(schedule_interval):
-    dag = DAG("my-dag", schedule=schedule_interval)
+    dag = DAG("my-dag", schedule=schedule_interval, start_date=DEFAULT_DATE)
     assert dag._check_schedule_interval_matches_timetable()
 
 
 @pytest.mark.parametrize("schedule_interval", [None, "@daily", 
timedelta(days=1)])
 def test_dag_schedule_interval_change_after_init(schedule_interval):
-    dag = DAG("my-dag", timetable=OnceTimetable())
+    dag = DAG("my-dag", timetable=OnceTimetable(), start_date=DEFAULT_DATE)
     dag.schedule_interval = schedule_interval
     assert not dag._check_schedule_interval_matches_timetable()
 
@@ -3391,7 +3405,7 @@ def test_get_next_data_interval(
     data_interval_end,
     expected_data_interval,
 ):
-    dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily")
+    dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily", 
start_date=DEFAULT_DATE)
     dag_model = DagModel(
         dag_id="test_get_next_data_interval",
         next_dagrun=logical_date,
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index 422cd8f71a..1d4bbcec87 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -227,7 +227,9 @@ class TestGoogleCloudStorageObjectAsyncSensor:
 class TestTsFunction:
     def test_should_support_datetime(self):
         context = {
-            "dag": DAG(dag_id=TEST_DAG_ID, schedule=timedelta(days=5)),
+            "dag": DAG(
+                dag_id=TEST_DAG_ID, schedule=timedelta(days=5), start_date=datetime(2019, 2, 14, 0, 0)
+            ),
             "execution_date": datetime(2019, 2, 14, 0, 0),
         }
         result = ts_function(context)

Reply via email to