Desdroid opened a new issue, #68941:
URL: https://github.com/apache/airflow/issues/68941

   ### Under which category would you file this issue?
   
   Airflow Core
   
   ### Apache Airflow version
   
   3.2.2
   
   ### What happened and how to reproduce it?
   
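   When a `DagParam` is passed as a kwarg to `partial`, it is serialized as its default string representation, which includes the object's memory address. Because the address is not stable, the serialized DAG keeps changing, which leads to version inflation.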
   
   Repro:
   
   ```python 
   from __future__ import annotations
   
   import json
   
   from airflow.sdk import DAG
   from airflow.sdk import task
   from airflow.serialization.definitions.param import SerializedParam
   from airflow.serialization.serialized_objects import DagSerialization
   
   if __name__ == "__main__":
       with DAG(dag_id="repro_dagparam") as dag:
   
           @task
           def add(value):
               return value
   
           @task
           def do(something):
               return something
   
           do(dag.param("some", "some_default_val"))
   
           add.partial(value=dag.param("p", "p_default_val")).expand(value=[1, 2, 3])
   
       ser = DagSerialization.to_dict(dag)
       mapped = ser["dag"]["tasks"][1]["__var"]
       print(json.dumps(mapped.get("partial_kwargs"), indent=2, default=str))
       print("ROUNDTRIP:")
       deser = DagSerialization.from_dict(ser)
       t = deser.get_task("add")
       print(type(t.partial_kwargs.get("op_kwargs")), t.partial_kwargs.get("op_kwargs"))
   ```
   
   This will print
   ```
   {
     "op_kwargs": {
       "__var": {
         "value": "<airflow.sdk.definitions.param.DagParam object at 0x71b92d0789e0>"
       },
       "__type": "dict"
     },
     "op_args": []
   }
   ``` 
   Note the memory address in `<airflow.sdk.definitions.param.DagParam object at 0x71b92d0789e0>`.
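
To illustrate why this matters, here is a minimal sketch that does not import Airflow. `FakeDagParam` and `fingerprint` are hypothetical names made up for this example, and it assumes that change detection compares a hash of the serialized form. `default=str` stands in for a serializer that falls back to `str()` for unknown types.

```python
import hashlib
import json


class FakeDagParam:
    """Hypothetical stand-in for DagParam; it defines no __repr__ of its own."""

    def __init__(self, name, default):
        self.name = name
        self.default = default


def fingerprint(op_kwargs):
    # default=str mimics a serializer falling back to str() for unknown types.
    payload = json.dumps(op_kwargs, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Two logically identical params, as two separate parses of one DAG file would create.
first = FakeDagParam("p", "p_default_val")
second = FakeDagParam("p", "p_default_val")

fp_first = fingerprint({"value": first})
fp_second = fingerprint({"value": second})

print(str(first))             # default repr, which embeds the memory address
print(fp_first == fp_second)  # the fingerprints differ although nothing changed
```

Both objects describe the same param, yet their fingerprints differ only because of the embedded address.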
   
   ### What you think should happen instead?
   
   _No response_
   
   ### Operating System
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Apache Airflow Provider(s)
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Official Helm Chart version
   
   1.22.0 (latest released)
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   A workaround is to pass the DagParam to `partial` through a Jinja template:
   ```python
   add.partial(value="{{ params.p }}").expand(value=[1, 2, 3])
   ``` 
   
   The root cause is that [`serialize()`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/serialization/serialized_objects.py#L457) has no case for `DagParam`, so the object falls through to the default branch and is serialized as a string.
   I think either a separate `DagParam` case needs to be added to [`serialize()`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/serialization/serialized_objects.py#L457), similar to `XComArg`, or [`serialize_mapped_operator()`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/serialization/serialized_objects.py#L963) needs to handle the `partial_kwargs` with [`serialize_template_field()`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/serialization/helpers.py#L37) or something similar.
   If somebody can point me in a direction for a fix, I am willing to 
contribute a PR.
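
As a rough sketch of the first option (a dedicated case before the default branch), the toy serializer below is not Airflow code. `FakeDagParam`, this `serialize()` and the `"param_ref"` type tag are all hypothetical and only illustrate the idea of encoding stable fields instead of calling `str()`.

```python
import json


class FakeDagParam:
    """Hypothetical stand-in for DagParam, holding only a name and a default."""

    def __init__(self, name, default):
        self.name = name
        self.default = default


def serialize(value):
    """Toy serializer using a "__type"/"__var" envelope like the output above."""
    if isinstance(value, FakeDagParam):
        # Dedicated case: encode stable fields instead of falling back to str().
        return {
            "__type": "param_ref",  # hypothetical tag, not an Airflow type name
            "__var": {"name": value.name, "default": serialize(value.default)},
        }
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: serialize(v) for k, v in value.items()}}
    if isinstance(value, (list, tuple)):
        return [serialize(v) for v in value]
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    # Default branch: this is where an unstable memory address would leak in.
    return str(value)


a = serialize({"value": FakeDagParam("p", "p_default_val")})
b = serialize({"value": FakeDagParam("p", "p_default_val")})

print(json.dumps(a, indent=2))
print(a == b)  # two distinct objects with the same fields serialize identically
```

A real fix would also need a matching deserialization case so the round trip restores the param, which this sketch leaves out.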
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

