Lee-W commented on PR #39513:
URL: https://github.com/apache/airflow/pull/39513#issuecomment-2106732364

   > The biggest issue (and I have no idea what's going on) is with serialization. I have not been following the changes there, but there is some pretty weird serialization behaviour when `airflow 2.9.1` is installed from the package and the tests are used from the "tests" folder.
   > 
   > Some of the serialized objects are missing the `__var` field that is expected when the objects get deserialized.
   > 
   > The first case (I already added a comment about it above: [#39513 (comment)](https://github.com/apache/airflow/pull/39513#discussion_r1596001681)) is that when we retrieve base operator links, the retrieved serialized data differs depending on whether airflow is installed from the 2.9.1 package or the latest Airflow 2.10.0 is used from sources.
   > 
   >     * For what it's worth, when Airflow 2.10 is used from sources, the serialized form is:
   > 
   > 
   > ```python
   > operator_extra_link = serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"]
   > ```
   > 
   > But when airflow 2.9.1 is installed, it is:
   > 
   > ```python
   > operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"]
   > ```
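   > 
   > If tests need to pass against both versions, one stopgap is a version-agnostic lookup (a minimal sketch, not an Airflow API; it assumes the `serialized_dag` from the snippets above):
   > 
   > ```python
   > task = serialized_dag["dag"]["tasks"][0]
   > # On 2.10.0 sources the task dict is wrapped in the {"__var": ..., "__type": ...}
   > # envelope; on the 2.9.1 package it is the bare dict.
   > task_data = task["__var"] if "__var" in task else task
   > operator_extra_link = task_data["_operator_extra_links"]
   > ```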
   > 
   > But what's even weirder: when you serialize and then deserialize an operator with Airflow installed from the 2.9.1 package, deserialization fails - despite (seemingly) both serialization and deserialization coming from `airflow`.
   > 
   > For example:
   > 
   > ```python
   >     def test_operator_extra_links_mapped_without_applicationui_enabled(
   >         self,
   >     ):
   >         operator = EmrServerlessStartJobOperator.partial(
   >             task_id=task_id,
   >             application_id=application_id,
   >             execution_role_arn=execution_role_arn,
   >             job_driver=spark_job_driver,
   >             enable_application_ui_links=False,
   >         ).expand(
   >             configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides],
   >         )
   > 
   >         ser_operator = BaseSerialization.serialize(operator)
   >         deser_operator = BaseSerialization.deserialize(ser_operator)  # <-- HERE deserialization fails
   > 
   >         assert deser_operator.operator_extra_links == [
   >             EmrServerlessS3LogsLink(),
   >             EmrServerlessCloudWatchLogsLink(),
   >         ]
   > ```
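   > 
   > For context, a round-trip on a plain dict shows the envelope `BaseSerialization` normally produces and `deserialize` expects back (a minimal sketch, assuming an installed `apache-airflow`):
   > 
   > ```python
   > from airflow.serialization.serialized_objects import BaseSerialization
   > 
   > encoded = BaseSerialization.serialize({"a": 1})
   > # encoded == {<Encoding.VAR: '__var'>: {'a': 1}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}
   > assert BaseSerialization.deserialize(encoded) == {"a": 1}
   > ```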
   > 
   > The failure is:
   > 
   > ```python
   >     @classmethod
   >     def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
   >         """
   >         Deserialize an object; helper function of depth first search for deserialization.
   >     
   >         :meta private:
   >         """
   >         # JSON primitives (except for dict) are not encoded.
   >         if use_pydantic_models and not _ENABLE_AIP_44:
   >             raise RuntimeError(
   >                 "Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
   >                 "This parameter will be removed eventually when new serialization is used by AIP-44"
   >             )
   >         if cls._is_primitive(encoded_var):
   >             return encoded_var
   >         elif isinstance(encoded_var, list):
   >             return [cls.deserialize(v, use_pydantic_models) for v in encoded_var]
   >     
   >         if not isinstance(encoded_var, dict):
   >             raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}")
   > >       var = encoded_var[Encoding.VAR]
   > E       KeyError: <Encoding.VAR: '__var'>
   > ```
   > 
   > And when I dump the serialized object, indeed it does not have the `Encoding.VAR` key at the top level:
   > 
   > ```
   > {'_is_empty': False,
   >  '_task_module': 'airflow.providers.amazon.aws.operators.emr',
   >  '_expand_input_attr': 'expand_input',
   >  'task_id': 'test_emr_serverless_task_id',
   >  'ui_color': '#fff',
   >  'template_fields_renderers': {'config': 'json', 'configuration_overrides': 'json'},
   >  'partial_kwargs': {'application_id': 'test_application_id',
   >                     'execution_role_arn': 'test_emr_serverless_role_arn',
   >                     'job_driver': {<Encoding.VAR: '__var'>: {'sparkSubmit': {<Encoding.VAR: '__var'>: {'entryPoint': 'test.py'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>},
   >                     'enable_application_ui_links': False},
   >  'template_fields': ['application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides', 'name', 'aws_conn_id'],
   >  'downstream_task_ids': [],
   >  '_disallow_kwargs_override': False,
   >  'template_ext': [],
   >  '_task_type': 'EmrServerlessStartJobOperator',
   >  'ui_fgcolor': '#000',
   >  '_operator_extra_links': [{'airflow.providers.amazon.aws.links.emr.EmrServerlessS3LogsLink': {}},
   >                            {'airflow.providers.amazon.aws.links.emr.EmrServerlessCloudWatchLogsLink': {}}],
   >  'expand_input': {'type': 'dict-of-lists',
   >                   'value': {<Encoding.VAR: '__var'>: {'configuration_overrides': [{<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'s3MonitoringConfiguration': {<Encoding.VAR: '__var'>: {'logUri': 's3://test_bucket/test_key/'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>},
   >                                                       {<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'cloudWatchLoggingConfiguration': {<Encoding.VAR: '__var'>: {'enabled': True, 'logGroupName': '/aws/emrs', 'logStreamNamePrefix': 'myapp'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}]},
   >                   <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}},
   >  '_is_mapped': True}
   > ```
   > 
   > Those are all the dict keys:
   > 
   > ```
   > dict_keys(['_disallow_kwargs_override', 'template_fields', 'template_ext', 'ui_fgcolor', 'partial_kwargs',
   >            '_task_type', '_is_empty', '_task_module', 'template_fields_renderers', 'task_id', '_expand_input_attr', 'ui_color',
   >            'downstream_task_ids', '_operator_extra_links', 'expand_input', '_is_mapped'])
   > ```
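   > 
   > A direct check makes the mismatch explicit (a sketch; `ser_operator` is the serialized object from the failing test above):
   > 
   > ```python
   > from airflow.serialization.enums import Encoding
   > 
   > # The top-level envelope key is missing, which is exactly why
   > # encoded_var[Encoding.VAR] raises KeyError in deserialize():
   > assert Encoding.VAR not in ser_operator
   > ```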
   > 
   > And it makes no sense whatsoever that, in this case, serialization produces one thing while deserialization expects another - both should come from the airflow code installed from the airflow package, so I am rather lost about what's going on here. Unless - for some reason - some part of the serialization code is inlined in the provider tests??? But I am really puzzled and a bit blocked by it.
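   > 
   > One way to rule out a shadowed or inlined copy would be to print where the serialization code is actually loaded from at test time (a debugging sketch, not part of any fix):
   > 
   > ```python
   > import inspect
   > 
   > from airflow.serialization.serialized_objects import BaseSerialization
   > 
   > # Both should point into the installed apache-airflow package,
   > # not into anything under the provider tests tree:
   > print(inspect.getfile(BaseSerialization))
   > print(BaseSerialization.serialize.__module__)
   > ```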
   > 
   > @bolkedebruin, @dstandish, @Taragolis, @Lee-W -> I looked through your recent changes, since you've been modifying it recently (after we split out to 2.9), but could not find a reason why this can happen. I'd appreciate it if you could help with solving the puzzle.
   
   I did encounter this kind of failure during https://github.com/apache/airflow/pull/38674, but once I fixed everything else, these errors no longer happened 🤔 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
