Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1884280990 ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") Review Comment: Follow-up: https://github.com/apache/airflow/pull/44920 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on PR #44843: URL: https://github.com/apache/airflow/pull/44843#issuecomment-2541943313 @amoghrajesh Created https://github.com/apache/airflow/pull/44920 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on PR #44843: URL: https://github.com/apache/airflow/pull/44843#issuecomment-2541929935 Merged this PR as-is so we don't keep main broken for this case. But I'll have PR in just a bit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil merged PR #44843: URL: https://github.com/apache/airflow/pull/44843 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1884266901 ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +# the output order of a set is non-deterministic and can change per process. +# this validation can fail if that happens before stringification, so we convert to set and compare. +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") session.add(rtif) session.flush() -assert RTIF.get_templated_fields(ti=ti, session=session) == { -"bash_command": expected_rendered_field, -"env": None, -"cwd": None, -} +if type(templated_field) is set: Review Comment: Nice! Thanks. Let me commit that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1884266838 ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +# the output order of a set is non-deterministic and can change per process. +# this validation can fail if that happens before stringification, so we convert to set and compare. +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") session.add(rtif) session.flush() -assert RTIF.get_templated_fields(ti=ti, session=session) == { -"bash_command": expected_rendered_field, -"env": None, -"cwd": None, -} +if type(templated_field) is set: Review Comment: Actually I have a better idea to not have to deal with this problem, let me push a change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1884264997 ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +# the output order of a set is non-deterministic and can change per process. +# this validation can fail if that happens before stringification, so we convert to set and compare. +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") session.add(rtif) session.flush() -assert RTIF.get_templated_fields(ti=ti, session=session) == { -"bash_command": expected_rendered_field, -"env": None, -"cwd": None, -} +if type(templated_field) is set: Review Comment: This is in tests we control the parameters so not a big deal here anyway and more a "nit" -- but to illustrate the reason: ```py In [13]: class CustomSet(set): ...: def add(self, element): ...: print(f"Adding element: {element}") ...: super().add(element) ...: In [15]: a = CustomSet([1,2]) In [16]: isinstance(a, set) Out[16]: True In [17]: type(a) is set Out[17]: False ``` ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +# the output order of a set is non-deterministic and can change per process. +# this validation can fail if that happens before stringification, so we convert to set and compare. +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") session.add(rtif) session.flush() -assert RTIF.get_templated_fields(ti=ti, session=session) == { -"bash_command": expected_rendered_field, -"env": None, -"cwd": None, -} +if type(templated_field) is set: Review Comment: This is in tests so we control the parameters so not a big deal here anyway and more a "nit" -- but to illustrate the reason: ```py In [13]: class CustomSet(set): ...: def add(self, element): ...: print(f"Adding element: {element}") ...: super().add(element) ...: In [15]: a = CustomSet([1,2]) In [16]: isinstance(a, set) Out[16]: True In [17]: type(a) is set Out[17]: False ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1884255743 ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,35 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +# the output order of a set is non-deterministic and can change per process. +# this validation can fail if that happens before stringification, so we convert to set and compare. +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") session.add(rtif) session.flush() -assert RTIF.get_templated_fields(ti=ti, session=session) == { -"bash_command": expected_rendered_field, -"env": None, -"cwd": None, -} +if type(templated_field) is set: Review Comment: ```suggestion if isinstance(templated_field, set): ``` Better from OOPs perspective -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1884252800 ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") Review Comment: damn -- yeah the ordering issue with set! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1884094412 ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") Review Comment: The reason for doing this is because sometimes, the rendered fields in case of sets are stringified in a different set order. Leading to comparison failure like this: ``` _ TestRenderedTaskInstanceFields.test_get_templated_fields[templated_field8-{'foo', 'bar'}] _ tests/models/test_renderedtifields.py:165: in test_get_templated_fields assert expected_rendered_field == rtif.rendered_fields.get("bash_command") E assert equals failed E "{'foo', 'bar'}" "{'bar', 'foo'}" ``` Example run https://github.com/apache/airflow/actions/runs/12313918735/job/34369300336 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1884021823 ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") Review Comment: Why this @amoghrajesh ? ## tests/models/test_renderedtifields.py: ## @@ -158,16 +163,31 @@ def test_get_templated_fields(self, templated_field, expected_rendered_field, da assert ti.dag_id == rtif.dag_id assert ti.task_id == rtif.task_id assert ti.run_id == rtif.run_id -assert expected_rendered_field == rtif.rendered_fields.get("bash_command") +if type(templated_field) is set: +assert ast.literal_eval(expected_rendered_field) == ast.literal_eval( +rtif.rendered_fields.get("bash_command") +) +else: +assert expected_rendered_field == rtif.rendered_fields.get("bash_command") session.add(rtif) session.flush() -assert RTIF.get_templated_fields(ti=ti, session=session) == { -"bash_command": expected_rendered_field, -"env": None, -"cwd": None, -} +if type(templated_field) is set: +expected = RTIF.get_templated_fields(ti=ti, session=session) +expected["bash_command"] = ast.literal_eval(expected["bash_command"]) +actual = { +"bash_command": ast.literal_eval(expected_rendered_field), +"env": None, +"cwd": None, +} +assert expected == actual +else: +assert RTIF.get_templated_fields(ti=ti, session=session) == { +"bash_command": expected_rendered_field, +"env": None, +"cwd": None, +} Review Comment: and this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on PR #44843: URL: https://github.com/apache/airflow/pull/44843#issuecomment-2541086170 War to fix the CI starts! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883635008 ## task_sdk/tests/execution_time/test_task_runner.py: ## @@ -260,3 +273,98 @@ def test_startup_basic_templated_dag(mocked_parse): ), log=mock.ANY, ) + + +def test_startup_dag_with_no_templates(mocked_parse): +"""Test startup of a simple DAG.""" +from airflow.providers.standard.operators.python import PythonOperator + +task = PythonOperator( +task_id="task1", +python_callable=lambda: print("hello world!"), +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields(rendered_fields={"op_args": [], "op_kwargs": {}, "templates_dict": None}), +log=mock.ANY, +) + + +def test_startup_dag_with_no_templates_mixed_types(mocked_parse): +"""Test startup of a simple DAG.""" +from airflow.providers.standard.operators.python import PythonOperator + +task = PythonOperator( +task_id="task1", +python_callable=lambda *args, **kwargs: print(f"op_args: {args}, op_kwargs: {kwargs}"), +op_args=["arg1", "arg2", 1, 2, 3.75, {"key": "value"}], +op_kwargs={"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}}, +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields( +rendered_fields={ +"templates_dict": None, +"op_args": ["arg1", "arg2", 1, 2, 3.75, {"key": "value"}], +"op_kwargs": {"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}}, +} +), +log=mock.ANY, +) + + +def test_startup_dag_with_tuple_and_set_templated_fields(mocked_parse): +"""Test startup of a simple DAG with tuple and set templated fields.""" + +task = CustomOperator2( +task_id="templated_task", +my_tup=( +1, +2, +), +my_set={1, 2, 3}, +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="templated_task", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields(rendered_fields={"my_set": "{1, 2, 3}", "my_tup": "(1, 2)"}), +log=mock.ANY, +) Review Comment: I have simplified the tests in https://github.com/apache/airflow/pull/44843/commits/2381c10026f528a1f4ce5c2260eaa9a874c2fbd2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883581737 ## airflow/serialization/helpers.py: ## @@ -65,4 +69,4 @@ def is_jsonable(x): "Truncated. You can change this behaviour in [core]max_templated_field_length. " f"{rendered[:max_length - 79]!r}... " ) -return template_field +return serialized Review Comment: This would have converted everything to string :) https://github.com/apache/airflow/blob/0f0bed0ae00b7c07fed147f1d42760bf654336cb/airflow/serialization/helpers.py#L70 ## airflow/serialization/helpers.py: ## @@ -65,4 +69,4 @@ def is_jsonable(x): "Truncated. You can change this behaviour in [core]max_templated_field_length. " f"{rendered[:max_length - 79]!r}... " ) -return template_field +return serialized Review Comment: This would have converted everything to string -- even list, int etc :) https://github.com/apache/airflow/blob/0f0bed0ae00b7c07fed147f1d42760bf654336cb/airflow/serialization/helpers.py#L70 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883581737 ## airflow/serialization/helpers.py: ## @@ -65,4 +69,4 @@ def is_jsonable(x): "Truncated. You can change this behaviour in [core]max_templated_field_length. " f"{rendered[:max_length - 79]!r}... " ) -return template_field +return serialized Review Comment: This would have converted everything to string -- even list, int etc would have change to str :) https://github.com/apache/airflow/blob/0f0bed0ae00b7c07fed147f1d42760bf654336cb/airflow/serialization/helpers.py#L70 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883581737 ## airflow/serialization/helpers.py: ## @@ -65,4 +69,4 @@ def is_jsonable(x): "Truncated. You can change this behaviour in [core]max_templated_field_length. " f"{rendered[:max_length - 79]!r}... " ) -return template_field +return serialized Review Comment: This would have converted everything to string :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883546427 ## airflow/serialization/helpers.py: ## @@ -65,4 +69,4 @@ def is_jsonable(x): "Truncated. You can change this behaviour in [core]max_templated_field_length. " f"{rendered[:max_length - 79]!r}... " ) -return template_field +return serialized Review Comment: Not making such a change in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883545915 ## task_sdk/tests/execution_time/test_task_runner.py: ## @@ -260,3 +273,98 @@ def test_startup_basic_templated_dag(mocked_parse): ), log=mock.ANY, ) + + +def test_startup_dag_with_no_templates(mocked_parse): +"""Test startup of a simple DAG.""" +from airflow.providers.standard.operators.python import PythonOperator + +task = PythonOperator( +task_id="task1", +python_callable=lambda: print("hello world!"), +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields(rendered_fields={"op_args": [], "op_kwargs": {}, "templates_dict": None}), +log=mock.ANY, +) + + +def test_startup_dag_with_no_templates_mixed_types(mocked_parse): +"""Test startup of a simple DAG.""" +from airflow.providers.standard.operators.python import PythonOperator + +task = PythonOperator( +task_id="task1", +python_callable=lambda *args, **kwargs: print(f"op_args: {args}, op_kwargs: {kwargs}"), +op_args=["arg1", "arg2", 1, 2, 3.75, {"key": "value"}], +op_kwargs={"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}}, +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields( +rendered_fields={ +"templates_dict": None, +"op_args": ["arg1", "arg2", 1, 2, 3.75, {"key": "value"}], +"op_kwargs": {"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}}, +} +), +log=mock.ANY, +) + + +def test_startup_dag_with_tuple_and_set_templated_fields(mocked_parse): +"""Test startup of a simple DAG with tuple and set templated fields.""" + +task = CustomOperator2( +task_id="templated_task", +my_tup=( +1, +2, +), +my_set={1, 2, 3}, +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="templated_task", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields(rendered_fields={"my_set": "{1, 2, 3}", "my_tup": "(1, 2)"}), +log=mock.ANY, +) Review Comment: Lets not make this change in this PR. ## task_sdk/tests/execution_time/test_task_runner.py: ## @@ -260,3 +273,98 @@ def test_startup_basic_templated_dag(mocked_parse): ), log=mock.ANY, ) + + +def test_startup_dag_with_no_templates(mocked_parse): +"""Test startup of a simple DAG.""" +from airflow.providers.standard.operators.python import PythonOperator + +task = PythonOperator( +task_id="task1", +python_callable=lambda: print("hello world!"), +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields(rendered_fields={"op_args": [], "op_kwargs": {}, "templates_dict": None}), +log=mock.ANY, +) + + +def test_startup_dag_with_no_templates_mixed_types(mocked_parse): +"""Test startup of a simple DAG.""" +from airf
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883540029 ## airflow/serialization/helpers.py: ## @@ -43,6 +43,10 @@ def is_jsonable(x): max_length = conf.getint("core", "max_templated_field_length") +# Handle empty set or tuple explicitly +if isinstance(template_field, (set, tuple)) and not template_field: +return [] Review Comment: This makes the return type of this function correct as previously it did return an empty tuple -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883536589 ## airflow/serialization/helpers.py: ## @@ -65,4 +69,4 @@ def is_jsonable(x): "Truncated. You can change this behaviour in [core]max_templated_field_length. " f"{rendered[:max_length - 79]!r}... " ) -return template_field +return serialized Review Comment: What's the bug that is fixed by this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883412215 ## task_sdk/tests/execution_time/test_task_runner.py: ## @@ -260,3 +273,98 @@ def test_startup_basic_templated_dag(mocked_parse): ), log=mock.ANY, ) + + +def test_startup_dag_with_no_templates(mocked_parse): +"""Test startup of a simple DAG.""" +from airflow.providers.standard.operators.python import PythonOperator + +task = PythonOperator( +task_id="task1", +python_callable=lambda: print("hello world!"), +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields(rendered_fields={"op_args": [], "op_kwargs": {}, "templates_dict": None}), +log=mock.ANY, +) + + +def test_startup_dag_with_no_templates_mixed_types(mocked_parse): +"""Test startup of a simple DAG.""" +from airflow.providers.standard.operators.python import PythonOperator + +task = PythonOperator( +task_id="task1", +python_callable=lambda *args, **kwargs: print(f"op_args: {args}, op_kwargs: {kwargs}"), +op_args=["arg1", "arg2", 1, 2, 3.75, {"key": "value"}], +op_kwargs={"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}}, +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="task1", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields( +rendered_fields={ +"templates_dict": None, +"op_args": ["arg1", "arg2", 1, 2, 3.75, {"key": "value"}], +"op_kwargs": {"key1": "value1", "key2": 99.0, "key3": {"nested_key": "nested_value"}}, +} +), +log=mock.ANY, +) + + +def test_startup_dag_with_tuple_and_set_templated_fields(mocked_parse): +"""Test startup of a simple DAG with tuple and set templated fields.""" + +task = CustomOperator2( +task_id="templated_task", +my_tup=( +1, +2, +), +my_set={1, 2, 3}, +) + +what = StartupDetails( +ti=TaskInstance(id=uuid7(), task_id="templated_task", dag_id="basic_dag", run_id="c", try_number=1), +file="", +requests_fd=0, +) +mocked_parse(what, "basic_dag", task) + +with mock.patch( +"airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True +) as mock_supervisor_comms: +mock_supervisor_comms.get_message.return_value = what +startup() + +mock_supervisor_comms.send_request.assert_called_once_with( +msg=SetRenderedFields(rendered_fields={"my_set": "{1, 2, 3}", "my_tup": "(1, 2)"}), +log=mock.ANY, +) Review Comment: Added this test that validates tuples and sets thoroughly WHile doing this I realised that there was a bug in our serialiser which wasnt able to handle tuples. Fixed it here: https://github.com/apache/airflow/pull/44843/files#diff-6a66bbf06ff636982d3b650e1b35bcea75971d211691143abef6eb8349416b98R72 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1883375971 ## airflow/serialization/helpers.py: ## @@ -43,6 +43,10 @@ def is_jsonable(x): max_length = conf.getint("core", "max_templated_field_length") +# Handle empty set or tuple explicitly +if isinstance(template_field, (set, tuple)) and not template_field: +return [] Review Comment: Yeah i was thinking of something similar. This looks fine for now ## airflow/api_fastapi/execution_api/routes/task_instances.py: ## @@ -237,7 +237,7 @@ def ti_heartbeat( ) def ti_put_rtif( task_instance_id: UUID, -put_rtif_payload: RTIFPayload, Review Comment: ACK. Sounds good ## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ## @@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]: # 1. Implementing the part where we pull in the logic to render fields and add that here # for all operators, we should do setattr(task, templated_field, rendered_templated_field) # task.templated_fields should give all the templated_fields and each of those fields should -# give the rendered values. +# give the rendered values. task.templated_fields should already be in a JSONable format and +# we should not have to handle that here. # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB -templated_fields = ti.task.template_fields -payload = {} - -for field in templated_fields: -if field not in payload: -payload[field] = getattr(ti.task, field) # so that we do not call the API unnecessarily -if payload: -SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload)) +if rendered_fields := _get_rendered_fields(ti.task): +SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields)) return ti, log +def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]: +# TODO: Port one of the following to Task SDK +# airflow.serialization.helpers.serialize_template_field or +# airflow.models.renderedtifields.get_serialized_template_fields +from airflow.serialization.helpers import serialize_template_field + +return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} Review Comment: Right, instead of duplication at the moment, we can reuse it and when we port the logic for templating, we anyways are going to revisit this. Better to handle it then. We will not miss it too cos we have a TODO -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
ashb commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1882865152 ## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ## @@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]: # 1. Implementing the part where we pull in the logic to render fields and add that here # for all operators, we should do setattr(task, templated_field, rendered_templated_field) # task.templated_fields should give all the templated_fields and each of those fields should -# give the rendered values. +# give the rendered values. task.templated_fields should already be in a JSONable format and +# we should not have to handle that here. # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB -templated_fields = ti.task.template_fields -payload = {} - -for field in templated_fields: -if field not in payload: -payload[field] = getattr(ti.task, field) # so that we do not call the API unnecessarily -if payload: -SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload)) +if rendered_fields := _get_rendered_fields(ti.task): +SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields)) return ti, log +def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]: +# TODO: Port one of the following to Task SDK +# airflow.serialization.helpers.serialize_template_field or +# airflow.models.renderedtifields.get_serialized_template_fields +from airflow.serialization.helpers import serialize_template_field + +return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} Review Comment: I'd say for now reuse the code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1882884043 ## airflow/api_fastapi/execution_api/routes/task_instances.py: ## @@ -237,7 +237,7 @@ def ti_heartbeat( ) def ti_put_rtif( task_instance_id: UUID, -put_rtif_payload: RTIFPayload, Review Comment: I just removed `RTIFPayload` and `RootModel` -- we can just have it directly over here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1882836965 ## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ## @@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]: # 1. Implementing the part where we pull in the logic to render fields and add that here # for all operators, we should do setattr(task, templated_field, rendered_templated_field) # task.templated_fields should give all the templated_fields and each of those fields should -# give the rendered values. +# give the rendered values. task.templated_fields should already be in a JSONable format and +# we should not have to handle that here. # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB -templated_fields = ti.task.template_fields -payload = {} - -for field in templated_fields: -if field not in payload: -payload[field] = getattr(ti.task, field) # so that we do not call the API unnecessarily -if payload: -SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload)) +if rendered_fields := _get_rendered_fields(ti.task): +SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields)) return ti, log +def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]: +# TODO: Port one of the following to Task SDK +# airflow.serialization.helpers.serialize_template_field or +# airflow.models.renderedtifields.get_serialized_template_fields +from airflow.serialization.helpers import serialize_template_field + +return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} Review Comment: If we don't use `serialize_template_field`, we will have to duplicate the logic -- which is also fine -- but we need to take an informed call. On one hand we need to de-couple, on the other hand we can keep it centralized and port it later --- There is also logic to redact: https://github.com/apache/airflow/blob/cc9b9a8794da50630e26f5de04b4c0ae5d49353f/airflow/serialization/helpers.py#L52 https://github.com/apache/airflow/blob/cc9b9a8794da50630e26f5de04b4c0ae5d49353f/airflow/serialization/helpers.py#L63 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1882837307 ## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ## @@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]: # 1. Implementing the part where we pull in the logic to render fields and add that here # for all operators, we should do setattr(task, templated_field, rendered_templated_field) # task.templated_fields should give all the templated_fields and each of those fields should -# give the rendered values. +# give the rendered values. task.templated_fields should already be in a JSONable format and +# we should not have to handle that here. # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB -templated_fields = ti.task.template_fields -payload = {} - -for field in templated_fields: -if field not in payload: -payload[field] = getattr(ti.task, field) # so that we do not call the API unnecessarily -if payload: -SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload)) +if rendered_fields := _get_rendered_fields(ti.task): +SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields)) return ti, log +def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]: +# TODO: Port one of the following to Task SDK +# airflow.serialization.helpers.serialize_template_field or +# airflow.models.renderedtifields.get_serialized_template_fields +from airflow.serialization.helpers import serialize_template_field + +return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} Review Comment: @ashb / @amoghrajesh thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1882836965 ## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ## @@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]: # 1. Implementing the part where we pull in the logic to render fields and add that here # for all operators, we should do setattr(task, templated_field, rendered_templated_field) # task.templated_fields should give all the templated_fields and each of those fields should -# give the rendered values. +# give the rendered values. task.templated_fields should already be in a JSONable format and +# we should not have to handle that here. # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB -templated_fields = ti.task.template_fields -payload = {} - -for field in templated_fields: -if field not in payload: -payload[field] = getattr(ti.task, field) # so that we do not call the API unnecessarily -if payload: -SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload)) +if rendered_fields := _get_rendered_fields(ti.task): +SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields)) return ti, log +def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]: +# TODO: Port one of the following to Task SDK +# airflow.serialization.helpers.serialize_template_field or +# airflow.models.renderedtifields.get_serialized_template_fields +from airflow.serialization.helpers import serialize_template_field + +return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} Review Comment: If we don't use `serialize_template_field`, we will have to duplicate the logic -- which is also fine -- but we need to take an informed call. On one hand we need to de-couple, on the other hand we can keep it centralized and port it later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1882834039 ## task_sdk/src/airflow/sdk/execution_time/task_runner.py: ## @@ -157,22 +157,26 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]: # 1. Implementing the part where we pull in the logic to render fields and add that here # for all operators, we should do setattr(task, templated_field, rendered_templated_field) # task.templated_fields should give all the templated_fields and each of those fields should -# give the rendered values. +# give the rendered values. task.templated_fields should already be in a JSONable format and +# we should not have to handle that here. # 2. Once rendered, we call the `set_rtif` API to store the rtif in the metadata DB -templated_fields = ti.task.template_fields -payload = {} - -for field in templated_fields: -if field not in payload: -payload[field] = getattr(ti.task, field) # so that we do not call the API unnecessarily -if payload: -SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=payload)) +if rendered_fields := _get_rendered_fields(ti.task): +SUPERVISOR_COMMS.send_request(log=log, msg=SetRenderedFields(rendered_fields=rendered_fields)) return ti, log +def _get_rendered_fields(task: BaseOperator) -> dict[str, JsonValue]: +# TODO: Port one of the following to Task SDK +# airflow.serialization.helpers.serialize_template_field or +# airflow.models.renderedtifields.get_serialized_template_fields +from airflow.serialization.helpers import serialize_template_field + +return {field: serialize_template_field(getattr(task, field), field) for field in task.template_fields} Review Comment: This is mainly because there maybe some `template_fields` that are classes with `.serialize()` method! https://github.com/apache/airflow/blob/cc9b9a8794da50630e26f5de04b4c0ae5d49353f/airflow/serialization/helpers.py#L48 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1882428319 ## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ## @@ -137,5 +137,12 @@ class TaskInstance(BaseModel): map_index: int | None = None +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[ +dict[str, JsonValue], +list[JsonValue], +JsonValue, +] + Review Comment: Yeah good observation. Let me change it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881844567 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,21 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[ +dict[str, JsonValue], +list[JsonValue], +JsonValue, +] + + class SetRenderedFields(BaseModel): """Payload for setting RTIF for a task instance.""" # We are using a BaseModel here compared to server using RootModel because we # have a discriminator running with "type", and RootModel doesn't support type -rendered_fields: dict[str, str | None] +rendered_fields: dict[str, JsonAbleValueTypes] Review Comment: ```suggestion rendered_fields: dict[str, JsonValue] ``` ## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ## @@ -137,5 +137,12 @@ class TaskInstance(BaseModel): map_index: int | None = None +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[ +dict[str, JsonValue], +list[JsonValue], +JsonValue, +] + """Schema for setting RTIF for a task instance.""" -RTIFPayload = RootModel[dict[str, str]] +RTIFPayload = RootModel[dict[str, JsonAbleValueTypes]] Review Comment: ```suggestion RTIFPayload = RootModel[dict[str, JsonValue]] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881843653 ## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ## @@ -137,5 +137,12 @@ class TaskInstance(BaseModel): map_index: int | None = None +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[ +dict[str, JsonValue], +list[JsonValue], +JsonValue, +] + Review Comment: Actually now you can just remove this entirely and change L148 to use `JsonValue` https://docs.pydantic.dev/2.9/api/types/#pydantic.types.JsonValue It is sufficient for what we want: ``` JsonValue: TypeAlias = Union[ List["JsonValue"], Dict[str, "JsonValue"], str, bool, int, float, None, ] ``` ## airflow/api_fastapi/execution_api/datamodels/taskinstance.py: ## @@ -137,5 +137,12 @@ class TaskInstance(BaseModel): map_index: int | None = None +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[ +dict[str, JsonValue], +list[JsonValue], +JsonValue, +] + Review Comment: Actually now you can just remove this entirely and change L148 to use `JsonValue` https://docs.pydantic.dev/2.9/api/types/#pydantic.types.JsonValue It is sufficient for what we want: ```py JsonValue: TypeAlias = Union[ List["JsonValue"], Dict[str, "JsonValue"], str, bool, int, float, None, ] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on PR #44843: URL: https://github.com/apache/airflow/pull/44843#issuecomment-2538393429 FYI, I ran the DAG from ash that found this issue: ``` from __future__ import annotations import sys import time from datetime import datetime from airflow.decorators import dag, task @dag( # every minute on the 30-second mark catchup=False, tags=[], schedule=None, start_date=datetime(2021, 1, 1), ) def hello_dag(): """ ### TaskFlow API Tutorial Documentation This is a simple data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. Documentation that goes along with the Airflow TaskFlow API tutorial is located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html) """ @task() def hello(): print("hello") time.sleep(3) print("goodbye") print("err mesg", file=sys.stderr) hello() hello_dag() ``` using this UT: ``` def test_startup_dag_with_no_templates_mixed_types(mocked_parse, test_dags_dir): """Test startup of a simple DAG.""" what = StartupDetails( ti=TaskInstance(id=uuid7(), task_id="hello", dag_id="hello_dag", run_id="c", try_number=1), file=str(test_dags_dir / "mydag.py"), requests_fd=0, ) ti = parse(what) with mock.patch( "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True ) as mock_supervisor_comms: mock_supervisor_comms.get_message.return_value = what startup() run(ti, log=mock.ANY) ``` Result: ``` Home of the user: /Users/amoghdesai Airflow home /Users/amoghdesai/airflow PASSED [100%][2024-12-12T15:16:57.104+0530] {dagbag.py:535} INFO - Filling up the DagBag from /Users/amoghdesai/Documents/OSS/airflow/task_sdk/tests/dags/mydag.py [2024-12-12T15:16:57.175+0530] {dagbag.py:535} INFO - Filling up the DagBag from /Users/amoghdesai/Documents/OSS/airflow/task_sdk/tests/dags/mydag.py 2024-12-12 15:16:57 [debug] DAG file parsed[task] file=/Users/amoghdesai/Documents/OSS/airflow/task_sdk/tests/dags/mydag.py hello goodbye [2024-12-12T15:17:00.178+0530] {python.py:197} INFO - Done. Returned value was: None ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881628969 ## tests/api_fastapi/execution_api/routes/test_task_instances.py: ## @@ -422,16 +422,37 @@ def teardown_method(self): clear_db_runs() clear_rendered_ti_fields() -def test_ti_put_rtif_success(self, client, session, create_task_instance): +@pytest.mark.parametrize( +"payload", +[ +# string value +{"field1": "string_value", "field2": "another_string"}, +# dictionary value +{"field1": {"nested_key": "nested_value"}}, +# integer value +{"field1": 100}, +# None value +{"field1": None}, +# float value +{"field1": 3.14159}, +# string lists value +{"field1": ["123"], "field2": ["a", "b", "c"]}, +# list of JSON values +{"field1": [1, "string", 3.14, True, None, {"nested": "dict"}]}, +# nested dictionary with mixed types in lists +{ +"field1": {"nested_dict": {"key1": 123, "key2": "value"}}, +"field2": [3.14, {"sub_key": "sub_value"}, [1, 2]], Review Comment: Tries to cover a lot of possible combinations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on PR #44843: URL: https://github.com/apache/airflow/pull/44843#issuecomment-2538120231 @kaxil if we agree with my comment https://github.com/apache/airflow/pull/44843#discussion_r1881472643 here, then this payload: ``` JsonAbleValueTypes = Union[ dict[str, JsonValue], list[JsonValue], JsonValue, ] ``` Will do the job really well. I just pushed a commit with those changes. Added some tests for variety of payloads too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881472643 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: Actually, my point here was that we shoudln't be sending tuple or set over the network as it isnt json serialisable. To state an example with the DAG: ``` from __future__ import annotations import sys import time from datetime import datetime from airflow import DAG from airflow.decorators import dag, task from airflow.operators.bash import BashOperator @dag( # every minute on the 30-second mark catchup=False, tags=[], schedule=None, start_date=datetime(2021, 1, 1), ) def hello_dag(): """ ### TaskFlow API Tutorial Documentation This is a simple data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. Documentation that goes along with the Airflow TaskFlow API tutorial is located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html) """ @task() def hello(): print("hello") time.sleep(3) print("goodbye") print("err mesg", file=sys.stderr) hello() hello_dag() ``` With the current state of using: ``` """Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] class SetRenderedFields(BaseModel): """Payload for setting RTIF for a task instance.""" # We are using a BaseModel here compared to server using RootModel because we # have a discriminator running with "type", and RootModel doesn't support type rendered_fields: dict[str, JsonAbleValueTypes] type: Literal["SetRenderedFields"] = "SetRenderedFields" ``` I can see that the DAG that was failing earlier doesn't because the `op_args` is converted to a list internally. ![Uploading image.png…]() And this will allow it to send a valid message to the supervisor without having to convert types around. Ideally, we should only be able to send JSON serialisable fields over APIs. And the part where `set` or `tuple` will be converted to a list / str should be handled when we port templating to task SDK. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881476525 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: > Don't think `dict[str, str]` or `list[str]` will cover all cases it could have `list[int]` or `list[dict]` for example Although I agree that this needs to be done. I am working on ti ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: > Don't think `dict[str, str]` or `list[str]` will cover all cases it could have `list[int]` or `list[dict]` for example Although I agree that this needs to be done. I am working on it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881472643 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: Actually, my point here was that we shoudln't be sending tuple or set over the network as it isnt json serialisable. To state an example with the DAG: ``` from __future__ import annotations import sys import time from datetime import datetime from airflow import DAG from airflow.decorators import dag, task from airflow.operators.bash import BashOperator @dag( # every minute on the 30-second mark catchup=False, tags=[], schedule=None, start_date=datetime(2021, 1, 1), ) def hello_dag(): """ ### TaskFlow API Tutorial Documentation This is a simple data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. Documentation that goes along with the Airflow TaskFlow API tutorial is located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html) """ @task() def hello(): print("hello") time.sleep(3) print("goodbye") print("err mesg", file=sys.stderr) hello() hello_dag() ``` With the current state of using: ``` """Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] class SetRenderedFields(BaseModel): """Payload for setting RTIF for a task instance.""" # We are using a BaseModel here compared to server using RootModel because we # have a discriminator running with "type", and RootModel doesn't support type rendered_fields: dict[str, JsonAbleValueTypes] type: Literal["SetRenderedFields"] = "SetRenderedFields" ``` I can see that the DAG that was failing earlier doesn't because the `op_args` is converted to a list internally.  And this will allow it to send a valid message to the supervisor without having to convert types around. Ideally, we should only be able to send JSON serialisable fields over APIs. And the part where `set` or `tuple` will be converted to a list / str should be handled when we port templating to task SDK. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881450046 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: @kaxil you are right. I tried digging in to this deeply and on debugging, I can see tuple will be present for cases like `op_args`.  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881450046 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: @kaxil you are right. I tried digging in to this deeply and on debugging, I can see tuple will be present for cases like `op_args`.  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1881450046 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: @kaxil you are right. I tried digging in to this deeply and on debugging, I can see tuple will be present for cases like `op_args`. However, I see thji  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1880397402 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: Wasn't that the error you are trying to address in the first place? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1880397924 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: ``` {"timestamp":"2024-12-11T10:54:52.942049","logger":"task","error_detail":[{"exc_type":"ValidationError","exc_value":"2 validation errors for SetRenderedFields\nrendered_fields.op_args\n Input should be a valid string [type=string_type, input_value=(), input_type=tuple]\n For further information visit https://errors.pydantic.dev/2.10/v/string_type\nrendered_fields.op_kwargs\n Input should be a valid string [type=string_type, input_value={}, input_type=dict]\nFor further information visit https://errors.pydantic.dev/2.10/v/string_type","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":234,"name":"main"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":165,"name":"startup"},{"filename":"/usr/local/lib/python3.9/site-packages/pydantic/main.py","lineno":214,"name":"__init__"}]}],"event":"Top level error","level":"error"} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1880389663 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: We shouldn't be adding tuple and set because those will be taken care by our internal serialisation helper: https://github.com/apache/airflow/blob/main/airflow/serialization/helpers.py#L28. This will not ever return a set or tuple. When we introduce templating in TASK sdk, this should be handled there imo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1880388815 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: I am wondering if you could do: ```py from pydantic.types import JsonValue JsonAbleValueTypes: TypeAlias = Union[ List["JsonAbleValueTypes"], Dict[str, "JsonAbleValueTypes"], Set["JsonAbleValueTypes"], Tuple["JsonAbleValueTypes"], JsonValue, ] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1880382857 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: need to add `set` and `tuple` too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1880382482 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: Check https://docs.pydantic.dev/latest/api/types/#pydantic.types.JsonValue: ```py JsonValue: TypeAlias = Union[ List["JsonValue"], Dict[str, "JsonValue"], str, bool, int, float, None, ] ``` So we should have similar: ```py JsonAbleValueTypes: TypeAlias = Union[ List["JsonAbleValueTypes"], Dict[str, "JsonAbleValueTypes"], Set["JsonAbleValueTypes"], Tuple["JsonAbleValueTypes"] str, bool, int, float, None, ] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
amoghrajesh commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1880381503 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: Making it list[Any] or dict[str, Any] would probably be too liberal here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] AIP-72: Extending SET RTIF endpoint to accept all JSONable types [airflow]
kaxil commented on code in PR #44843: URL: https://github.com/apache/airflow/pull/44843#discussion_r1880365109 ## task_sdk/src/airflow/sdk/execution_time/comms.py: ## @@ -170,13 +170,17 @@ class PutVariable(BaseModel): type: Literal["PutVariable"] = "PutVariable" +"""Defines the types that the RTIF payload's dictionary values can take. These are all JsonAble types """ +JsonAbleValueTypes = Union[str, dict[str, str], list[str], int, float, None] Review Comment: Don't think `dict[str, str]` or `list[str]` will cover all cases it could have `list[int]` or `list[dict]` for example -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org