potiuk commented on PR #39513:
URL: https://github.com/apache/airflow/pull/39513#issuecomment-2104390387

   The biggest issue (and I have no idea what's going on) is with serialization. I have not been following the changes there, but it seems that serialization behaves in a pretty weird way when `airflow 2.9.1` is installed from the package and the tests are run from the "tests" folder.
   
   Some of the serialized objects are missing the `__var` field that is expected when the objects get deserialized.
   
   The first case (I already added a comment about it above: https://github.com/apache/airflow/pull/39513#discussion_r1596001681) is that when we retrieve base operator links, the retrieved serialized data is different when Airflow is installed from the 2.9.1 package vs. when the latest Airflow 2.10.0 is used from sources.
   
   * For what it's worth, when the latest Airflow 2.10.0 from sources is used, the serialized form is:
   
   ```python
    operator_extra_link = serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"]
   ```
   
   But when Airflow 2.9.1 is installed, it is:
   
   ```python
    operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"]
   ```
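   
   For illustration only, a minimal compatibility shim (a hypothetical helper, not something that exists in the codebase) that would read the links regardless of whether the task payload is wrapped in the `__var` envelope:
   
   ```python
   from airflow.serialization.enums import Encoding


   def get_operator_extra_links(serialized_dag: dict) -> list:
       """Hypothetical helper tolerating both serialized layouts."""
       task = serialized_dag["dag"]["tasks"][0]
       # Airflow 2.10.0 from sources wraps the task fields in an
       # Encoding.VAR ('__var') envelope; the 2.9.1 package apparently
       # does not, hence the fallback to the bare dict.
       if Encoding.VAR in task:
           task = task[Encoding.VAR]
       return task["_operator_extra_links"]
   ```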
   
   What's even weirder: when you serialize and then deserialize an operator with Airflow installed from the 2.9.1 package, deserialization fails - despite (seemingly) both serialization and deserialization coming from the same `airflow` code.
   
   For example:
   
   ```python
       def test_operator_extra_links_mapped_without_applicationui_enabled(
           self,
       ):
           operator = EmrServerlessStartJobOperator.partial(
               task_id=task_id,
               application_id=application_id,
               execution_role_arn=execution_role_arn,
               job_driver=spark_job_driver,
               enable_application_ui_links=False,
           ).expand(
            configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides],
           )
   
           ser_operator = BaseSerialization.serialize(operator)
        deser_operator = BaseSerialization.deserialize(ser_operator)  # <-- HERE deserialization fails
   
           assert deser_operator.operator_extra_links == [
               EmrServerlessS3LogsLink(),
               EmrServerlessCloudWatchLogsLink(),
           ]
   ```
   
   The failure is:
   
   ```python
    @classmethod
    def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
        """
        Deserialize an object; helper function of depth first search for deserialization.
    
        :meta private:
        """
        # JSON primitives (except for dict) are not encoded.
        if use_pydantic_models and not _ENABLE_AIP_44:
            raise RuntimeError(
                "Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
                "This parameter will be removed eventually when new serialization is used by AIP-44"
            )
        if cls._is_primitive(encoded_var):
            return encoded_var
        elif isinstance(encoded_var, list):
            return [cls.deserialize(v, use_pydantic_models) for v in encoded_var]
    
        if not isinstance(encoded_var, dict):
            raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}")
>       var = encoded_var[Encoding.VAR]
E       KeyError: <Encoding.VAR: '__var'>
   ```
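   
   For context, `BaseSerialization.deserialize` expects every non-primitive value to be wrapped in a `__type`/`__var` envelope. A minimal sketch of the expected shape vs. what was actually produced here (the inner dict contents are placeholders, not the full payload):
   
   ```python
   from airflow.serialization.enums import DagAttributeTypes, Encoding
   
   # Shape the deserializer expects at every non-primitive level:
   wrapped = {
       Encoding.TYPE: DagAttributeTypes.DICT,
       Encoding.VAR: {"task_id": "test_emr_serverless_task_id"},
   }
   
   # Shape that serialization actually produced at the top level here - the
   # bare inner dict, with no envelope - which makes
   # encoded_var[Encoding.VAR] raise the KeyError above:
   bare = {"task_id": "test_emr_serverless_task_id"}
   ```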
   
   And when I dump the serialized object, it indeed does not have the `Encoding.VAR` key at the top level:
   
   ```
    {'_is_empty': False,
     '_task_module': 'airflow.providers.amazon.aws.operators.emr',
     '_expand_input_attr': 'expand_input',
     'task_id': 'test_emr_serverless_task_id',
     'ui_color': '#fff',
     'template_fields_renderers': {'config': 'json', 'configuration_overrides': 'json'},
     'partial_kwargs': {'application_id': 'test_application_id',
                        'execution_role_arn': 'test_emr_serverless_role_arn',
                        'job_driver': {<Encoding.VAR: '__var'>: {'sparkSubmit': {<Encoding.VAR: '__var'>: {'entryPoint': 'test.py'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>},
                        'enable_application_ui_links': False},
     'template_fields': ['application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides', 'name', 'aws_conn_id'],
     'downstream_task_ids': [],
     '_disallow_kwargs_override': False,
     'template_ext': [],
     '_task_type': 'EmrServerlessStartJobOperator',
     'ui_fgcolor': '#000',
     '_operator_extra_links': [{'airflow.providers.amazon.aws.links.emr.EmrServerlessS3LogsLink': {}},
                               {'airflow.providers.amazon.aws.links.emr.EmrServerlessCloudWatchLogsLink': {}}],
     'expand_input': {'type': 'dict-of-lists',
                      'value': {<Encoding.VAR: '__var'>: {'configuration_overrides': [{<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'s3MonitoringConfiguration': {<Encoding.VAR: '__var'>: {'logUri': 's3://test_bucket/test_key/'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>},
                                                          {<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'cloudWatchLoggingConfiguration': {<Encoding.VAR: '__var'>: {'enabled': True, 'logGroupName': '/aws/emrs', 'logStreamNamePrefix': 'myapp'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}},
     '_is_mapped': True}
   ```
   
   Those are all the dict keys:
   
   ```
    dict_keys(['_disallow_kwargs_override', 'template_fields', 'template_ext', 'ui_fgcolor', 'partial_kwargs',
               '_task_type', '_is_empty', '_task_module', 'template_fields_renderers', 'task_id', '_expand_input_attr', 'ui_color',
               'downstream_task_ids', '_operator_extra_links', 'expand_input', '_is_mapped'])
   ```
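   
   Just to double-check (a quick assertion against the `ser_operator` from the test above), neither envelope key is present at the top level:
   
   ```python
   from airflow.serialization.enums import Encoding
   
   # Neither envelope key made it to the top level of the serialized
   # operator, which is exactly what the deserializer trips over:
   assert Encoding.VAR not in ser_operator
   assert Encoding.TYPE not in ser_operator
   ```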
   
   It makes no sense to me that serialization produces one thing while deserialization expects another in this case - both should come from the Airflow code installed from the `airflow` package, so I am rather lost as to what's going on here. I am really puzzled and a bit blocked by it.
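   
   One thing worth verifying (a quick diagnostic sketch, not part of the test) is which files the serialization code is actually loaded from, to rule out the installed package and the source tree getting mixed up:
   
   ```python
   import airflow
   import airflow.serialization.serialized_objects as so
   
   # If the installed package and the repo checkout are mixed up, these
   # paths will point at different locations (site-packages vs. sources).
   print(airflow.__version__, airflow.__file__)
   print(so.__file__)
   ```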
   
   @bolkedebruin, @dstandish, @Taragolis, @Lee-W -> I looked through your recent changes, since you've been modifying this code recently (after we split out 2.9), but I could not find a reason why this can happen. I'd appreciate it if you could help solve the puzzle.
   

