yuqian90 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r366633960
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   It's unfortunate if we can't make the existing `execution_date_fn` feature 
work here. I assume many people rely on `execution_date_fn` to shift 
`execution_date` according to their own custom logic.
   
   I have one suggestion that might work. If we know the `task_id` and `dag_id` 
of the `operator`, we can always find the real `ExternalTaskSensor` object 
(e.g. by looking it up in the `DagBag`). Once we have a reference to the real 
`ExternalTaskSensor` object, we can call its `execution_date_fn`.
   
   If that doesn't work for you, I think it's important to make sure that the 
extra link doesn't get generated when the `ExternalTaskSensor` uses 
`execution_date_fn`. In the current state of the PR, it looks like the link is 
going to point to the wrong `execution_date` if the user intended to use 
`execution_date_fn`, and that is going to cause confusion. A code comment 
saying that `execution_date_fn` is not supported for the extra link would 
probably be helpful too.
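   
   To make the first suggestion concrete, here is a rough sketch of the lookup. It is not tested against this PR. `resolve_target_dates` is a hypothetical helper name, and the `SimpleNamespace` objects are stand-ins so the snippet runs without Airflow installed. In the real code the `dagbag` argument would be a `DagBag`, and I'm assuming `get_dag` / `get_task` hand back the original `ExternalTaskSensor` with its callable intact.

```python
import datetime
from types import SimpleNamespace


def resolve_target_dates(operator, dttm, dagbag):
    """Hypothetical helper: find the real sensor and compute the target date(s).

    Returns an empty list when the DAG cannot be found, so the caller can
    skip generating the link instead of pointing at the wrong execution_date.
    """
    # The `operator` passed to get_link may not carry the callable,
    # so look the task up again by dag_id / task_id.
    dag = dagbag.get_dag(operator.dag_id)
    if dag is None:
        return []
    real_sensor = dag.get_task(operator.task_id)

    execution_delta = getattr(real_sensor, 'execution_delta', None)
    execution_date_fn = getattr(real_sensor, 'execution_date_fn', None)

    # Same precedence as get_possible_target_execution_dates in the diff.
    if execution_delta:
        dates = dttm - execution_delta
    elif execution_date_fn:
        dates = execution_date_fn(dttm)
    else:
        dates = dttm
    return dates if isinstance(dates, list) else [dates]


# Stand-ins for illustration only (not Airflow classes).
real_sensor = SimpleNamespace(
    dag_id='parent_dag',
    task_id='wait_for_child',
    execution_delta=None,
    execution_date_fn=lambda d: d - datetime.timedelta(days=1),
)
fake_dag = SimpleNamespace(get_task=lambda task_id: real_sensor)
fake_dagbag = SimpleNamespace(
    get_dag=lambda dag_id: fake_dag if dag_id == 'parent_dag' else None
)
# What get_link might receive: identifiers only, no callable.
stub_operator = SimpleNamespace(dag_id='parent_dag', task_id='wait_for_child')

target_dates = resolve_target_dates(
    stub_operator, datetime.datetime(2020, 1, 2), fake_dagbag
)
```

   If the lookup returns an empty list, `get_link` could return an empty string rather than a link to the wrong run, which would also cover the fallback described above.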

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
