sunank200 commented on code in PR #46512:
URL: https://github.com/apache/airflow/pull/46512#discussion_r1952026574


##########
airflow/api_connexion/schemas/dag_run_schema.py:
##########
@@ -78,17 +79,19 @@ class Meta:
 
     @pre_load
     def autogenerate(self, data, **kwargs):
-        """Auto generate run_id and logical_date if they are not provided."""
-        logical_date = data.get("logical_date", _MISSING)
+        """Auto generate run_id, logical_date and run_after if they are not 
provided."""
+        run_after = data.get("run_after", _MISSING)
 
         # Auto-generate logical_date if missing
-        if logical_date is _MISSING:
-            data["logical_date"] = str(timezone.utcnow())
+        if run_after is _MISSING:
+            data["run_after"] = str(timezone.utcnow())

Review Comment:
   This is changed now in https://github.com/apache/airflow/pull/46616



##########
airflow/api/common/trigger_dag.py:
##########
@@ -64,27 +64,31 @@ def _trigger_dag(
 
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
-
-    logical_date = logical_date or timezone.utcnow()
-
-    if not timezone.is_localized(logical_date):
-        raise ValueError("The logical date should be localized")
-
-    if replace_microseconds:
-        logical_date = logical_date.replace(microsecond=0)
-
-    if dag.default_args and "start_date" in dag.default_args:
-        min_dag_start_date = dag.default_args["start_date"]
-        if min_dag_start_date and logical_date < min_dag_start_date:
-            raise ValueError(
-                f"Logical date [{logical_date.isoformat()}] should be >= 
start_date "
-                f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
-            )
-    coerced_logical_date = timezone.coerce_datetime(logical_date)
-
-    data_interval = 
dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
+    if logical_date:
+        if not timezone.is_localized(logical_date):
+            raise ValueError("The logical date should be localized")
+
+        if replace_microseconds:
+            logical_date = logical_date.replace(microsecond=0)
+
+        if dag.default_args and "start_date" in dag.default_args:
+            min_dag_start_date = dag.default_args["start_date"]
+            if min_dag_start_date and logical_date < min_dag_start_date:
+                raise ValueError(
+                    f"Logical date [{logical_date.isoformat()}] should be >= 
start_date "
+                    f"[{min_dag_start_date.isoformat()}] from DAG's 
default_args"
+                )
+        run_after = timezone.coerce_datetime(logical_date)
+        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=run_after)
+    else:
+        data_interval = None
+        run_after = timezone.coerce_datetime(timezone.utcnow())

Review Comment:
   This is outdated change.



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1396,7 +1396,7 @@ def _create_dag_runs_asset_triggered(
                 dag_run = dag.create_dagrun(
                     run_id=dag.timetable.generate_run_id(
                         run_type=DagRunType.ASSET_TRIGGERED,
-                        logical_date=logical_date,
+                        logical_date=logical_date or 
dag_model.next_dagrun_create_after,

Review Comment:
   Changed it back as it merged now



##########
airflow/www/views.py:
##########
@@ -2224,20 +2224,27 @@ def trigger(self, dag_id: str, session: Session = 
NEW_SESSION):
                     "warning",
                 )
 
-        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
-        if not run_id:
-            run_id = dag.timetable.generate_run_id(
-                logical_date=logical_date,
-                data_interval=data_interval,
-                run_type=DagRunType.MANUAL,
-            )
+        run_after = timezone.utcnow()
+        if logical_date:
+            data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date)
+            run_after = data_interval.end
+            if not run_id:
+                run_id = dag.timetable.generate_run_id(
+                    logical_date=logical_date,
+                    data_interval=data_interval,
+                    run_type=DagRunType.MANUAL,
+                )
+        else:
+            data_interval = None
+            if not run_id:
+                run_id = f"manual__{timezone.utcnow().isoformat()}"

Review Comment:
   changed it



##########
airflow/timetables/base.py:
##########
@@ -281,8 +282,17 @@ def generate_run_id(
         self,
         *,
         run_type: DagRunType,
-        logical_date: DateTime,
+        logical_date: DateTime | None,
         data_interval: DataInterval | None,
+        run_after: DateTime = timezone.coerce_datetime(timezone.utcnow()),

Review Comment:
   This is changed now in https://github.com/apache/airflow/pull/46616



##########
airflow/models/dag.py:
##########
@@ -1613,6 +1614,7 @@ def test(
         """
         Execute one single DagRun for a given DAG and logical date.
 
+        :param run_after: DagRun to execute after this DagRun

Review Comment:
   Outdated change. Not part of this PR anymore.



##########
airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -84,6 +84,8 @@ class TriggerDAGRunPostBody(StrictBaseModel):
     dag_run_id: str | None = None
     data_interval_start: AwareDatetime | None = None
     data_interval_end: AwareDatetime | None = None
+    logical_date: datetime | None = None
+    run_after: datetime | None = timezone.utcnow()

Review Comment:
   Removed it 



##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -354,18 +355,23 @@ def trigger_dag_run(
             f"DAG with dag_id: '{dag_id}' has import errors and cannot be 
triggered",
         )
 
-    logical_date = pendulum.instance(body.logical_date)
-
+    logical_date = pendulum.instance(body.logical_date) if body.logical_date 
else None
+    run_after = timezone.coerce_datetime(timezone.utcnow())

Review Comment:
   Changed in https://github.com/apache/airflow/pull/46616



##########
airflow/api_connexion/schemas/dag_run_schema.py:
##########
@@ -78,7 +78,7 @@ class Meta:
 
     @pre_load
     def autogenerate(self, data, **kwargs):
-        """Auto generate run_id and logical_date if they are not provided."""
+        """Auto generate run_id, logical_date if they are not provided."""

Review Comment:
   This is an outdated change. Not part of this PR anymore



##########
airflow/models/dag.py:
##########
@@ -1782,6 +1789,10 @@ def create_dagrun(
         :meta private:
         """
         logical_date = timezone.coerce_datetime(logical_date)
+        # For manual runs where logical_date is None, ensure no data_interval 
is set.
+        if logical_date is None:
+            if data_interval is not None:
+                raise ValueError("data_interval must be None when logical_date 
is None")

Review Comment:
   changed it



##########
airflow/models/dagrun.py:
##########
@@ -253,12 +253,15 @@ def __init__(
         dag_version: DagVersion | None = None,
         bundle_version: str | None = None,
     ):
+        # For manual runs where logical_date is None, ensure no data_interval 
is set.
+        if logical_date is None:
+            if data_interval is not None:
+                raise ValueError("data_interval must be None if logical_date 
is None")

Review Comment:
   Changed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to