Lee-W commented on PR #39513: URL: https://github.com/apache/airflow/pull/39513#issuecomment-2106732364
> The biggest issue (and I have no idea what's going on) is with serialization. I have not been following the changes there, but there seems to be a pretty weird behaviour of serialization when `airflow 2.9.1` is installed from the package and tests are used from the "tests" folder.
>
> Some of the serialized objects miss the `__var` field that is expected when the objects get deserialized.
>
> The first case (and I already added a comment about it above: [#39513 (comment)](https://github.com/apache/airflow/pull/39513#discussion_r1596001681)) is that when we retrieve base operator links, the retrieved serialized data is different when Airflow is installed from the 2.9.1 package vs. when the latest Airflow 2.10.0 is used from sources.
>
> * For what it's worth, when serialized data is used in Airflow 2.10, the serialized form is:
>
> ```python
> operator_extra_link = serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"]
> ```
>
> But when airflow 2.9.1 is installed, it is:
>
> ```python
> operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"]
> ```
>
> But what's even weirder: when you serialize and deserialize an operator with Airflow installed from 2.9.1, deserialization fails, despite (seemingly) both serialization and deserialization coming from `airflow`.
>
> For example:
>
> ```python
> def test_operator_extra_links_mapped_without_applicationui_enabled(
>     self,
> ):
>     operator = EmrServerlessStartJobOperator.partial(
>         task_id=task_id,
>         application_id=application_id,
>         execution_role_arn=execution_role_arn,
>         job_driver=spark_job_driver,
>         enable_application_ui_links=False,
>     ).expand(
>         configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides],
>     )
>
>     ser_operator = BaseSerialization.serialize(operator)
>     deser_operator = BaseSerialization.deserialize(ser_operator)  # <-- HERE deserialization fails
>
>     assert deser_operator.operator_extra_links == [
>         EmrServerlessS3LogsLink(),
>         EmrServerlessCloudWatchLogsLink(),
>     ]
> ```
>
> The failure is:
>
> ```python
>     @classmethod
>     def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
>         """
>         Deserialize an object; helper function of depth first search for deserialization.
>
>         :meta private:
>         """
>         # JSON primitives (except for dict) are not encoded.
>         if use_pydantic_models and not _ENABLE_AIP_44:
>             raise RuntimeError(
>                 "Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
>                 "This parameter will be removed eventually when new serialization is used by AIP-44"
>             )
>         if cls._is_primitive(encoded_var):
>             return encoded_var
>         elif isinstance(encoded_var, list):
>             return [cls.deserialize(v, use_pydantic_models) for v in encoded_var]
>
>         if not isinstance(encoded_var, dict):
>             raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}")
>
>         var = encoded_var[Encoding.VAR]
> E       KeyError: <Encoding.VAR: '__var'>
> ```
>
> And when I dump the serialized object, indeed it does not have the `Encoding.VAR` key at the top level:
>
> ```
> {'_is_empty': False, '_task_module': 'airflow.providers.amazon.aws.operators.emr', '_expand_input_attr': 'expand_input', 'task_id': 'test_emr_serverless_task_id', 'ui_color': '#fff', 'template_fields_renderers': {'config': 'json', 'configuration_overrides': 'json'}, 'partial_kwargs': {'application_id': 'test_application_id', 'execution_role_arn': 'test_emr_serverless_role_arn', 'job_driver': {<Encoding.VAR: '__var'>: {'sparkSubmit': {<Encoding.VAR: '__var'>: {'entryPoint': 'test.py'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}, 'enable_application_ui_links': False}, 'template_fields': ['application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides', 'name', 'aws_conn_id'], 'downstream_task_ids': [], '_disallow_kwargs_override': False, 'template_ext': [], '_task_type': 'EmrServerlessStartJobOperator', 'ui_fgcolor': '#000', '_operator_extra_links': [{'airflow.providers.amazon.aws.links.emr.EmrServerlessS3LogsLink': {}}, {'airflow.providers.amazon.aws.links.emr.EmrServerlessCloudWatchLogsLink': {}}], 'expand_input': {'type': 'dict-of-lists', 'value': {<Encoding.VAR: '__var'>: {'configuration_overrides': [{<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'s3MonitoringConfiguration': {<Encoding.VAR: '__var'>: {'logUri': 's3://test_bucket/test_key/'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}, {<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'cloudWatchLoggingConfiguration': {<Encoding.VAR: '__var'>: {'enabled': True, 'logGroupName': '/aws/emrs', 'logStreamNamePrefix': 'myapp'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, '_is_mapped': True}
> ```
>
> Those are all the dict keys:
>
> ```
> dict_keys(['_disallow_kwargs_override', 'template_fields', 'template_ext', 'ui_fgcolor', 'partial_kwargs',
> '_task_type', '_is_empty', '_task_module', 'template_fields_renderers', 'task_id', '_expand_input_attr', 'ui_color',
> 'downstream_task_ids', '_operator_extra_links', 'expand_input', '_is_mapped'])
> ```
>
> And it makes no sense whatsoever that serialization produces one thing and deserialization expects another in this case, I think - both should come from the airflow code installed from the airflow package, so I am rather lost as to what's going on here. Unless - for some reason - some part of serialization would be inlined in provider tests??? But I am really puzzled and a bit blocked by it.
>
> @bolkedebruin, @dstandish, @Taragolis.
> @Lee-W -> I looked through your recent changes, since you've been modifying it recently (after we split out to 2.9), but could not find a reason why it can happen. I'd appreciate it if you could help with solving the puzzle.

I did encounter this kind of failure during https://github.com/apache/airflow/pull/38674. But once I fixed everything else, these errors no longer happened 🤔
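Side note for anyone debugging this: the round-trip above hinges on the `{'__var': ..., '__type': ...}` envelope that `BaseSerialization` wraps around containers. A toy sketch (plain Python, not Airflow code; `toy_deserialize` and `unwrap_task` are hypothetical stand-ins) shows both why the `KeyError` fires on a bare top-level dict, and how a test could read fields from either of the two serialized shapes reported above:

```python
from enum import Enum
from typing import Any


class Encoding(str, Enum):
    # Stand-in for airflow.serialization.enums.Encoding; str mixin means
    # Encoding.VAR hashes/compares equal to the plain string "__var".
    VAR = "__var"
    TYPE = "__type"


def toy_deserialize(encoded: Any) -> Any:
    """Minimal mimic of the failing code path: every dict is expected to
    carry the Encoding.VAR envelope key, so a bare dict (as 2.9.1
    apparently produces at the top level here) raises KeyError."""
    if encoded is None or isinstance(encoded, (bool, int, float, str)):
        return encoded
    if isinstance(encoded, list):
        return [toy_deserialize(v) for v in encoded]
    if not isinstance(encoded, dict):
        raise ValueError(f"The encoded_var should be dict and is {type(encoded)}")
    var = encoded[Encoding.VAR]  # raises KeyError on un-enveloped dicts
    return {k: toy_deserialize(v) for k, v in var.items()}


def unwrap_task(serialized_task: dict) -> dict:
    """Hypothetical compatibility helper: return the task fields whether
    or not they are wrapped in the __var envelope."""
    return serialized_task.get(Encoding.VAR, serialized_task)


wrapped = {Encoding.VAR: {"task_id": "t"}, Encoding.TYPE: "dict"}  # 2.10-style
bare = {"task_id": "t"}                                            # 2.9.1-style

assert toy_deserialize(wrapped) == {"task_id": "t"}
try:
    toy_deserialize(bare)
except KeyError as err:
    print("reproduced:", err)  # e.g. <Encoding.VAR: '__var'>

# The helper reads the same fields from either shape.
assert unwrap_task(wrapped) == unwrap_task(bare) == {"task_id": "t"}
```

This doesn't explain *why* 2.9.1 emits a bare dict at the top level, but it does suggest the failure is purely about which side adds the envelope, not about the field contents.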