potiuk commented on PR #39513: URL: https://github.com/apache/airflow/pull/39513#issuecomment-2104390387
The biggest issue (and I have no idea what's going on is with serialization). I have not been following the changes there but it seems that there is a pretty weird behaviour of serialization where `airflow 2.9.1` is installed from package and tests are used from the "tests" folder. Some of the serialized objects miss `__var` field that is expected when then the objects get deserialized. The first case (and I already added comment about it above https://github.com/apache/airflow/pull/39513#discussion_r1596001681 is that when we retrieve base operators links, the retrieved serialized data is different when airflow is installed from 2.9.1 package vs. when latest Airflow 2.10.0 is used from sources. * For what it's worth when serialized data is used in Airflow 2.10. The serialized form is: ```python operator_extra_link = serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] ``` But when airflow 2.9.1 is installed, it is: ```python operator_extra_link = serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] ``` But what's even weirder when you serialize and deserialize operator with Airflow installed from 2.9.1, you get deserialization failing - despite (seemingly) both serialization and deserialization coming from `airflow` For example: ```python def test_operator_extra_links_mapped_without_applicationui_enabled( self, ): operator = EmrServerlessStartJobOperator.partial( task_id=task_id, application_id=application_id, execution_role_arn=execution_role_arn, job_driver=spark_job_driver, enable_application_ui_links=False, ).expand( configuration_overrides=[s3_configuration_overrides, cloudwatch_configuration_overrides], ) ser_operator = BaseSerialization.serialize(operator) deser_operator = BaseSerialization.deserialize(ser_operator) # <-- HERE deserialization fails assert deser_operator.operator_extra_links == [ EmrServerlessS3LogsLink(), EmrServerlessCloudWatchLogsLink(), ] ``` The failure is ```python @classmethod def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: """ Deserialize an object; helper function of depth first search for deserialization. :meta private: """ # JSON primitives (except for dict) are not encoded. if use_pydantic_models and not _ENABLE_AIP_44: raise RuntimeError( "Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. " "This parameter will be removed eventually when new serialization is used by AIP-44" ) if cls._is_primitive(encoded_var): return encoded_var elif isinstance(encoded_var, list): return [cls.deserialize(v, use_pydantic_models) for v in encoded_var] if not isinstance(encoded_var, dict): raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}") > var = encoded_var[Encoding.VAR] E KeyError: <Encoding.VAR: '__var'> ``` And when I dump the serialized object, indeed it does not have `Encoding.var` key at top level: ``` {'_is_empty': False, '_task_module': 'airflow.providers.amazon.aws.operators.emr', '_expand_input_attr': 'expand_input', 'task_id': 'test_emr_serverless_task_id', 'ui_color': '#fff', 'template_fields_renderers': {'config': 'json', 'configuration_overrides': 'json'}, 'partial_kwargs': {'application_id': 'test_application_id', 'execution_role_arn': 'test_emr_serverless_role_arn', 'job_driver': {<Encoding.VAR: '__var'>: {'sparkSubmit': {<Encoding.VAR: '__var'>: {'entryPoint': 'test.py'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}, 'enable_application_ui_links': False}, 'template_fields': ['application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides', 'name', 'aws_conn_id'], 'downstream_task_ids': [], '_disallow_kwargs_override': False, 'template_ext': [], '_task_type': 'EmrServerlessStartJobOperator', 'ui_fgcolor': '#000', '_operator_extra_links': [{'airflow.providers.amazon.a ws.links.emr.EmrServerlessS3LogsLink': {}}, {'airflow.providers.amazon.aws.links.emr.EmrServerlessCloudWatchLogsLink': {}}], 'expand_input': {'type': 'dict-of-lists', 'value': {<Encoding.VAR: '__var'>: {'configuration_overrides': [{<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'s3MonitoringConfiguration': {<Encoding.VAR: '__var'>: {'logUri': 's3://test_bucket/test_key/'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}, {<Encoding.VAR: '__var'>: {'monitoringConfiguration': {<Encoding.VAR: '__var'>: {'cloudWatchLoggingConfiguration': {<Encoding.VAR: '__var'>: {'enabled': True, 'logGroupName': '/aws/emrs', 'logStreamNamePrefix': 'myapp'}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: ' dict'>}]}, <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}}, '_is_mapped': True} ``` Those are all the dict keys: ``` dict_keys(['_disallow_kwargs_override', 'template_fields', 'template_ext', 'ui_fgcolor', 'partial_kwargs', '_task_type', '_is_empty', '_task_module', 'template_fields_renderers', 'task_id', '_expand_input_attr', 'ui_color', 'downstream_task_ids', '_operator_extra_links', 'expand_input', '_is_mapped'] ``` And It makes no sense whatsoever that serialization produces one thing and deserialization expects another thing in this case I think - both should come from the airflow code installed from airlfow package, so I am rather lost what's going on here. Unless for some reason. But I am really puzzled and a bit blocked with it. @bolkedebruin, @dstandish, @Taragolis. @Lee-W -> I looked through your recent changes since you've been modfying it recently (after we split out to 2.9) but could not find a reason why it can happen, I'd appreciate if you could help with solving the puzzle. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
