amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881472643


##########
task_sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
     type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These 
are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   Actually, my point here was that we shouldn't be sending a tuple or set over the network, since it isn't JSON serialisable. To give an example, consider this DAG:
   ```
   from __future__ import annotations
   
   import sys
   import time
   from datetime import datetime
   
   from airflow.decorators import dag, task
   
   
   @dag(
       # no schedule; this DAG is triggered manually
       catchup=False,
       tags=[],
       schedule=None,
       start_date=datetime(2021, 1, 1),
   )
   def hello_dag():
       """
       ### TaskFlow API Tutorial Documentation
       This is a simple data pipeline example which demonstrates the use of
       the TaskFlow API using three simple tasks for Extract, Transform, and Load.
       Documentation that goes along with the Airflow TaskFlow API tutorial is
       located
       [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
       """
   
       @task()
       def hello():
           print("hello")
           time.sleep(3)
           print("goodbye")
           print("err mesg", file=sys.stderr)
   
       hello()
   
   
   hello_dag()
   ```
   
   With the current state of the code:
   ```
   """Defines the types that the RTIF payload's dictionary values can take. 
These are all JsonAble types """
   JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]
   
   
   class SetRenderedFields(BaseModel):
       """Payload for setting RTIF for a task instance."""
   
       # We are using a BaseModel here compared to server using RootModel because we
       # have a discriminator running with "type", and RootModel doesn't support type
   
       rendered_fields: dict[str, JsonAbleValueTypes]
       type: Literal["SetRenderedFields"] = "SetRenderedFields"
   ```
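   
   To make the serialisation point concrete (this is just an illustrative snippet, not code from the PR): the stdlib `json` encoder has no representation for a `set` at all, and while it does accept a `tuple`, it encodes it as a JSON array, so it comes back as a `list` on the other side anyway:
   ```
   import json
   
   # All of these values fit JsonAbleValueTypes and dump cleanly
   payload = {"op_args": ["a", "b"], "op_kwargs": {"key": "value"}, "retries": 3}
   print(json.dumps(payload))
   
   # A tuple is encoded as a JSON array, so it round-trips as a list, not a tuple
   print(json.loads(json.dumps({"op_args": ("a", "b")})))  # {'op_args': ['a', 'b']}
   
   # A set has no JSON representation and fails outright
   try:
       json.dumps({"op_args": {"a", "b"}})
   except TypeError as exc:
       print(exc)  # Object of type set is not JSON serializable
   ```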
   
   I can see that the DAG that was failing earlier no longer fails, because `op_args` is converted to a list internally.
   
![image](https://github.com/user-attachments/assets/c4de68db-2e7a-488f-aadc-6ed8dc0718d6)
   
   
   This will also allow sending a valid message to the supervisor without having to convert types back and forth. Ideally, we should only send JSON-serialisable fields over APIs, and the part where a `set` or `tuple` gets converted to a list / str should be handled when we port templating to the task SDK, as sketched below.
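   
   When we do port templating to the task SDK, the normalisation could be something as simple as this sketch (hypothetical helper, not part of this PR), applied to the rendered fields before the `SetRenderedFields` payload is built:
   ```
   from typing import Any
   
   
   def normalise_rendered_value(value: Any) -> Any:
       """Hypothetical helper: coerce containers that JSON cannot represent."""
       if isinstance(value, (set, tuple)):
           # JSON has no set/tuple type; a list is the closest equivalent
           return [normalise_rendered_value(v) for v in value]
       if isinstance(value, dict):
           return {k: normalise_rendered_value(v) for k, v in value.items()}
       return value
   
   
   # e.g. {"op_args": ("a", "b")} -> {"op_args": ["a", "b"]}
   # e.g. {"tags": {"x", "y"}}    -> {"tags": ["x", "y"]} (set order is arbitrary)
   ```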
   
   


