xinbinhuang commented on a change in pull request #15125:
URL: https://github.com/apache/airflow/pull/15125#discussion_r613334840



##########
File path: airflow/models/baseoperator.py
##########
@@ -822,9 +826,52 @@ def global_operator_extra_link_dict(self) -> Dict[str, 
Any]:
             raise AirflowException("Can't load operators")
         return {link.name: link for link in 
plugins_manager.global_operator_extra_links}
 
+    def _skip_if_not_latest(self, context: Optional[Any] = None) -> None:
+        """
+        Will raise :class:`~.AirflowSkipException` if dag run is not the 
latest dag run,
+        with the following exceptions:
+            - the operator's ``latest_only`` parameter is not set to ``True``
+            - the context dictionary has no dag run
+            - the context dictionary has no dag
+            - the ``context`` dictionary is ``None`` or empty
+            - the dag run is externally triggered
+        """
+        if not (self.latest_only is True and context):
+            return
+
+        import pendulum
+
+        from airflow.exceptions import AirflowSkipException
+
+        dag_run = context.get('dag_run')
+        if not dag_run:
+            return
+
+        if dag_run and dag_run.external_trigger:
+            self.log.info("Externally triggered DAG_Run: allowing execution to 
proceed.")
+            return
+
+        dag = context.get('dag')
+        if not dag:
+            return
+
+        now = pendulum.now('UTC')
+        left_window = dag.following_schedule(context['execution_date'])
+        right_window = dag.following_schedule(left_window)
+        self.log.info(  # pylint: disable=logging-fstring-interpolation
+            f"Checking latest only:\n"
+            f"\tleft_window: {left_window}\n"
+            f"\tright_window: {right_window}\n"
+            f"\tnow: {now}\n",
+        )
+
+        if not left_window < now <= right_window:
+            raise AirflowSkipException('Not latest execution; skipping...')
+
     @prepare_lineage
     def pre_execute(self, context: Any):
         """This hook is triggered right before self.execute() is called."""
+        self._skip_if_not_latest(context)

Review comment:
       Should we push this down to TI? i.e. 
   
   
https://github.com/apache/airflow/blob/75603160848e4199ed368809dfd441dcc5ddbd82/airflow/models/taskinstance.py#L1286-L1289
   
   Though currently, only SubDagOperator overwrites the `pre_execute` hook, I 
think the `pre_execute` hook is still a public API and users may override when 
they build custom operators.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to