uranusjr commented on a change in pull request #21798:
URL: https://github.com/apache/airflow/pull/21798#discussion_r815885054
##########
File path: airflow/models/abstractoperator.py
##########
@@ -239,19 +242,27 @@ def global_operator_extra_link_dict(self) -> Dict[str,
Any]:
def extra_links(self) -> List[str]:
return
list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
- def get_extra_links(self, dttm: datetime.datetime, link_name: str) ->
Optional[Dict[str, Any]]:
+ def get_extra_links(self, ti: "TaskInstance", link_name: str) ->
Optional[str]:
Review comment:
Does this need to be considered public API?
##########
File path: airflow/models/abstractoperator.py
##########
@@ -394,3 +405,7 @@ def _render_nested_template_fields(
# content has no inner template fields
return
self._do_render_template_fields(value, nested_template_fields,
context, jinja_env, seen_oids)
+
+
+if TYPE_CHECKING:
+ from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
Review comment:
Why are these not at the top?
##########
File path: airflow/models/abstractoperator.py
##########
@@ -239,19 +242,27 @@ def global_operator_extra_link_dict(self) -> Dict[str,
Any]:
def extra_links(self) -> List[str]:
return
list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
- def get_extra_links(self, dttm: datetime.datetime, link_name: str) ->
Optional[Dict[str, Any]]:
+ def get_extra_links(self, ti: "TaskInstance", link_name: str) ->
Optional[str]:
"""For an operator, gets the URLs that the ``extra_links`` entry
points to.
:raise ValueError: The error message of a ValueError will be passed on
through to
the frontend to show up as a tooltip on the disabled link.
- :param dttm: The datetime parsed execution date for the URL being
searched for.
+ :param ti: The TaskInstance for the URL being searched for.
:param link_name: The name of the link we're looking for the URL for.
Should be
one of the options specified in ``extra_links``.
"""
- if link_name in self.operator_extra_link_dict:
- return self.operator_extra_link_dict[link_name].get_link(self,
dttm)
- elif link_name in self.global_operator_extra_link_dict:
- return
self.global_operator_extra_link_dict[link_name].get_link(self, dttm)
+ link: Optional["BaseOperatorLink"] = self.operator_extra_link_dict.get(
+ link_name
+ ) or self.global_operator_extra_link_dict.get(link_name)
Review comment:
```suggestion
link: Optional["BaseOperatorLink"] = (
self.operator_extra_link_dict.get(link_name)
or self.global_operator_extra_link_dict.get(link_name)
)
```
Would Black be happy with this? The current format is… bad…
##########
File path: airflow/models/baseoperator.py
##########
@@ -1730,11 +1731,17 @@ def name(self) -> str:
"""
@abstractmethod
- def get_link(self, operator: BaseOperator, dttm: datetime) -> str:
+ def get_link(
+ self,
+ operator: AbstractOperator,
+ dttm: Optional[datetime] = None,
+ *,
+ ti_key: Optional["TaskInstanceKey"] = None,
+ ) -> str:
Review comment:
`@overload` this?
##########
File path: airflow/models/abstractoperator.py
##########
@@ -239,19 +242,27 @@ def global_operator_extra_link_dict(self) -> Dict[str,
Any]:
def extra_links(self) -> List[str]:
return
list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
- def get_extra_links(self, dttm: datetime.datetime, link_name: str) ->
Optional[Dict[str, Any]]:
+ def get_extra_links(self, ti: "TaskInstance", link_name: str) ->
Optional[str]:
"""For an operator, gets the URLs that the ``extra_links`` entry
points to.
:raise ValueError: The error message of a ValueError will be passed on
through to
the frontend to show up as a tooltip on the disabled link.
- :param dttm: The datetime parsed execution date for the URL being
searched for.
+ :param ti: The TaskInstance for the URL being searched for.
:param link_name: The name of the link we're looking for the URL for.
Should be
one of the options specified in ``extra_links``.
"""
- if link_name in self.operator_extra_link_dict:
- return self.operator_extra_link_dict[link_name].get_link(self,
dttm)
- elif link_name in self.global_operator_extra_link_dict:
- return
self.global_operator_extra_link_dict[link_name].get_link(self, dttm)
+ link: Optional["BaseOperatorLink"] = self.operator_extra_link_dict.get(
+ link_name
+ ) or self.global_operator_extra_link_dict.get(link_name)
+ if not link:
+ return None
+ # Check for old function signature
+ parameters = inspect.signature(link.get_link).parameters
+ args = [name for name, p in parameters.items() if p.kind !=
p.VAR_KEYWORD]
+ if "ti_key" in args:
Review comment:
I think this should be good enough. I can't anticipate every future
possibility, but this check should be strict enough as long as we always _call_
`get_link` with `ti_key` as a keyword argument (which we already do here).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]