Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1884280990


##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")

Review Comment:
   Follow-up: https://github.com/apache/airflow/pull/44920



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on PR #44843:
URL: https://github.com/apache/airflow/pull/44843#issuecomment-2541943313

   @amoghrajesh Created https://github.com/apache/airflow/pull/44920





Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on PR #44843:
URL: https://github.com/apache/airflow/pull/44843#issuecomment-2541929935

   Merged this PR as-is so we don't keep main broken for this case, but I'll have a follow-up PR in just a bit.





Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil merged PR #44843:
URL: https://github.com/apache/airflow/pull/44843





Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1884266901


##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+# the output order of a set is non-deterministic and can change per process.
+# this validation can fail if that happens before stringification, so we convert to set and compare.
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
 
 session.add(rtif)
 session.flush()
 
-assert RTIF.get_templated_fields(ti=ti, session=session) == {
-"bash_command": expected_rendered_field,
-"env": None,
-"cwd": None,
-}
+if type(templated_field) is set:

Review Comment:
   Nice! Thanks. Let me commit that






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1884266838


##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+# the output order of a set is non-deterministic and can change per process.
+# this validation can fail if that happens before stringification, so we convert to set and compare.
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
 
 session.add(rtif)
 session.flush()
 
-assert RTIF.get_templated_fields(ti=ti, session=session) == {
-"bash_command": expected_rendered_field,
-"env": None,
-"cwd": None,
-}
+if type(templated_field) is set:

Review Comment:
   Actually, I have a better idea that avoids having to deal with this problem at all; let me push a change.
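
The follow-up approach is not spelled out in this thread. One common way to avoid the problem altogether, shown here only as an assumed illustration and not necessarily what the follow-up PR does, is to render sets deterministically by sorting their members before stringifying them. `render_deterministic` is a hypothetical helper, not an Airflow function.

```python
def render_deterministic(value):
    """Hypothetical helper: stringify a set with its members in a stable order.

    Members are sorted by their repr() so that mixed-type sets such as
    {1, "a"} do not raise TypeError. Any other value falls back to str().
    """
    if isinstance(value, set):
        if not value:
            return "set()"  # same text as str(set())
        members = sorted(value, key=repr)
        return "{" + ", ".join(repr(m) for m in members) + "}"
    return str(value)


# The same output in every process, regardless of PYTHONHASHSEED.
print(render_deterministic({"foo", "bar"}))  # {'bar', 'foo'}
print(render_deterministic({1, 2, 3}))       # {1, 2, 3}
```

Sorting by `repr()` keeps the output stable, but the order is lexicographic on the text form (for example `{10, 9}` renders as `{10, 9}`), so the string is meant for comparison rather than for reading in numeric order.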






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1884264997


##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+# the output order of a set is non-deterministic and can change per process.
+# this validation can fail if that happens before stringification, so we convert to set and compare.
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
 
 session.add(rtif)
 session.flush()
 
-assert RTIF.get_templated_fields(ti=ti, session=session) == {
-"bash_command": expected_rendered_field,
-"env": None,
-"cwd": None,
-}
+if type(templated_field) is set:

Review Comment:
   This is in tests, so we control the parameters and it's not a big deal here anyway; it's more of a "nit". But to illustrate the reason:
   
   ```py
   In [13]: class CustomSet(set):
   ...:     def add(self, element):
   ...:         print(f"Adding element: {element}")
   ...:         super().add(element)
   ...:
   
   In [15]: a = CustomSet([1,2])
   
   In [16]: isinstance(a, set)
   Out[16]: True
   
   In [17]: type(a) is set
   Out[17]: False
   ```






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1884255743


##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+# the output order of a set is non-deterministic and can change per process.
+# this validation can fail if that happens before stringification, so we convert to set and compare.
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
 
 session.add(rtif)
 session.flush()
 
-assert RTIF.get_templated_fields(ti=ti, session=session) == {
-"bash_command": expected_rendered_field,
-"env": None,
-"cwd": None,
-}
+if type(templated_field) is set:

Review Comment:
   ```suggestion
   if isinstance(templated_field, set):
   ```
   
   Better from an OOP perspective
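
Putting the suggestion together with the `ast.literal_eval` comparison from this hunk, the check could be factored into a small helper. `normalize_rendered` is hypothetical and not part of the PR; it assumes set-valued fields are rendered as Python set literals, as in the CI failure quoted in this thread.

```python
import ast


def normalize_rendered(templated_field, rendered):
    """Hypothetical helper: make a rendered set compare independently of order.

    isinstance() also matches set subclasses, which `type(x) is set` would miss.
    """
    if isinstance(templated_field, set):
        return ast.literal_eval(rendered)  # parse the set literal back into a set
    return rendered


class CustomSet(set):
    """A set subclass, like the one in the IPython session in this thread."""


expected = "{'foo', 'bar'}"
actual = "{'bar', 'foo'}"  # same members, different iteration order
field = CustomSet({"foo", "bar"})

print(type(field) is set)      # False: the subclass would skip a `type() is set` branch
print(isinstance(field, set))  # True
print(normalize_rendered(field, expected) == normalize_rendered(field, actual))  # True
```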






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1884252800


##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")

Review Comment:
   damn -- yeah the ordering issue with set!
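
The ordering issue comes from string hash randomization: each interpreter process picks its own hash seed (see `PYTHONHASHSEED`), so the iteration order, and therefore the `repr()`, of a set of strings can differ from one process to the next. The sketch below renders the same set in several child interpreters with fixed, different seeds. Whether any particular seed flips the order is not guaranteed, but every rendering parses back to the same set, which is why comparing via `ast.literal_eval` is stable.

```python
import ast
import os
import subprocess
import sys

SNIPPET = "print(repr({'foo', 'bar'}))"


def repr_in_child(seed):
    """Run SNIPPET in a fresh interpreter with a fixed PYTHONHASHSEED."""
    env = dict(os.environ, PYTHONHASHSEED=str(seed))
    result = subprocess.run(
        [sys.executable, "-c", SNIPPET],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


renderings = {repr_in_child(seed) for seed in range(8)}
print(renderings)  # may contain "{'foo', 'bar'}", "{'bar', 'foo'}", or both

# String comparison depends on the seed; parsed comparison does not.
assert all(ast.literal_eval(r) == {"foo", "bar"} for r in renderings)
```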






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1884094412


##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")

Review Comment:
   The reason for doing this is that the rendered fields for sets are sometimes stringified in a different set order, leading to comparison failures like this:
   ```
   _ TestRenderedTaskInstanceFields.test_get_templated_fields[templated_field8-{'foo', 'bar'}] _
   tests/models/test_renderedtifields.py:165: in test_get_templated_fields
   assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
   E   assert equals failed
   E "{'foo', 'bar'}"  "{'bar', 'foo'}"
   ```
   
   Example run: https://github.com/apache/airflow/actions/runs/12313918735/job/34369300336






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1884021823


##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")

Review Comment:
   Why this, @amoghrajesh?



##
tests/models/test_renderedtifields.py:
##
@@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da
 assert ti.dag_id == rtif.dag_id
 assert ti.task_id == rtif.task_id
 assert ti.run_id == rtif.run_id
-assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
+if type(templated_field) is set:
+assert ast.literal_eval(expected_rendered_field) == ast.literal_eval(
+rtif.rendered_fields.get("bash_command")
+)
+else:
+assert expected_rendered_field == rtif.rendered_fields.get("bash_command")
 
 session.add(rtif)
 session.flush()
 
-assert RTIF.get_templated_fields(ti=ti, session=session) == {
-"bash_command": expected_rendered_field,
-"env": None,
-"cwd": None,
-}
+if type(templated_field) is set:
+expected = RTIF.get_templated_fields(ti=ti, session=session)
+expected["bash_command"] = ast.literal_eval(expected["bash_command"])
+actual = {
+"bash_command": ast.literal_eval(expected_rendered_field),
+"env": None,
+"cwd": None,
+}
+assert expected == actual
+else:
+assert RTIF.get_templated_fields(ti=ti, session=session) == {
+"bash_command": expected_rendered_field,
+"env": None,
+"cwd": None,
+}

Review Comment:
   and this?






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


amoghrajesh commented on PR #44843:
URL: https://github.com/apache/airflow/pull/44843#issuecomment-2541086170

   War to fix the CI starts!





Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883635008


##
task_sdk/tests/execution_time/test_task_runner.py:
##
@@ -260,3 +273,98 @@ def test_startup_basic_templated_dag(mocked_parse):
 ),
 log=mock.ANY,
 )
+
+
+def test_startup_dag_with_no_templates(mocked_parse):
+"""Test startup of a simple DAG."""
+from airflow.providers.standard.operators.python import PythonOperator
+
+task = PythonOperator(
+task_id="task1",
+python_callable=lambda: print("hello world!"),
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(rendered_fields={"op_args": [], "op_kwargs": {}, "templates_dict": None}),
+log=mock.ANY,
+)
+
+
+def test_startup_dag_with_no_templates_mixed_types(mocked_parse):
+"""Test startup of a simple DAG."""
+from airflow.providers.standard.operators.python import PythonOperator
+
+task = PythonOperator(
+task_id="task1",
+python_callable=lambda *args, **kwargs: print(f"op_args: {args}, op_kwargs: {kwargs}"),
+op_args=["arg1", "arg2", 1, 2, 3.75, {"key": "value"}],
+op_kwargs={"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}},
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(
+rendered_fields={
+"templates_dict": None,
+"op_args": ["arg1", "arg2", 1, 2, 3.75, {"key": "value"}],
+"op_kwargs": {"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}},
+}
+),
+log=mock.ANY,
+)
+
+
+def test_startup_dag_with_tuple_and_set_templated_fields(mocked_parse):
+"""Test startup of a simple DAG with tuple and set templated fields."""
+
+task = CustomOperator2(
+task_id="templated_task",
+my_tup=(
+1,
+2,
+),
+my_set={1, 2, 3},
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="templated_task", dag_id="basic_dag", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(rendered_fields={"my_set": "{1, 2, 3}", "my_tup": "(1, 2)"}),
+log=mock.ANY,
+)

Review Comment:
   I have simplified the tests in https://github.com/apache/airflow/pull/44843/commits/2381c10026f528a1f4ce5c2260eaa9a874c2fbd2
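
The mixed-types test in this hunk expects `op_args` and `op_kwargs` to reach the supervisor unchanged, while the tuple/set test expects string renderings. A plain standard-library `json` round trip shows the underlying distinction; this is only an illustration and makes no claim about how the Task SDK actually encodes `SetRenderedFields`. Both helper names are hypothetical.

```python
import json


def is_json_serializable(value):
    """Return True if json.dumps() accepts the value."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


def survives_round_trip(value):
    """True if a JSON encode/decode cycle returns an equal value."""
    return is_json_serializable(value) and json.loads(json.dumps(value)) == value


rendered_fields = {
    "templates_dict": None,
    "op_args": ["arg1", "arg2", 1, 2, 3.75, {"key": "value"}],
    "op_kwargs": {"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}},
}

print(survives_round_trip(rendered_fields))  # True: lists, dicts, str, int, float, None
print(survives_round_trip((1, 2)))           # False: the tuple comes back as [1, 2]
print(is_json_serializable({1, 2, 3}))       # False: sets are rejected outright
```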






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883581737


##
airflow/serialization/helpers.py:
##
@@ -65,4 +69,4 @@ def is_jsonable(x):
 "Truncated. You can change this behaviour in [core]max_templated_field_length. "
 f"{rendered[:max_length - 79]!r}... "
 )
-return template_field
+return serialized

Review Comment:
   This would have converted everything to a string, even lists, ints, etc. :)
   
   
https://github.com/apache/airflow/blob/0f0bed0ae00b7c07fed147f1d42760bf654336cb/airflow/serialization/helpers.py#L70
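
To make the point concrete, here is a simplified sketch of the JSON-able branch under discussion. It assumes, from the diff context and the linked line, that `serialized` is `str(template_field)` computed for the length check and truncation message. Redaction and the non-JSON-able branch are left out, `MAX_LENGTH` stands in for `[core]max_templated_field_length` with an assumed value, and `serialize_jsonable` is a hypothetical name.

```python
import json

MAX_LENGTH = 4096  # assumed stand-in for [core]max_templated_field_length


def serialize_jsonable(template_field, return_serialized=False):
    """Hypothetical simplification of the JSON-able branch of the helper.

    `serialized` exists to measure and truncate long values. Returning it
    instead of `template_field` would turn every value into a str.
    """
    json.dumps(template_field)  # raises TypeError for non-JSON-able input
    serialized = str(template_field)
    if len(serialized) > MAX_LENGTH:
        return (
            "Truncated. You can change this behaviour in [core]max_templated_field_length. "
            f"{serialized[:MAX_LENGTH - 79]!r}... "
        )
    return serialized if return_serialized else template_field


print(repr(serialize_jsonable([1, 2])))                          # [1, 2]
print(repr(serialize_jsonable([1, 2], return_serialized=True)))  # '[1, 2]'
print(repr(serialize_jsonable(42, return_serialized=True)))      # '42'
```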






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883581737


##
airflow/serialization/helpers.py:
##
@@ -65,4 +69,4 @@ def is_jsonable(x):
 "Truncated. You can change this behaviour in [core]max_templated_field_length. "
 f"{rendered[:max_length - 79]!r}... "
 )
-return template_field
+return serialized

Review Comment:
   This would have converted everything to a string: even lists, ints, etc. would have been changed to str :)
   
   
https://github.com/apache/airflow/blob/0f0bed0ae00b7c07fed147f1d42760bf654336cb/airflow/serialization/helpers.py#L70






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883581737


##
airflow/serialization/helpers.py:
##
@@ -65,4 +69,4 @@ def is_jsonable(x):
 "Truncated. You can change this behaviour in [core]max_templated_field_length. "
 f"{rendered[:max_length - 79]!r}... "
 )
-return template_field
+return serialized

Review Comment:
   This would have converted everything to a string :)






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883546427


##
airflow/serialization/helpers.py:
##
@@ -65,4 +69,4 @@ def is_jsonable(x):
 "Truncated. You can change this behaviour in [core]max_templated_field_length. "
 f"{rendered[:max_length - 79]!r}... "
 )
-return template_field
+return serialized

Review Comment:
Not making such a change in this PR.






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883545915


##
task_sdk/tests/execution_time/test_task_runner.py:
##
@@ -260,3 +273,98 @@ def test_startup_basic_templated_dag(mocked_parse):
 ),
 log=mock.ANY,
 )
+
+
+def test_startup_dag_with_no_templates(mocked_parse):
+"""Test startup of a simple DAG."""
+from airflow.providers.standard.operators.python import PythonOperator
+
+task = PythonOperator(
+task_id="task1",
+python_callable=lambda: print("hello world!"),
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(rendered_fields={"op_args": [], "op_kwargs": {}, "templates_dict": None}),
+log=mock.ANY,
+)
+
+
+def test_startup_dag_with_no_templates_mixed_types(mocked_parse):
+"""Test startup of a simple DAG."""
+from airflow.providers.standard.operators.python import PythonOperator
+
+task = PythonOperator(
+task_id="task1",
+python_callable=lambda *args, **kwargs: print(f"op_args: {args}, op_kwargs: {kwargs}"),
+op_args=["arg1", "arg2", 1, 2, 3.75, {"key": "value"}],
+op_kwargs={"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}},
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(
+rendered_fields={
+"templates_dict": None,
+"op_args": ["arg1", "arg2", 1, 2, 3.75, {"key": "value"}],
+"op_kwargs": {"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}},
+}
+),
+log=mock.ANY,
+)
+
+
+def test_startup_dag_with_tuple_and_set_templated_fields(mocked_parse):
+"""Test startup of a simple DAG with tuple and set templated fields."""
+
+task = CustomOperator2(
+task_id="templated_task",
+my_tup=(
+1,
+2,
+),
+my_set={1, 2, 3},
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="templated_task", dag_id="basic_dag", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(rendered_fields={"my_set": "{1, 2, 3}", "my_tup": "(1, 2)"}),
+log=mock.ANY,
+)

Review Comment:
   Let's not make this change in this PR.




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883540029


##
airflow/serialization/helpers.py:
##
@@ -43,6 +43,10 @@ def is_jsonable(x):
 
 max_length = conf.getint("core", "max_templated_field_length")
 
+# Handle empty set or tuple explicitly
+if isinstance(template_field, (set, tuple)) and not template_field:
+return []

Review Comment:
   This makes the return type of this function correct, as previously it returned an empty tuple.
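
The guard can be exercised on its own. The sketch below lifts only that `if` statement into a hypothetical `normalize_empty` function and pairs it with an `is_jsonable` check assumed to match the helper's (a `json.dumps` try/except). It shows why both types need the guard: an empty tuple is JSON-able, so the helper would otherwise hand back a tuple, while an empty set is not JSON-able, so it would presumably fall through to the string path. With the guard, both come back as `[]`.

```python
import json


def is_jsonable(x):
    """Assumed to mirror the helper's check: can json.dumps() encode x?"""
    try:
        json.dumps(x)
    except (TypeError, OverflowError):
        return False
    return True


def normalize_empty(template_field):
    """Only the guard added in this hunk, lifted into a hypothetical function."""
    # Handle empty set or tuple explicitly
    if isinstance(template_field, (set, tuple)) and not template_field:
        return []
    return template_field


for value in ((), set()):
    print(type(value).__name__, is_jsonable(value), normalize_empty(value))
# tuple True []
# set False []
```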






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-13 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883536589


##
airflow/serialization/helpers.py:
##
@@ -65,4 +69,4 @@ def is_jsonable(x):
 "Truncated. You can change this behaviour in [core]max_templated_field_length. "
 f"{rendered[:max_length - 79]!r}... "
 )
-return template_field
+return serialized

Review Comment:
   What's the bug that is fixed by this change?






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883412215


##
task_sdk/tests/execution_time/test_task_runner.py:
##
@@ -260,3 +273,98 @@ def test_startup_basic_templated_dag(mocked_parse):
 ),
 log=mock.ANY,
 )
+
+
+def test_startup_dag_with_no_templates(mocked_parse):
+"""Test startup of a simple DAG."""
+from airflow.providers.standard.operators.python import PythonOperator
+
+task = PythonOperator(
+task_id="task1",
+python_callable=lambda: print("hello world!"),
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(rendered_fields={"op_args": [], "op_kwargs": 
{}, "templates_dict": None}),
+log=mock.ANY,
+)
+
+
+def test_startup_dag_with_no_templates_mixed_types(mocked_parse):
+"""Test startup of a simple DAG."""
+from airflow.providers.standard.operators.python import PythonOperator
+
+task = PythonOperator(
+task_id="task1",
+python_callable=lambda *args, **kwargs: print(f"op_args: {args}, 
op_kwargs: {kwargs}"),
+op_args=["arg1", "arg2", 1, 2, 3.75, {"key": "value"}],
+op_kwargs={"key1": "value1", "key2": 99.0, "key3": {"nested_key": 
"nested_value"}},
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", 
run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(
+rendered_fields={
+"templates_dict": None,
+"op_args": ["arg1", "arg2", 1, 2, 3.75, {"key": "value"}],
+"op_kwargs": {"key1": "value1", "key2": 99.0, "key3": 
{"nested_key": "nested_value"}},
+}
+),
+log=mock.ANY,
+)
+
+
+def test_startup_dag_with_tuple_and_set_templated_fields(mocked_parse):
+"""Test startup of a simple DAG with tuple and set templated fields."""
+
+task = CustomOperator2(
+task_id="templated_task",
+my_tup=(
+1,
+2,
+),
+my_set={1, 2, 3},
+)
+
+what = StartupDetails(
+ti=TaskInstance(id=uuid7(), task_id="templated_task", 
dag_id="basic_dag", run_id="c", try_number=1),
+file="",
+requests_fd=0,
+)
+mocked_parse(what, "basic_dag", task)
+
+with mock.patch(
+"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
+) as mock_supervisor_comms:
+mock_supervisor_comms.get_message.return_value = what
+startup()
+
+mock_supervisor_comms.send_request.assert_called_once_with(
+msg=SetRenderedFields(rendered_fields={"my_set": "{1, 2, 3}", 
"my_tup": "(1, 2)"}),
+log=mock.ANY,
+)

Review Comment:
   Added this test, which validates tuples and sets thoroughly.
   While doing this I realised that there was a bug in our serialiser which wasn't able to handle tuples. Fixed it here: https://github.com/apache/airflow/pull/44843/files#diff-6a66bbf06ff636982d3b650e1b35bcea75971d211691143abef6eb8349416b98R72
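The tests above rely on `mock.patch(..., create=True)` to install a `SUPERVISOR_COMMS` object that does not exist until runtime, and then assert on the single `send_request` call. A stripped-down, stdlib-only sketch of that pattern follows; the `runner` namespace and the simplified `startup()` are hypothetical stand-ins, not the Task SDK code.

```python
from types import SimpleNamespace
from unittest import mock

# Hypothetical stand-in for a module whose comms object is only created at runtime.
runner = SimpleNamespace()


def startup(rendered_fields):
    # Only send when there is something to send, mirroring the guard in the diff.
    if rendered_fields:
        runner.SUPERVISOR_COMMS.send_request(msg={"rendered_fields": rendered_fields}, log=None)


def check_startup_sends_fields():
    # create=True lets the patch add an attribute that does not exist yet.
    with mock.patch.object(runner, "SUPERVISOR_COMMS", create=True) as comms:
        startup({"my_set": "{1, 2, 3}", "my_tup": "(1, 2)"})
    comms.send_request.assert_called_once_with(
        msg={"rendered_fields": {"my_set": "{1, 2, 3}", "my_tup": "(1, 2)"}},
        log=mock.ANY,  # matches whatever logger was passed
    )


def check_startup_skips_empty():
    with mock.patch.object(runner, "SUPERVISOR_COMMS", create=True) as comms:
        startup({})
    comms.send_request.assert_not_called()
```

Because the attribute was created by the patch, it is deleted again when the `with` block exits.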




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1883375971


##
airflow/serialization/helpers.py:
##
@@ -43,6 +43,10 @@ def is_jsonable(x):
 
     max_length = conf.getint("core", "max_templated_field_length")
 
+    # Handle empty set or tuple explicitly
+    if isinstance(template_field, (set, tuple)) and not template_field:
+        return []

Review Comment:
   Yeah, I was thinking of something similar. This looks fine for now.
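A minimal sketch of the behaviour this hunk and the new task-runner test point towards, assuming empty sets/tuples map to `[]` and non-empty ones fall back to `str()` (which is what the expected `"{1, 2, 3}"` and `"(1, 2)"` values in the test suggest). `render_for_rtif` is a hypothetical name; Airflow's `serialize_template_field` also redacts and truncates, which is left out here.

```python
import json


def render_for_rtif(value):
    """Hypothetical helper: turn a rendered template field into a JSON-safe value.

    Assumptions (not Airflow's actual implementation):
    - empty sets/tuples become [] (as in the hunk above);
    - non-empty sets/tuples fall back to str();
    - anything json.dumps accepts is passed through unchanged;
    - anything else falls back to str().
    """
    if isinstance(value, (set, tuple)):
        return [] if not value else str(value)
    try:
        json.dumps(value)
        return value
    except (TypeError, OverflowError):
        return str(value)
```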



##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -237,7 +237,7 @@ def ti_heartbeat(
 )
 def ti_put_rtif(
     task_instance_id: UUID,
-    put_rtif_payload: RTIFPayload,

Review Comment:
   ACK. Sounds good



##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]:
     # 1. Implementing the part where we pull in the logic to render fields and add that here
     # for all operators, we should do setattr(task, templated_field, rendered_templated_field)
     # task.templated_fields should give all the templated_fields and each of those fields should
-    # give the rendered values.
+    # give the rendered values. task.templated_fields should already be in a JSONable format and
+    # we should not have to handle that here.
 
     # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB
-    templated_fields = ti.task.template_fields
-    payload = {}
-
-    for field in templated_fields:
-        if field not in payload:
-            payload[field] = getattr(ti.task, field)
 
     # so that we do not call the API unnecessarily
-    if payload:
-        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload))
+    if rendered_fields := _get_rendered_fields(ti.task):
+        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields))
     return ti, log
 
 
+def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]:
+    # TODO: Port one of the following to Task SDK
+    #   airflow.serialization.helpers.serialize_template_field or
+    #   airflow.models.renderedtifields.get_serialized_template_fields
+    from airflow.serialization.helpers import serialize_template_field
+
+    return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields}

Review Comment:
   Right. Instead of duplicating it at the moment, we can reuse it; when we port the logic for templating, we are going to revisit this anyway, so it is better to handle it then. We will not miss it either, because we have a TODO.
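The reuse approach boils down to a dict comprehension over `task.template_fields` plus the walrus guard in `startup()`. The sketch below takes the serializer as a parameter so it runs without Airflow installed; `get_rendered_fields`, `send_if_any` and `DummyTask` are hypothetical names.

```python
from __future__ import annotations

from typing import Any, Callable


def get_rendered_fields(task: Any, serialize: Callable[[Any, str], Any]) -> dict[str, Any]:
    """Collect every templated field on `task`, passing each value through `serialize`.

    `serialize(value, name)` stands in for serialize_template_field(value, name).
    """
    return {field: serialize(getattr(task, field), field) for field in task.template_fields}


def send_if_any(task: Any, serialize: Callable[[Any, str], Any], send: Callable[[dict], None]) -> bool:
    """Send the rendered fields only when the dict is non-empty, like the guard in startup()."""
    if rendered := get_rendered_fields(task, serialize):
        send(rendered)
        return True
    return False


class DummyTask:
    """Hypothetical operator-like object with two templated fields."""

    template_fields = ("bash_command", "env")
    bash_command = "echo {{ ds }}"
    env = None
```

The guard only skips the request when a task has no templated fields at all; fields whose values are `None` still yield a non-empty dict and are sent.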





Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


ashb commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1882865152


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]:
     # 1. Implementing the part where we pull in the logic to render fields and add that here
     # for all operators, we should do setattr(task, templated_field, rendered_templated_field)
     # task.templated_fields should give all the templated_fields and each of those fields should
-    # give the rendered values.
+    # give the rendered values. task.templated_fields should already be in a JSONable format and
+    # we should not have to handle that here.
 
     # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB
-    templated_fields = ti.task.template_fields
-    payload = {}
-
-    for field in templated_fields:
-        if field not in payload:
-            payload[field] = getattr(ti.task, field)
 
     # so that we do not call the API unnecessarily
-    if payload:
-        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload))
+    if rendered_fields := _get_rendered_fields(ti.task):
+        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields))
     return ti, log
 
 
+def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]:
+    # TODO: Port one of the following to Task SDK
+    #   airflow.serialization.helpers.serialize_template_field or
+    #   airflow.models.renderedtifields.get_serialized_template_fields
+    from airflow.serialization.helpers import serialize_template_field
+
+    return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields}

Review Comment:
   I'd say for now reuse the code 




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1882884043


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -237,7 +237,7 @@ def ti_heartbeat(
 )
 def ti_put_rtif(
     task_instance_id: UUID,
-    put_rtif_payload: RTIFPayload,

Review Comment:
   I just removed `RTIFPayload` and `RootModel` -- we can just have it directly over here




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1882836965


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]:
     # 1. Implementing the part where we pull in the logic to render fields and add that here
     # for all operators, we should do setattr(task, templated_field, rendered_templated_field)
     # task.templated_fields should give all the templated_fields and each of those fields should
-    # give the rendered values.
+    # give the rendered values. task.templated_fields should already be in a JSONable format and
+    # we should not have to handle that here.
 
     # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB
-    templated_fields = ti.task.template_fields
-    payload = {}
-
-    for field in templated_fields:
-        if field not in payload:
-            payload[field] = getattr(ti.task, field)
 
     # so that we do not call the API unnecessarily
-    if payload:
-        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload))
+    if rendered_fields := _get_rendered_fields(ti.task):
+        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields))
     return ti, log
 
 
+def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]:
+    # TODO: Port one of the following to Task SDK
+    #   airflow.serialization.helpers.serialize_template_field or
+    #   airflow.models.renderedtifields.get_serialized_template_fields
+    from airflow.serialization.helpers import serialize_template_field
+
+    return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields}

Review Comment:
   If we don't use `serialize_template_field`, we will have to duplicate the logic -- which is also fine -- but we need to take an informed call. On one hand we need to de-couple, on the other hand we can keep it centralized and port it later
   
   ---
   
   There is also logic to redact:
   
   
https://github.com/apache/airflow/blob/cc9b9a8794da50630e26f5de04b4c0ae5d49353f/airflow/serialization/helpers.py#L52
   
   
https://github.com/apache/airflow/blob/cc9b9a8794da50630e26f5de04b4c0ae5d49353f/airflow/serialization/helpers.py#L63
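The redaction and truncation steps linked above could be sketched as follows. The truncation message and the `max_length - 79` slice are copied from the `helpers.py` hunk earlier in this thread; `redact_by_name` and its keyword list are hypothetical stand-ins for Airflow's secrets masker, and redacting before the length check is a choice made for this sketch.

```python
SENSITIVE_WORDS = ("password", "secret", "token")  # hypothetical keyword list


def redact_by_name(value: str, name: str) -> str:
    """Hypothetical masking: hide the whole value if the field name looks sensitive."""
    if any(word in name.lower() for word in SENSITIVE_WORDS):
        return "***"
    return value


def truncate_rendered(value: str, name: str, max_length: int) -> str:
    """Redact, then truncate long values using the message format from the helpers.py hunk."""
    rendered = redact_by_name(value, name)
    if len(rendered) <= max_length:
        return rendered
    return (
        "Truncated. You can change this behaviour in [core]max_templated_field_length. "
        f"{rendered[:max_length - 79]!r}... "
    )
```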




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1882837307


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]:
     # 1. Implementing the part where we pull in the logic to render fields and add that here
     # for all operators, we should do setattr(task, templated_field, rendered_templated_field)
     # task.templated_fields should give all the templated_fields and each of those fields should
-    # give the rendered values.
+    # give the rendered values. task.templated_fields should already be in a JSONable format and
+    # we should not have to handle that here.
 
     # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB
-    templated_fields = ti.task.template_fields
-    payload = {}
-
-    for field in templated_fields:
-        if field not in payload:
-            payload[field] = getattr(ti.task, field)
 
     # so that we do not call the API unnecessarily
-    if payload:
-        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload))
+    if rendered_fields := _get_rendered_fields(ti.task):
+        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields))
     return ti, log
 
 
+def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]:
+    # TODO: Port one of the following to Task SDK
+    #   airflow.serialization.helpers.serialize_template_field or
+    #   airflow.models.renderedtifields.get_serialized_template_fields
+    from airflow.serialization.helpers import serialize_template_field
+
+    return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields}

Review Comment:
   @ashb / @amoghrajesh thoughts?




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1882834039


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]:
     # 1. Implementing the part where we pull in the logic to render fields and add that here
     # for all operators, we should do setattr(task, templated_field, rendered_templated_field)
     # task.templated_fields should give all the templated_fields and each of those fields should
-    # give the rendered values.
+    # give the rendered values. task.templated_fields should already be in a JSONable format and
+    # we should not have to handle that here.
 
     # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB
-    templated_fields = ti.task.template_fields
-    payload = {}
-
-    for field in templated_fields:
-        if field not in payload:
-            payload[field] = getattr(ti.task, field)
 
     # so that we do not call the API unnecessarily
-    if payload:
-        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload))
+    if rendered_fields := _get_rendered_fields(ti.task):
+        SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields))
     return ti, log
 
 
+def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]:
+    # TODO: Port one of the following to Task SDK
+    #   airflow.serialization.helpers.serialize_template_field or
+    #   airflow.models.renderedtifields.get_serialized_template_fields
+    from airflow.serialization.helpers import serialize_template_field
+
+    return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields}

Review Comment:
   This is mainly because there may be some `template_fields` that are classes with a `.serialize()` method!
   
   
https://github.com/apache/airflow/blob/cc9b9a8794da50630e26f5de04b4c0ae5d49353f/airflow/serialization/helpers.py#L48
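A sketch of the duck-typing idea behind that check: prefer a value's own `serialize()` method when it has a callable one. `serialize_value` and `S3Path` are hypothetical, and the exact check at the linked line of `helpers.py` may differ.

```python
from typing import Any


def serialize_value(value: Any) -> Any:
    """Prefer the value's own serialize() method when it has a callable one; otherwise pass it through."""
    serialize = getattr(value, "serialize", None)
    if callable(serialize):
        return serialize()
    return value


class S3Path:
    """Hypothetical template-field type that knows how to serialize itself."""

    def __init__(self, bucket: str, key: str) -> None:
        self.bucket = bucket
        self.key = key

    def serialize(self) -> str:
        return f"s3://{self.bucket}/{self.key}"
```

Without a hook like this, a JSON-based check would reject such an object and fall back to `str()`, which usually yields an unhelpful `<... object at 0x...>` representation.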




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1882428319


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -137,5 +137,12 @@ class TaskInstance(BaseModel):
     map_index: int | None = None
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[
+    dict[str, JsonValue],
+    list[JsonValue],
+    JsonValue,
+]
+

Review Comment:
   Yeah good observation. Let me change it




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881844567


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,21 @@ class PutVariable(BaseModel):
     type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[
+    dict[str, JsonValue],
+    list[JsonValue],
+    JsonValue,
+]
+
+
 class SetRenderedFields(BaseModel):
     """Payload for setting RTIF for a task instance."""
 
     # We are using a BaseModel here compared to server using RootModel because we
     # have a discriminator running with "type", and RootModel doesn't support type
 
-    rendered_fields: dict[str, str | None]
+    rendered_fields: dict[str, JsonAbleValueTypes]

Review Comment:
   ```suggestion
   rendered_fields: dict[str, JsonValue]
   ```



##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -137,5 +137,12 @@ class TaskInstance(BaseModel):
     map_index: int | None = None
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[
+    dict[str, JsonValue],
+    list[JsonValue],
+    JsonValue,
+]
+
 """Schema for setting RTIF for a task instance."""
-RTIFPayload = RootModel[dict[str, str]]
+RTIFPayload = RootModel[dict[str, JsonAbleValueTypes]]

Review Comment:
   ```suggestion
   RTIFPayload = RootModel[dict[str, JsonValue]]
   ```




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881843653


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -137,5 +137,12 @@ class TaskInstance(BaseModel):
     map_index: int | None = None
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[
+    dict[str, JsonValue],
+    list[JsonValue],
+    JsonValue,
+]
+

Review Comment:
   Actually now you can just remove this entirely and change L148 to use `JsonValue`
   
   https://docs.pydantic.dev/2.9/api/types/#pydantic.types.JsonValue
   
   It is sufficient for what we want:
   
   ```py
   JsonValue: TypeAlias = Union[
       List["JsonValue"],
       Dict[str, "JsonValue"],
       str,
       bool,
       int,
       float,
       None,
   ]
   ```
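To see why wrapping `JsonValue` in `Union[dict[str, JsonValue], list[JsonValue], JsonValue]` adds nothing, here is a stdlib-only recursive check that mirrors the shape of the alias above. It is a strict approximation for illustration; pydantic's own validation of `JsonValue` may differ in edge cases.

```python
from typing import Any


def is_json_value(x: Any) -> bool:
    """Return True if x has the shape of the JsonValue alias (stdlib-only approximation)."""
    # Scalars allowed by the alias: str, bool, int, float, None.
    if x is None or isinstance(x, (str, bool, int, float)):
        return True
    # List["JsonValue"]: every element must itself be a JSON value.
    if isinstance(x, list):
        return all(is_json_value(item) for item in x)
    # Dict[str, "JsonValue"]: string keys, JSON values.
    if isinstance(x, dict):
        return all(isinstance(k, str) and is_json_value(v) for k, v in x.items())
    # Anything else (tuple, set, arbitrary objects) is rejected in this sketch.
    return False
```

Since the list and dict branches are already part of the recursion, the wider `Union` accepts exactly the same values as `JsonValue` alone.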




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


amoghrajesh commented on PR #44843:
URL: https://github.com/apache/airflow/pull/44843#issuecomment-2538393429

   FYI, I ran the DAG from ash that found this issue:
   ```
   from __future__ import annotations

   import sys
   import time
   from datetime import datetime

   from airflow.decorators import dag, task


   @dag(
       # every minute on the 30-second mark
       catchup=False,
       tags=[],
       schedule=None,
       start_date=datetime(2021, 1, 1),
   )
   def hello_dag():
       """
       ### TaskFlow API Tutorial Documentation
       This is a simple data pipeline example which demonstrates the use of
       the TaskFlow API using three simple tasks for Extract, Transform, and Load.
       Documentation that goes along with the Airflow TaskFlow API tutorial is
       located
       [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
       """

       @task()
       def hello():
           print("hello")
           time.sleep(3)
           print("goodbye")
           print("err mesg", file=sys.stderr)

       hello()


   hello_dag()
   
   ```
   
   
   using this UT:
   ```
   def test_startup_dag_with_no_templates_mixed_types(mocked_parse, test_dags_dir):
       """Test startup of a simple DAG."""
       what = StartupDetails(
           ti=TaskInstance(id=uuid7(), task_id="hello", dag_id="hello_dag", run_id="c", try_number=1),
           file=str(test_dags_dir / "mydag.py"),
           requests_fd=0,
       )
       ti = parse(what)

       with mock.patch(
           "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
       ) as mock_supervisor_comms:
           mock_supervisor_comms.get_message.return_value = what
           startup()
           run(ti, log=mock.ANY)
   
   ```
   
   Result:
   ```
   Home of the user: /Users/amoghdesai
   Airflow home /Users/amoghdesai/airflow
   PASSED [100%][2024-12-12T15:16:57.104+0530] {dagbag.py:535} INFO - Filling up the DagBag from /Users/amoghdesai/Documents/OSS/airflow/task_sdk/tests/dags/mydag.py
   [2024-12-12T15:16:57.175+0530] {dagbag.py:535} INFO - Filling up the DagBag from /Users/amoghdesai/Documents/OSS/airflow/task_sdk/tests/dags/mydag.py
   2024-12-12 15:16:57 [debug] DAG file parsed[task] file=/Users/amoghdesai/Documents/OSS/airflow/task_sdk/tests/dags/mydag.py
   hello
   goodbye
   [2024-12-12T15:17:00.178+0530] {python.py:197} INFO - Done. Returned value was: None
   ```



Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881628969


##
tests/api_fastapi/execution_api/routes/test_task_instances.py:
##
@@ -422,16 +422,37 @@ def teardown_method(self):
         clear_db_runs()
         clear_rendered_ti_fields()
 
-    def test_ti_put_rtif_success(self, client, session, create_task_instance):
+    @pytest.mark.parametrize(
+        "payload",
+        [
+            # string value
+            {"field1": "string_value", "field2": "another_string"},
+            # dictionary value
+            {"field1": {"nested_key": "nested_value"}},
+            # integer value
+            {"field1": 100},
+            # None value
+            {"field1": None},
+            # float value
+            {"field1": 3.14159},
+            # string lists value
+            {"field1": ["123"], "field2": ["a", "b", "c"]},
+            # list of JSON values
+            {"field1": [1, "string", 3.14, True, None, {"nested": "dict"}]},
+            # nested dictionary with mixed types in lists
+            {
+                "field1": {"nested_dict": {"key1": 123, "key2": "value"}},
+                "field2": [3.14, {"sub_key": "sub_value"}, [1, 2]],
Review Comment:
   Tries to cover a lot of possible combinations.
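One cheap property shared by all of these parametrized payloads is that they survive a JSON round trip unchanged, which a tuple does not. A stdlib sketch (the last payload is closed off here because the hunk above is cut short):

```python
import json

# Payload shapes taken from the parametrized test above.
payloads = [
    {"field1": "string_value", "field2": "another_string"},
    {"field1": {"nested_key": "nested_value"}},
    {"field1": 100},
    {"field1": None},
    {"field1": 3.14159},
    {"field1": ["123"], "field2": ["a", "b", "c"]},
    {"field1": [1, "string", 3.14, True, None, {"nested": "dict"}]},
    {
        "field1": {"nested_dict": {"key1": 123, "key2": "value"}},
        "field2": [3.14, {"sub_key": "sub_value"}, [1, 2]],
    },
]


def round_trips(payload) -> bool:
    """True if the payload survives json.dumps followed by json.loads unchanged."""
    return json.loads(json.dumps(payload)) == payload
```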




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-12 Thread via GitHub


amoghrajesh commented on PR #44843:
URL: https://github.com/apache/airflow/pull/44843#issuecomment-2538120231

   @kaxil if we agree with my comment https://github.com/apache/airflow/pull/44843#discussion_r1881472643 here, then this payload:
   ```
   JsonAbleValueTypes = Union[
       dict[str, JsonValue],
       list[JsonValue],
       JsonValue,
   ]
   ```
   
   will do the job really well. I just pushed a commit with those changes, and added some tests for a variety of payloads too.



Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881472643


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
     type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   Actually, my point here was that we shouldn't be sending a tuple or set over the network, as it isn't JSON serialisable. To give an example, with the DAG:
   ```
   from __future__ import annotations

   import sys
   import time
   from datetime import datetime

   from airflow import DAG
   from airflow.decorators import dag, task
   from airflow.operators.bash import BashOperator


   @dag(
       # every minute on the 30-second mark
       catchup=False,
       tags=[],
       schedule=None,
       start_date=datetime(2021, 1, 1),
   )
   def hello_dag():
       """
       ### TaskFlow API Tutorial Documentation
       This is a simple data pipeline example which demonstrates the use of
       the TaskFlow API using three simple tasks for Extract, Transform, and Load.
       Documentation that goes along with the Airflow TaskFlow API tutorial is
       located
       [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
       """

       @task()
       def hello():
           print("hello")
           time.sleep(3)
           print("goodbye")
           print("err mesg", file=sys.stderr)

       hello()


   hello_dag()
   ```
   
   With the current state of using:
   ```
   """Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
   JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]


   class SetRenderedFields(BaseModel):
       """Payload for setting RTIF for a task instance."""

       # We are using a BaseModel here compared to server using RootModel because we
       # have a discriminator running with "type", and RootModel doesn't support type

       rendered_fields: dict[str, JsonAbleValueTypes]
       type: Literal["SetRenderedFields"] = "SetRenderedFields"
   ```
   
   I can see that the DAG that was failing earlier no longer fails, because `op_args` is converted to a list internally.
   
   
   This allows it to send a valid message to the supervisor without having to convert types around. Ideally, we should only be able to send JSON-serialisable fields over APIs, and converting a `set` or `tuple` to a list / str should be handled when we port templating to the Task SDK.
   
   




Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881476525


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   > Don't think `dict[str, str]` or `list[str]` will cover all cases it could 
have `list[int]` or `list[dict]` for example
   
   I agree that this needs to be done; I am working on it.






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881472643


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   Actually, my point here was that we shouldn't be sending a tuple or set over the network, as it isn't JSON serialisable. To illustrate with an example DAG:
   ```
   from __future__ import annotations
   
   import sys
   import time
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import dag, task
   from airflow.operators.bash import BashOperator
   
   
   @dag(
       # every minute on the 30-second mark
       catchup=False,
       tags=[],
       schedule=None,
       start_date=datetime(2021, 1, 1),
   )
   def hello_dag():
       """
       ### TaskFlow API Tutorial Documentation
       This is a simple data pipeline example which demonstrates the use of
       the TaskFlow API using three simple tasks for Extract, Transform, and Load.
       Documentation that goes along with the Airflow TaskFlow API tutorial is
       located
       [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
       """
   
       @task()
       def hello():
           print("hello")
           time.sleep(3)
           print("goodbye")
           print("err mesg", file=sys.stderr)
   
       hello()
   
   
   hello_dag()
   ```
   
   With the current state of using:
   ```
   """Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
   JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]
   
   
   class SetRenderedFields(BaseModel):
       """Payload for setting RTIF for a task instance."""
   
       # We are using a BaseModel here compared to server using RootModel because we
       # have a discriminator running with "type", and RootModel doesn't support type
   
       rendered_fields: dict[str, JsonAbleValueTypes]
       type: Literal["SetRenderedFields"] = "SetRenderedFields"
   ```
   
   I can see that the DAG that was failing earlier no longer fails, because `op_args` is converted to a list internally.
   
![image](https://github.com/user-attachments/assets/c4de68db-2e7a-488f-aadc-6ed8dc0718d6)
   
   
   This allows it to send a valid message to the supervisor without having to convert types around. Ideally, we should only be able to send JSON-serialisable fields over APIs, and converting a `set` or `tuple` to a list or str should be handled when we port templating to the Task SDK.
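A minimal sketch using only Python's stdlib `json` module (not Airflow code) of why `tuple` and `set` are awkward on the wire: a tuple is encoded, but it comes back as a list, while a set cannot be encoded at all. The helper name `is_json_serialisable` is hypothetical.

```python
import json


def is_json_serialisable(value) -> bool:
    """Hypothetical helper: True if the stdlib JSON encoder accepts ``value``."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


# A tuple is encoded as a JSON array, so the receiver gets a list back.
encoded = json.dumps({"op_args": (1, 2)})
round_tripped = json.loads(encoded)["op_args"]

# A set has no JSON representation, so encoding it raises TypeError.
tuple_ok = is_json_serialisable({"op_args": (1, 2)})
set_ok = is_json_serialisable({"op_args": {1, 2}})

print(encoded)           # {"op_args": [1, 2]}
print(round_tripped)     # [1, 2]
print(tuple_ok, set_ok)  # True False
```

Either way, the receiving side never sees a `tuple` or `set`, which is why converting them before they reach the API is the safer contract.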
   
   






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881450046


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   @kaxil you are right. I dug into this more deeply and, while debugging, I can see that a tuple will be present for cases like `op_args`.
   
![image](https://github.com/user-attachments/assets/3e2cb006-7309-48c8-bd30-b439dea242cd)
   






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1881450046


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   @kaxil you are right. I dug into this more deeply and, while debugging, I can see that a tuple will be present for cases like `op_args`. However, I see thji
   
![image](https://github.com/user-attachments/assets/3e2cb006-7309-48c8-bd30-b439dea242cd)
   






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1880397402


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   Wasn't that the error you are trying to address in the first place?






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1880397924


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   ```
   
   {"timestamp":"2024-12-11T10:54:52.942049","logger":"task","error_detail":[{"exc_type":"ValidationError","exc_value":"2 validation errors for SetRenderedFields\nrendered_fields.op_args\n  Input should be a valid string [type=string_type, input_value=(), input_type=tuple]\n    For further information visit https://errors.pydantic.dev/2.10/v/string_type\nrendered_fields.op_kwargs\n  Input should be a valid string [type=string_type, input_value={}, input_type=dict]\nFor further information visit https://errors.pydantic.dev/2.10/v/string_type","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":234,"name":"main"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":165,"name":"startup"},{"filename":"/usr/local/lib/python3.9/site-packages/pydantic/main.py","lineno":214,"name":"__init__"}]}],"event":"Top level error","level":"error"}
   ```






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1880389663


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   We shouldn't be adding `tuple` and `set`, because those will be taken care of by our internal serialisation helper: https://github.com/apache/airflow/blob/main/airflow/serialization/helpers.py#L28. It never returns a set or tuple. When we introduce templating in the Task SDK, this should be handled there, IMO.
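The idea of converting `set` and `tuple` before anything reaches the API can be sketched as a recursive normaliser. This is a hypothetical helper (`to_jsonable`), not Airflow's serialisation helper linked above; sorting set members by `repr` and falling back to `str()` for unknown objects are assumptions made only for this illustration.

```python
from typing import Any


def to_jsonable(value: Any) -> Any:
    """Hypothetical: recursively turn tuples and sets into lists so the
    result only contains JSON-native containers (dict and list)."""
    if isinstance(value, dict):
        # JSON object keys must be strings, so coerce keys with str().
        return {str(k): to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(v) for v in value]
    if isinstance(value, (set, frozenset)):
        # Sets are unordered; sort by repr so the output is deterministic.
        return [to_jsonable(v) for v in sorted(value, key=repr)]
    if value is None or isinstance(value, (str, int, float, bool)):
        # JSON scalars pass through unchanged.
        return value
    # Assumption: anything else falls back to its string form.
    return str(value)


rendered = {"op_args": (), "op_kwargs": {}, "tags": {"b", "a"}, "nested": ((1, 2), {"k": (3,)})}
print(to_jsonable(rendered))
# {'op_args': [], 'op_kwargs': {}, 'tags': ['a', 'b'], 'nested': [[1, 2], {'k': [3]}]}
```

Doing this once at the producer side keeps the wire type narrow: the API model then only has to accept dicts, lists and scalars.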






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1880388815


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   I am wondering if you could do:
   
   ```py
   from typing import Dict, List, Set, Tuple, Union
   
   from pydantic.types import JsonValue
   from typing_extensions import TypeAlias
   
   JsonAbleValueTypes: TypeAlias = Union[
       List["JsonAbleValueTypes"],
       Dict[str, "JsonAbleValueTypes"],
       Set["JsonAbleValueTypes"],
       Tuple["JsonAbleValueTypes", ...],
       JsonValue,
   ]
   ```






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1880382857


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   need to add `set` and `tuple` too






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1880382482


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   Check https://docs.pydantic.dev/latest/api/types/#pydantic.types.JsonValue:
   
   ```py
   JsonValue: TypeAlias = Union[
       List["JsonValue"],
       Dict[str, "JsonValue"],
       str,
       bool,
       int,
       float,
       None,
   ]
   ```
   
   So we should have something similar:
   
   ```py
   from typing import Dict, List, Set, Tuple, Union
   
   from typing_extensions import TypeAlias
   
   JsonAbleValueTypes: TypeAlias = Union[
       List["JsonAbleValueTypes"],
       Dict[str, "JsonAbleValueTypes"],
       Set["JsonAbleValueTypes"],
       Tuple["JsonAbleValueTypes", ...],
       str,
       bool,
       int,
       float,
       None,
   ]
   ```
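To see what the recursive alias buys, here is a stdlib-only sketch of a runtime check that follows the same recursive structure. It is not how pydantic validates, and `matches_jsonable` is a hypothetical name; it treats tuples as variable-length, like `Tuple[X, ...]`. Because the definition refers to itself, nested shapes such as `list[int]`, `list[dict]` or a dict of lists are all accepted.

```python
from typing import Any

SCALARS = (str, bool, int, float)


def matches_jsonable(value: Any) -> bool:
    """Hypothetical recursive check mirroring the proposed alias:
    list/set/tuple of JsonAbleValueTypes, dict[str, JsonAbleValueTypes],
    or a scalar (str, bool, int, float, None)."""
    if value is None or isinstance(value, SCALARS):
        return True
    if isinstance(value, dict):
        # Keys must be str; values recurse back into the same definition.
        return all(isinstance(k, str) and matches_jsonable(v) for k, v in value.items())
    if isinstance(value, (list, set, tuple)):
        return all(matches_jsonable(v) for v in value)
    return False


print(matches_jsonable([1, 2, 3]))                       # True  (list[int])
print(matches_jsonable([{"a": 1}, {"b": [2.5]}]))        # True  (list[dict])
print(matches_jsonable({"op_args": (), "tags": {"x"}}))  # True
print(matches_jsonable({1: "non-str key"}))              # False
print(matches_jsonable([object()]))                      # False
```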






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


amoghrajesh commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1880381503


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   Making it list[Any] or dict[str, Any] would probably be too liberal here.
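A small stdlib sketch of why `list[Any]` / `dict[str, Any]` is too liberal: a check that only looks at the outer container accepts items the JSON encoder cannot handle. `shape_only_check` is a hypothetical stand-in for that kind of validation, not pydantic's behaviour.

```python
import json
from datetime import datetime
from typing import Any


def shape_only_check(value: Any) -> bool:
    """Hypothetical stand-in for validating against list[Any] | dict[str, Any]:
    only the outer container is checked, never the items inside it."""
    return isinstance(value, list) or (
        isinstance(value, dict) and all(isinstance(k, str) for k in value)
    )


payload = [datetime(2024, 12, 11), object()]

accepted = shape_only_check(payload)
try:
    json.dumps(payload)
    encodable = True
except TypeError:
    # Neither datetime nor a bare object has a JSON representation.
    encodable = False

print(accepted, encodable)  # True False
```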






Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]

2024-12-11 Thread via GitHub


kaxil commented on code in PR #44843:
URL: https://github.com/apache/airflow/pull/44843#discussion_r1880365109


##
task_sdk/src/airflow/sdk/execution_time/comms.py:
##
@@ -170,13 +170,17 @@ class PutVariable(BaseModel):
 type: Literal["PutVariable"] = "PutVariable"
 
 
+"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """
+JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None]

Review Comment:
   I don't think `dict[str, str]` or `list[str]` will cover all cases; it could have `list[int]` or `list[dict]`, for example.
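A plain `isinstance` mirror of the narrow `Union[str, dict[str, str], list[str], int, float, None]` makes the gap concrete. This is a hypothetical helper (`matches_narrow_alias`) and deliberately ignores pydantic's lax-mode coercions; it only shows which shapes the alias's element types rule out.

```python
from typing import Any


def matches_narrow_alias(value: Any) -> bool:
    """Hypothetical isinstance-only mirror of
    Union[str, dict[str, str], list[str], int, float, None]."""
    if value is None or isinstance(value, (str, int, float)):
        return True
    if isinstance(value, dict):
        # Only str -> str mappings are allowed.
        return all(isinstance(k, str) and isinstance(v, str) for k, v in value.items())
    if isinstance(value, list):
        # Only lists of str are allowed.
        return all(isinstance(v, str) for v in value)
    return False


print(matches_narrow_alias(["echo", "hi"]))  # True  (list[str])
print(matches_narrow_alias([1, 2]))          # False (list[int])
print(matches_narrow_alias([{"a": "b"}]))    # False (list[dict])
print(matches_narrow_alias({"retries": 3}))  # False (dict value is int)
```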


