amoghrajesh commented on code in PR #51699:
URL: https://github.com/apache/airflow/pull/51699#discussion_r2151472228


##########
devel-common/src/tests_common/pytest_plugin.py:
##########
@@ -1956,17 +1956,27 @@ def override_caplog(request):
 
 
 @pytest.fixture
-def mock_supervisor_comms():
+def mock_supervisor_comms(monkeypatch):
     # for back-compat
     from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
     if not AIRFLOW_V_3_0_PLUS:
         yield None
         return
-    with mock.patch(
-        "airflow.sdk.execution_time.task_runner.SUPERVISOR_COMMS", create=True
-    ) as supervisor_comms:
-        yield supervisor_comms
+
+    from airflow.sdk.execution_time import comms, task_runner
+
+    # Deal with TaskSDK 1.0/1.1 vs 1.2+. Annoying, and shouldn't need to exist once the separation between
+    # core and TaskSDK is finished
+    if CommsDecoder := getattr(comms, "CommsDecoder", None):
+        comms = mock.create_autospec(CommsDecoder)
+        monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", comms, raising=False)
+    else:
+        CommsDecoder = getattr(task_runner, "CommsDecoder")
+        comms = mock.create_autospec(CommsDecoder)
+        comms.send = comms.get_response
+        monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", comms, raising=False)
+    yield comms

Review Comment:
   ☠️ it is what it is!



##########
task-sdk/tests/task_sdk/execution_time/test_comms.py:
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import uuid
+from socket import socketpair
+
+import msgspec
+import pytest
+
+from airflow.sdk.execution_time.comms import BundleInfo, StartupDetails, _ResponseFrame
+from airflow.sdk.execution_time.task_runner import CommsDecoder
+from airflow.utils import timezone
+
+
+class TestCommsDecoder:
+    """Test the communication between the subprocess and the "supervisor"."""
+
+    @pytest.mark.usefixtures("disable_capturing")
+    def test_recv_StartupDetails(self):
+        r, w = socketpair()
+
+        msg = {
+            "type": "StartupDetails",
+            "ti": {
+                "id": uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"),
+                "task_id": "a",
+                "try_number": 1,
+                "run_id": "b",
+                "dag_id": "c",
+            },
+            "ti_context": {
+                "dag_run": {
+                    "dag_id": "c",
+                    "run_id": "b",
+                    "logical_date": "2024-12-01T01:00:00Z",
+                    "data_interval_start": "2024-12-01T00:00:00Z",
+                    "data_interval_end": "2024-12-01T01:00:00Z",
+                    "start_date": "2024-12-01T01:00:00Z",
+                    "run_after": "2024-12-01T01:00:00Z",
+                    "end_date": None,
+                    "run_type": "manual",
+                    "conf": None,
+                    "consumed_asset_events": [],
+                },
+                "max_tries": 0,
+                "should_retry": False,
+                "variables": None,
+                "connections": None,
+            },
+            "file": "/dev/null",
+            "start_date": "2024-12-01T01:00:00Z",
+            "dag_rel_path": "/dev/null",
+            "bundle_info": {"name": "any-name", "version": "any-version"},
+        }
+        bytes = msgspec.msgpack.encode(_ResponseFrame(0, msg, None))
+        w.sendall(len(bytes).to_bytes(4, byteorder="big") + bytes)
+
+        decoder = CommsDecoder(socket=r, log=None)
+
+        msg = decoder._get_response()
+        assert isinstance(msg, StartupDetails)
+        assert msg.ti.id == uuid.UUID("4d828a62-a417-4936-a7a6-2b3fabacecab")
+        assert msg.ti.task_id == "a"
+        assert msg.ti.dag_id == "c"
+        assert msg.dag_rel_path == "/dev/null"
+        assert msg.bundle_info == BundleInfo(name="any-name", version="any-version")
+        assert msg.start_date == timezone.datetime(2024, 12, 1, 1)

Review Comment:
   Nice, gives confidence!



##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -19,12 +19,17 @@
 Communication protocol between the Supervisor and the task process
 ==================================================================
 
-* All communication is done over stdout/stdin in the form of "JSON lines" (each
-  message is a single JSON document terminated by `\n` character)
-* Messages from the subprocess are all log messages and are sent directly to the log
+* All communication is done over the subprocesses stdin in the form of a binary length-prefixed msgpack frame
+  (4 byte, big-endian length, followed by the msgpack-encoded _RequestFrame.) Each side uses this same
+  encoding
+* Log Messages from the subprocess are sent over the dedicated logs socket (which is line-based JSON)
 * No messages are sent to task process except in response to a request. (This is because the task process will
   be running user's code, so we can't read from stdin until we enter our code, such as when requesting an XCom
   value etc.)
+* Every request returns a response, even if the frame is otherwise empty.
+* Requests are written by the subprocess to fd0/stdin. This is making use of the fact that stdin is a
+  bi-directional socket, and thus we can write to it and don't need a dedicated extra socket for sending
+  requests.

Review Comment:
   Nice, love it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to