amoghrajesh commented on code in PR #46613:
URL: https://github.com/apache/airflow/pull/46613#discussion_r1949370470
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1520,76 +1523,43 @@ def _deserialize_operator_extra_links(cls, encoded_op_links: list) -> dict[str,
raise AirflowException("Can't load plugins")
op_predefined_extra_links = {}
- for _operator_links_source in encoded_op_links:
- # Get the key, value pair as Tuple where key is OperatorLink
ClassName
- # and value is the dictionary containing the arguments passed to
the OperatorLink
+ for item in encoded_op_links.items():
+ # Get the name and xcom_key of the encoded operator and use it to create a GenericOperatorLink object
+ # during deserialization.
#
- # Example of a single iteration:
- #
- # _operator_links_source =
- # {
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink':
{
- # 'index': 0
- # }
- # },
- #
- # list(_operator_links_source.items()) =
- # [
- # (
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
- # {'index': 0}
- # )
- # ]
- #
- # list(_operator_links_source.items())[0] =
- # (
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
- # {
- # 'index': 0
- # }
- # )
-
- _operator_link_class_path, data =
next(iter(_operator_links_source.items()))
- if _operator_link_class_path in get_operator_extra_links():
- single_op_link_class = import_string(_operator_link_class_path)
- elif _operator_link_class_path in
plugins_manager.registered_operator_link_classes:
- single_op_link_class =
plugins_manager.registered_operator_link_classes[
- _operator_link_class_path
- ]
- else:
- log.error("Operator Link class %r not registered",
_operator_link_class_path)
- return {}
-
- op_link_parameters = {param: cls.deserialize(value) for param,
value in data.items()}
- op_predefined_extra_link: BaseOperatorLink =
single_op_link_class(**op_link_parameters)
-
+ # Example:
+ # enc_operator['_operator_extra_links'] =
+ # {
+ # 'airflow': 'airflow_link_key',
+ # 'foo-bar': 'link-key',
+ # 'no_response': 'key',
+ # 'raise_error': 'key'
+ # }
+
+ name, xcom_key = item
+ op_predefined_extra_link = GenericOperatorLink(name=name, xcom_key=xcom_key)
op_predefined_extra_links.update({op_predefined_extra_link.name: op_predefined_extra_link})
return op_predefined_extra_links
@classmethod
- def _serialize_operator_extra_links(cls, operator_extra_links:
Iterable[BaseOperatorLink]):
+ def _serialize_operator_extra_links(
+ cls, operator_extra_links: Iterable[BaseOperatorLink]
+ ) -> dict[str, str]:
"""
Serialize Operator Links.
- Store the import path of the OperatorLink and the arguments passed to
it.
+ Store the "name" of the link mapped with the xcom_key which can be later used to retrieve this
+ operator extra link from XComs.
For example:
-
``[{'airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink': {}}]``
+ ``{'link-name-1': 'xcom-key-1'}``
:param operator_extra_links: Operator Link
:return: Serialized Operator Link
"""
- serialize_operator_extra_links = []
+ serialize_operator_extra_links = {}
for operator_extra_link in operator_extra_links:
- op_link_arguments = {
- param: cls.serialize(value) for param, value in
attrs.asdict(operator_extra_link).items()
- }
-
- module_path = (
-
f"{operator_extra_link.__class__.__module__}.{operator_extra_link.__class__.__name__}"
- )
- serialize_operator_extra_links.append({module_path:
op_link_arguments})
-
+ serialize_operator_extra_links[operator_extra_link.name] = operator_extra_link.xcom_key
return serialize_operator_extra_links
Review Comment:
Cool
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1520,76 +1523,43 @@ def _deserialize_operator_extra_links(cls, encoded_op_links: list) -> dict[str,
raise AirflowException("Can't load plugins")
op_predefined_extra_links = {}
- for _operator_links_source in encoded_op_links:
- # Get the key, value pair as Tuple where key is OperatorLink
ClassName
- # and value is the dictionary containing the arguments passed to
the OperatorLink
+ for item in encoded_op_links.items():
+ # Get the name and xcom_key of the encoded operator and use it to create a GenericOperatorLink object
+ # during deserialization.
#
- # Example of a single iteration:
- #
- # _operator_links_source =
- # {
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink':
{
- # 'index': 0
- # }
- # },
- #
- # list(_operator_links_source.items()) =
- # [
- # (
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
- # {'index': 0}
- # )
- # ]
- #
- # list(_operator_links_source.items())[0] =
- # (
- #
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
- # {
- # 'index': 0
- # }
- # )
-
- _operator_link_class_path, data =
next(iter(_operator_links_source.items()))
- if _operator_link_class_path in get_operator_extra_links():
- single_op_link_class = import_string(_operator_link_class_path)
- elif _operator_link_class_path in
plugins_manager.registered_operator_link_classes:
- single_op_link_class =
plugins_manager.registered_operator_link_classes[
- _operator_link_class_path
- ]
- else:
- log.error("Operator Link class %r not registered",
_operator_link_class_path)
- return {}
-
- op_link_parameters = {param: cls.deserialize(value) for param,
value in data.items()}
- op_predefined_extra_link: BaseOperatorLink =
single_op_link_class(**op_link_parameters)
-
+ # Example:
+ # enc_operator['_operator_extra_links'] =
+ # {
+ # 'airflow': 'airflow_link_key',
+ # 'foo-bar': 'link-key',
+ # 'no_response': 'key',
+ # 'raise_error': 'key'
+ # }
+
+ name, xcom_key = item
+ op_predefined_extra_link = GenericOperatorLink(name=name, xcom_key=xcom_key)
op_predefined_extra_links.update({op_predefined_extra_link.name: op_predefined_extra_link})
return op_predefined_extra_links
@classmethod
- def _serialize_operator_extra_links(cls, operator_extra_links:
Iterable[BaseOperatorLink]):
+ def _serialize_operator_extra_links(
+ cls, operator_extra_links: Iterable[BaseOperatorLink]
+ ) -> dict[str, str]:
"""
Serialize Operator Links.
- Store the import path of the OperatorLink and the arguments passed to
it.
+ Store the "name" of the link mapped with the xcom_key which can be later used to retrieve this
+ operator extra link from XComs.
For example:
-
``[{'airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink': {}}]``
+ ``{'link-name-1': 'xcom-key-1'}``
:param operator_extra_links: Operator Link
:return: Serialized Operator Link
"""
- serialize_operator_extra_links = []
+ serialize_operator_extra_links = {}
for operator_extra_link in operator_extra_links:
- op_link_arguments = {
- param: cls.serialize(value) for param, value in
attrs.asdict(operator_extra_link).items()
- }
-
- module_path = (
-
f"{operator_extra_link.__class__.__module__}.{operator_extra_link.__class__.__name__}"
- )
- serialize_operator_extra_links.append({module_path:
op_link_arguments})
-
+ serialize_operator_extra_links[operator_extra_link.name] = operator_extra_link.xcom_key
return serialize_operator_extra_links
Review Comment:
Cool, handled
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]