sjyangkevin commented on PR #51059: URL: https://github.com/apache/airflow/pull/51059#issuecomment-2953588878
Hi @amoghrajesh, @bolkedebruin, I would like to follow up with some updates:

1. I reverted the changes I made to `serde.py` that altered the order of serializers/deserializers.
2. I updated the signature of `deserialize` in all serializer modules by adding an optional parameter, `cls: Any`. Most existing serializers rely on `classname`, and replacing `classname` with `cls` would require refactoring their code. I kept `classname` to reduce the chance of introducing subtle issues. The existing serializers work as they are, and the Pydantic serializer can now accept `cls` from `serde.py`.
3. I also modified the Pydantic serializer's `serialize` method. Instead of returning `pydantic.main.BaseModel` as the `serialized_classname`, it now returns `qualname(o)`. As a result, the actual class name of any arbitrary Pydantic model is checked against the allow list during deserialization. This also means `cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings` must be added to `allowed_deserialization_classes` before it can be deserialized.
4. `import_string` is removed from the Pydantic serializer.

The changes pass all the unit tests, and I've updated my test DAG code as shown below.
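The signature change in points 2 and 4 can be sketched roughly as follows. This is a simplified, hypothetical illustration and not the actual `serde.py` code. `deserialize_existing`, `deserialize_model`, `DESERIALIZERS` and `dispatch` are made-up names. A dataclass stands in for a Pydantic model so that the snippet runs without Pydantic installed.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical, simplified sketch; names and dispatch logic are illustrative,
# not Airflow's actual serde implementation.


def deserialize_existing(classname: str, version: int, data: Any, cls: Any = None) -> Any:
    """Existing-style deserializer: still keyed on ``classname``; ``cls`` is accepted but ignored."""
    if classname == "builtins.complex":
        real, imag = data  # data was stored as a [real, imag] pair
        return complex(real, imag)
    raise TypeError(f"cannot deserialize {classname}")


def deserialize_model(classname: str, version: int, data: Any, cls: Any = None) -> Any:
    """Model-style deserializer: builds the object from the ``cls`` handed in by the caller,
    so it never has to import the class by name itself."""
    if cls is None:
        raise TypeError(f"{classname}: the caller must pass the resolved class as cls")
    # A real Pydantic model would use cls.model_validate(data); a dataclass takes kwargs.
    return cls(**data)


# classname-specific deserializers; anything else falls back to deserialize_model
DESERIALIZERS: dict[str, Callable[..., Any]] = {"builtins.complex": deserialize_existing}


def dispatch(classname: str, version: int, data: Any, cls: Any = None) -> Any:
    """Call every deserializer the same way, always forwarding cls as a keyword argument."""
    fn = DESERIALIZERS.get(classname, deserialize_model)
    return fn(classname, version, data, cls=cls)


@dataclass
class Embeddings:
    """Stand-in for a Pydantic model such as the Cohere embeddings response."""

    float_: list[float]


print(dispatch("builtins.complex", 1, [1.0, 2.0]))  # (1+2j)
print(dispatch("example.Embeddings", 1, {"float_": [0.5]}, cls=Embeddings))  # Embeddings(float_=[0.5])
```

Because `cls` is optional, the caller can forward it uniformly. Serializers that ignore it keep working unchanged.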
### Arbitrary Pydantic model must be added to `allowed_deserialization_classes`

Before adding to `allowed_deserialization_classes`: *(screenshot not preserved)*

After adding to `allowed_deserialization_classes`: *(screenshot not preserved)*

### Test DAG code

```python
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator
from pendulum import datetime

COHERE_CONN_ID = "cohere_default"


@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():
    # pandas DataFrame round trip through XCom
    @task
    def get_pandas():
        import numpy as np
        import pandas as pd

        return pd.DataFrame(np.random.randn(3, 2), columns=list("AB"))

    @task
    def print_pandas(df):
        print(df)

    # NumPy scalar round trip through XCom
    @task
    def get_numpy():
        import numpy as np

        n = np.random.rand(3, 2)[0][0]
        print(type(n))
        return n  # return the scalar so print_numpy actually receives it

    @task
    def print_numpy(n):
        print(n)

    # Pydantic model (Cohere embeddings response) round trip through XCom
    @task
    def get_embeddings():
        import pydantic

        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])
        print("type of embeddings:", type(embeddings))
        print("is embedding type pydantic:", isinstance(embeddings, pydantic.BaseModel))
        return embeddings

    @task
    def print_embeddings(embeddings):
        print(embeddings)

    print_embeddings(get_embeddings())
    print_numpy(get_numpy())
    print_pandas(get_pandas())


pydantic_serde()
```
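The allow-list requirement from point 3 comes down to matching the class name that `serialize` now emits via `qualname(o)` against `allowed_deserialization_classes`. Below is a minimal sketch of that check, assuming glob-style matching. `qualname_of` is a simplified, hypothetical stand-in for the `qualname` helper. `is_allowed` is a made-up helper and not Airflow's own check. The two allow lists are illustrative values, not Airflow defaults.

```python
from __future__ import annotations

from fnmatch import fnmatchcase


def qualname_of(o: object) -> str:
    """Return 'module.QualifiedName' for an instance (simplified stand-in for qualname(o))."""
    cls = type(o)
    return f"{cls.__module__}.{cls.__qualname__}"


def is_allowed(classname: str, patterns: list[str]) -> bool:
    """True if classname matches any glob pattern in the allow list (case-sensitive)."""
    return any(fnmatchcase(classname, pattern) for pattern in patterns)


COHERE_CLASS = "cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings"

# Illustrative allow lists mirroring the "before" and "after" situations.
before = ["airflow.*"]
after = before + [COHERE_CLASS]

print(qualname_of(1.5))                  # builtins.float
print(is_allowed(COHERE_CLASS, before))  # False
print(is_allowed(COHERE_CLASS, after))   # True
```

To find the exact string to allow-list for a given model, you could call something like `qualname_of(embeddings)` inside `get_embeddings`.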
