ashb commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r678343708



##########
File path: airflow/models/dag.py
##########
@@ -112,6 +113,51 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class _NextDagRunInfoLegacy(NamedTuple):
+    """Legacy return format for ``DAG.next_dagrun_info()``.
+
+    In the pre-AIP-39 implementation, ``DAG.next_dagrun_info()`` returns a
+    2-tuple ``(execution_date, run_after)``.
+    """
+
+    execution_date: Optional[pendulum.DateTime]
+    run_after: Optional[pendulum.DateTime]
+
+
+class _NextDagRunInfoCompat(_NextDagRunInfoLegacy):

Review comment:
       We don't need to maintain compat here (though doing so isn't all that hard) but I think we should give this a better/non-compat-or-private name.
   
   I don't have any brilliant ideas for a name though.

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -242,16 +242,23 @@ def post_dag_run(dag_id, session):
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
+    execution_date = post_body["execution_date"]
     dagrun_instance = (
         session.query(DagRun)
         .filter(
             DagRun.dag_id == dag_id,
-            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]),
+            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == execution_date),
         )
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
+        dag = current_app.dag_bag.get_dag(dag_id)
+        dag_run = DagRun(
+            dag_id=dag_id,
+            run_type=DagRunType.MANUAL,
+            data_interval=dag.timetable.infer_data_interval(execution_date),
+            **post_body,
+        )

Review comment:
       Is there a reason (other than we didn't before) that we don't use `dag.create_run()`?
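For context, the refactor being suggested would route manual-run creation through the DAG object itself, so data-interval inference lives in one place instead of at every call site. A minimal standalone sketch of that idea, using hypothetical stub classes (`StubDAG`, `DataInterval` here are illustrative stand-ins, not the real Airflow models) and assuming a simple daily schedule:

```python
from datetime import datetime, timedelta
from typing import NamedTuple


class DataInterval(NamedTuple):
    start: datetime
    end: datetime


class StubDAG:
    """Illustrative stand-in for airflow.models.DAG, not the real class."""

    def infer_data_interval(self, run_after: datetime) -> DataInterval:
        # Assumed daily schedule: the last complete midnight-to-midnight
        # period before the trigger time.
        end = run_after.replace(hour=0, minute=0, second=0, microsecond=0)
        return DataInterval(end - timedelta(days=1), end)

    def create_dagrun(self, execution_date: datetime) -> dict:
        # Centralizing creation here means no caller can forget to
        # attach the data interval.
        return {
            "execution_date": execution_date,
            "data_interval": self.infer_data_interval(execution_date),
        }


run = StubDAG().create_dagrun(datetime(2021, 7, 25, 1, 0))
print(run["data_interval"].start)  # 2021-07-24 00:00:00
```

The endpoint would then only need to call the one factory method rather than constructing a `DagRun` and inferring the interval itself.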

##########
File path: airflow/models/dag.py
##########
@@ -2372,8 +2471,13 @@ class DagModel(Base):
 
     has_task_concurrency_limits = Column(Boolean, nullable=False)
 
-    # The execution_date of the next dag run
-    next_dagrun = Column(UtcDateTime)
+    # Kept for backwards compatibility. New code should use data_interval columns.

Review comment:
       I think we will still need this -- it is the execution_date/schedule_date of the new DagRun to create -- which may be different to interval start or end.
   
   ```suggestion
   ```

##########
File path: airflow/models/dag.py
##########
@@ -596,23 +697,17 @@ def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
         :return: A list of dates within the interval following the dag's schedule.
         :rtype: list
         """
-        if start_date is None:
-            start = self._time_restriction.earliest
-        else:
-            start = pendulum.instance(start_date)
+        warnings.warn(
+            "`DAG.get_run_dates()` is deprecated. " "Please use `DAG.iter_dagrun_infos_between()` instead.",

Review comment:
       ```suggestion
            "`DAG.get_run_dates()` is deprecated. Please use `DAG.iter_dagrun_infos_between()` instead.",
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2403,6 +2507,24 @@ def __init__(self, concurrency=None, **kwargs):
     def __repr__(self):
         return f"<DAG: {self.dag_id}>"
 
+    @property
+    def data_interval(self) -> Optional[Tuple[datetime, datetime]]:
+        if self.next_dagrun_data_interval_start is None:
+            if self.next_dagrun_data_interval_end is not None:
+                raise AirflowException(
+                    "Inconsistent DagModel: next_dagrun_data_interval_start and "
+                    "next_dagrun_data_interval_end must be either both None or both datetime"
+                )
+            return None
+        return (self.next_dagrun_data_interval_start, self.next_dagrun_data_interval_end)

Review comment:
       This seems... odd. What does `dag.data_interval` _mean_? I would expect this at least to be called `next_data_interval` based on the implementation.
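As an aside, the invariant the property enforces (the two columns are set, or unset, together) can be shown in isolation. A standalone sketch with the reviewer's suggested name, where `DagModelSketch` is a hypothetical stand-in that reduces the ORM columns to plain attributes:

```python
from datetime import datetime
from typing import Optional, Tuple


class DagModelSketch:
    """Hypothetical stand-in for DagModel; only the two interval columns are modeled."""

    def __init__(self, start: Optional[datetime], end: Optional[datetime]) -> None:
        self.next_dagrun_data_interval_start = start
        self.next_dagrun_data_interval_end = end

    @property
    def next_data_interval(self) -> Optional[Tuple[datetime, datetime]]:
        start = self.next_dagrun_data_interval_start
        end = self.next_dagrun_data_interval_end
        # The two columns must be both None or both set; anything else
        # means the row was written inconsistently.
        if (start is None) != (end is None):
            raise ValueError("interval start/end must both be None or both set")
        if start is None:
            return None
        return (start, end)


assert DagModelSketch(None, None).next_data_interval is None
```

Naming it `next_data_interval` also makes the pairing with `next_dagrun` explicit at the call site.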

##########
File path: airflow/www/views.py
##########
@@ -1537,11 +1537,12 @@ def trigger(self, session=None):
                    'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf, form=form
                 )
 
-        dag = current_app.dag_bag.get_dag(dag_id)
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             execution_date=execution_date,
-            state=State.QUEUED,
+            data_interval=dag.timetable.infer_data_interval(execution_date),
+            state=State.RUNNING,

Review comment:
       ```suggestion
               state=State.QUEUED,
   ```
   
   Bad conflict resolve?

##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually triggered
+        # run at 1am 25th is between 0am 24th and 0am 25th.

Review comment:
       (Without thinking about this in much detail) I would have expected such a manually triggered run to be from midnight 25th to 1am 25th.
   
   But maybe this way does make more sense. Data intervals aren't (currently) well defined in the case of manual triggers.
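The two interpretations in this thread differ concretely. For a daily schedule and a manual trigger at 01:00 on the 25th, a hedged sketch of both candidate intervals using plain `datetime` arithmetic (not the Airflow timetable API; the function names are ours, purely illustrative):

```python
from datetime import datetime, timedelta


def last_complete_interval(run_after: datetime):
    """What the PR implements: the last full midnight-to-midnight
    period that ended at or before run_after."""
    end = run_after.replace(hour=0, minute=0, second=0, microsecond=0)
    return (end - timedelta(days=1), end)


def up_to_trigger_interval(run_after: datetime):
    """The reviewer's expectation: from the preceding midnight up to
    the trigger time itself."""
    start = run_after.replace(hour=0, minute=0, second=0, microsecond=0)
    return (start, run_after)


trigger = datetime(2021, 7, 25, 1, 0)
print(last_complete_interval(trigger))  # midnight 24th -> midnight 25th
print(up_to_trigger_interval(trigger))  # midnight 25th -> 01:00 25th
```

The first keeps manual runs aligned with the intervals scheduled runs would cover; the second produces a partial interval that no scheduled run would ever have.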




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
