Yicong-Huang commented on code in PR #4560:
URL: https://github.com/apache/texera/pull/4560#discussion_r3165586978


##########
amber/src/main/python/core/runnables/test_main_loop.py:
##########
@@ -1231,3 +1247,297 @@ def channel_size(channel: ChannelIdentity) -> int:
             "test-1"
         ] == b"pickle    " + pickle.dumps(mock_binary_tuple["test-1"])
         reraise()
+
+    @pytest.mark.timeout(2)
+    def test_process_state_can_emit_multiple_states(
+        self,
+        main_loop,
+        output_queue,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        # Stub-level coverage of the single-switch state handshake. Each
+        # call to the (stubbed) _switch_context simulates DataProc
+        # consuming the queued input state and writing
+        # current_output_state, mirroring what real DataProc.process_state
+        # does between MainLoop's switches.
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                output_state = State()
+                output_state.add("value", state["value"] + 1)
+                output_state.add("port", port)
+                return output_state
+
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: 
None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
+        )
+
+        def fake_switch_context():
+            current_input_state = (
+                main_loop.context.state_processing_manager.current_input_state
+            )
+            if current_input_state is not None:
+                
main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        first_state = State()
+        first_state.add("value", 1)
+        second_state = State()
+        second_state.add("value", 41)
+
+        main_loop._process_state(first_state)
+        main_loop._process_state(second_state)
+
+        first_output: DataElement = output_queue.get()
+        second_output: DataElement = output_queue.get()
+
+        assert first_output.tag == mock_data_output_channel
+        assert isinstance(first_output.payload, StateFrame)
+        assert first_output.payload.frame["value"] == 2
+        assert first_output.payload.frame["port"] == 0
+
+        assert second_output.tag == mock_data_output_channel
+        assert isinstance(second_output.payload, StateFrame)
+        assert second_output.payload.frame["value"] == 42
+        assert second_output.payload.frame["port"] == 0
+
+    @pytest.mark.timeout(5)
+    def test_main_loop_thread_can_process_state(
+        self,
+        mock_data_output_channel,
+        mock_control_output_channel,
+        input_queue,
+        output_queue,
+        main_loop,
+        main_loop_thread,
+        mock_assign_input_port,
+        mock_assign_output_port,
+        mock_add_input_channel,
+        mock_add_partitioning,
+        mock_initialize_executor,
+        mock_state_data_elements,
+        mock_end_of_upstream,
+        command_sequence,
+        reraise,
+    ):
+        # End-to-end coverage of the state-processing path through the real
+        # MainLoop + DataProcessor threads. The single-switch state handshake
+        # in MainLoop.process_input_state means each state is emitted in its
+        # own cycle (no lag), and an EndChannel ECM after the last state
+        # produces an additional output via produce_state_on_finish.
+        main_loop_thread.start()
+
+        for setup_msg in [
+            mock_assign_input_port,
+            mock_assign_output_port,
+            mock_add_input_channel,
+            mock_add_partitioning,
+            mock_initialize_executor,
+        ]:
+            input_queue.put(setup_msg)
+            assert output_queue.get() == DCMElement(
+                tag=mock_control_output_channel,
+                payload=DirectControlMessagePayloadV2(
+                    return_invocation=ReturnInvocation(
+                        command_id=command_sequence,
+                        return_value=ControlReturn(empty_return=EmptyReturn()),
+                    )
+                ),
+            )
+
+        # Replace the EchoOperator that mock_initialize_executor loaded
+        # with an in-process executor that tags processed states and emits
+        # a finish marker on EndChannel. Going through the
+        # InitializeExecutor RPC above sets up the rest of the worker
+        # state (output schema, partitioning bookkeeping); swapping the
+        # executor instance here lets the test observe whether
+        # process_state actually runs without depending on Python's
+        # cross-test module caching for the loaded operator class.
+        class StateProcessingExecutor:
+            @staticmethod
+            def process_tuple(tuple_, port):
+                yield tuple_
+
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                new_state = State()
+                for key, value in state.__dict__.items():
+                    if key != "schema":
+                        new_state.add(key, value)
+                new_state.add("processed_marker", "executed")
+                new_state.add("port", port)
+                return new_state
+
+            @staticmethod
+            def produce_state_on_finish(port: int) -> State:
+                finish_state = State()
+                finish_state.add("finish_marker", 
"produce_state_on_finish_ran")
+                return finish_state
+
+            @staticmethod
+            def on_finish(port):
+                yield
+
+            @staticmethod
+            def close():
+                pass
+
+        main_loop.context.executor_manager.executor = StateProcessingExecutor()
+
+        # Send four states. With the lag-free state pipeline we expect each
+        # state to produce its own output in order.
+        for state_element in mock_state_data_elements:
+            input_queue.put(state_element)
+
+        for expected_value in (1, 2, 3, 4):
+            output_data_element: DataElement = output_queue.get()
+            assert output_data_element.tag == mock_data_output_channel
+            assert isinstance(output_data_element.payload, StateFrame), (
+                f"expected StateFrame for value={expected_value}, got "
+                f"{type(output_data_element.payload).__name__}"
+            )
+            output_state = output_data_element.payload.frame
+            assert output_state["value"] == expected_value, (
+                f"state outputs arrived out of order: expected value="
+                f"{expected_value}, got value={output_state['value']}"
+            )
+            assert output_state["processed_marker"] == "executed"
+            assert output_state["port"] == 0
+
+        # Send EndChannel to drive _process_end_channel. The executor's
+        # produce_state_on_finish writes a finish-marker state into
+        # current_output_state inside DataProc's process_internal_marker;
+        # MainLoop's process_input_state then emits it.
+        input_queue.put(mock_end_of_upstream)
+
+        # Drain the control reply messages so the next data
+        # output_queue.get() returns the post-EndChannel data emission.
+        output_queue.disable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+        for _ in range(3):
+            control_reply = output_queue.get()
+            assert isinstance(control_reply, DCMElement), (
+                f"expected DCMElement during EndChannel teardown, got "
+                f"{type(control_reply).__name__}"
+            )
+        output_queue.enable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+
+        end_channel_state_output: DataElement = output_queue.get()
+        assert end_channel_state_output.tag == mock_data_output_channel
+        assert isinstance(end_channel_state_output.payload, StateFrame), (
+            f"expected StateFrame for the EndChannel-driven emission, got "
+            f"{type(end_channel_state_output.payload).__name__}"
+        )
+        end_channel_state = end_channel_state_output.payload.frame
+        assert "finish_marker" in end_channel_state.__dict__, (
+            f"EndChannel emission should be the finish-marker state from "
+            f"produce_state_on_finish, got {end_channel_state!r}"
+        )
+        assert end_channel_state["finish_marker"] == 
"produce_state_on_finish_ran"
+
+        reraise()
+
+    @pytest.mark.timeout(5)

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to