sjyangkevin commented on PR #51059:
URL: https://github.com/apache/airflow/pull/51059#issuecomment-2953588878

   Hi @amoghrajesh, @bolkedebruin, I would like to follow up with some updates:
   
   1. I reverted the changes I made to `serde.py` that altered the order of 
serializers/deserializers.
   2. I updated the signature of `deserialize` in all serializer modules by 
adding an optional parameter `cls: Any`. Most existing serializers use 
`classname`, and replacing `classname` with `cls` everywhere would require 
refactoring them. To reduce the chance of introducing subtle issues, I kept 
`classname`: the existing serializers keep working as they are, and the 
pydantic one can accept `cls` from `serde.py`.
   3. I also modified the pydantic `serialize` method: instead of returning 
`pydantic.main.BaseModel` as the `serialized_classname`, it now returns 
`qualname(o)`. This way, any arbitrary pydantic model can be checked under 
its own class name during deserialization. It means 
`cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings` 
must be added to `allowed_deserialization_classes` before it can be 
deserialized.
   4. `import_string` is removed from the pydantic serializer.
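
To illustrate points 2 to 4, here is a minimal standalone sketch, not the code in this PR. It assumes a four-element return value for `serialize` and uses a hypothetical `qualname` helper plus a stand-in class, `EmbeddingResult`, that only mimics the `model_dump`/`model_validate` methods a pydantic v2 model exposes, so it runs without pydantic or Airflow installed.

```python
from __future__ import annotations

from typing import Any


def qualname(obj: Any) -> str:
    """Hypothetical helper: dotted 'module.ClassName' path of an object or class."""
    cls = obj if isinstance(obj, type) else type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


class EmbeddingResult:
    """Stand-in for a pydantic v2 model (assumption: only these two methods matter)."""

    def __init__(self, values: list[float]) -> None:
        self.values = values

    def model_dump(self) -> dict[str, Any]:
        return {"values": self.values}

    @classmethod
    def model_validate(cls, data: dict[str, Any]) -> EmbeddingResult:
        return cls(values=list(data["values"]))


def serialize(o: Any) -> tuple[dict[str, Any], str, int, bool]:
    # Report the concrete class name instead of a shared base-class name,
    # so the caller can check that exact class against its allow list.
    return o.model_dump(), qualname(o), 1, True


def deserialize(classname: str, version: int, data: dict[str, Any], cls: Any = None) -> Any:
    # `classname` stays in the signature for compatibility with existing
    # serializers; `cls` is the already-resolved class passed in by the caller,
    # so this function never imports anything by string.
    if cls is None:
        raise TypeError(f"no class provided to rebuild {classname}")
    return cls.model_validate(data)


original = EmbeddingResult([0.1, 0.2])
payload, classname, version, ok = serialize(original)
restored = deserialize(classname, version, payload, cls=EmbeddingResult)
print(classname, restored.values)
```

Keeping `classname` as the first positional argument means callers that never pass `cls` are unaffected, which is the low-risk property described in point 2.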
   
   It has passed all the unit tests, and I've updated my test DAG code as shown 
below.
   
   ### Arbitrary pydantic model must be added to `allowed_deserialization_classes`
   Before adding to `allowed_deserialization_classes`:
   ![Screenshot from 2025-06-08 01-16-41](https://github.com/user-attachments/assets/8df908f7-16f6-481f-a1d1-c5a2cc0417d5)
   After adding to `allowed_deserialization_classes`:
   ![Screenshot from 2025-06-08 01-16-53](https://github.com/user-attachments/assets/2486739c-09f4-486f-92fc-2b46493aa577)
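
The before/after behaviour can be approximated with a small sketch. It assumes the allow list is a whitespace-separated string of glob-style patterns and uses `fnmatch` from the standard library. `is_allowed` is a hypothetical helper, and Airflow's actual matching logic in `serde.py` may differ in detail.

```python
from fnmatch import fnmatchcase


def is_allowed(classname: str, allowed: str) -> bool:
    """Return True if classname matches any whitespace-separated glob pattern."""
    return any(fnmatchcase(classname, pattern) for pattern in allowed.split())


MODEL = "cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings"

# Before: only Airflow's own classes are allowed, so the model is rejected.
print(is_allowed(MODEL, "airflow.*"))

# After: the fully qualified class name is listed explicitly.
print(is_allowed(MODEL, f"airflow.* {MODEL}"))

# A broader glob would also admit it, at the cost of allowing more classes.
print(is_allowed(MODEL, "airflow.* cohere.types.*"))
```

Listing the exact class name is the narrowest option; a package-level glob is more convenient but widens what may be deserialized.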
   
   
   ### Test DAG code
   ```python
   from airflow.decorators import dag, task
   from airflow.models.baseoperator import chain
   from airflow.providers.cohere.hooks.cohere import CohereHook
   from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator
   
   from pendulum import datetime
   
   COHERE_CONN_ID = "cohere_default"
   
   @dag(
       start_date=datetime(2025, 5, 23),
       schedule=None,
       catchup=False,
   )
   def pydantic_serde():
   
       @task
       def get_pandas():
           import pandas as pd
           import numpy as np
   
           return pd.DataFrame(np.random.randn(3, 2), columns=list('AB'))
       
       @task
       def print_pandas(df):
           print(df)
   
       @task
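       def get_numpy():
           import numpy as np
   
           # Take a single numpy scalar so the numpy serializer is exercised.
           n = np.random.rand(3, 2)[0][0]
           print(type(n))
           # Return the value so print_numpy receives it instead of None.
           return n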
       
       @task
       def print_numpy(n):
           print(n)
   
       @task
       def get_embeddings():
           import pydantic
           
           cohere_hook = CohereHook()
           embeddings = cohere_hook.create_embeddings(["gruyere"])
   
           print("type of embeddings:", type(embeddings))
           print(
               "is embedding type pydantic:",
               isinstance(embeddings, pydantic.BaseModel),
           )
   
           return embeddings
   
       @task
       def print_embeddings(embeddings):
           print(embeddings)
   
       print_embeddings(get_embeddings())
       print_numpy(get_numpy())
       print_pandas(get_pandas())
   
   pydantic_serde()
   ```


