aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3407270416


##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +101,69 @@ def __init__(
             target=self.data_processor.run, daemon=True, name="data_processor_thread"
         ).start()
 
+    def _compute_loop_start_id(self) -> typing.Tuple[str, str]:
+        # A LoopStart stamps its own operator id and the iceberg URI its input
+        # is read from onto the state it emits; the matching LoopEnd reads them
+        # back to jump. These ride the StateFrame envelope, not user state.
+        loop_start_id = get_operator_id(self.context.worker_id)
+        # The URI lives on the upstream operator's output port (which
+        # LoopStart's first materialization reader is reading from). LoopStart
+        # is constrained to a single input port + single reader, so fail loud
+        # rather than silently picking whichever dict iterator yields first --
+        # that would mask future graph mistakes by choosing an arbitrary URI.
+        reader_runnables = (
+            self.context.input_manager.get_input_port_mat_reader_threads()
+        )
+        if len(reader_runnables) != 1:
+            raise RuntimeError(
+                f"LoopStart expected exactly one input port, "
+                f"got {len(reader_runnables)}"
+            )
+        [(_, readers)] = reader_runnables.items()
+        if len(readers) != 1:
+            raise RuntimeError(
+                f"LoopStart expected exactly one input reader on its port, "
+                f"got {len(readers)}"
+            )
+        loop_start_state_uri = VFSURIFactory.state_uri(readers[0].uri)
+        return loop_start_id, loop_start_state_uri
+
+    def _jump_to_loop_start(
+        self, executor: LoopEndOperator, controller_interface
+    ) -> None:
+        state = executor.state
+        controller_interface.jump_to_operator_region(
+            JumpToOperatorRegionRequest(OperatorIdentity(self._loop_start_id))
+        )
+        uri = self._loop_start_state_uri
+        # Strip the per-iteration scratch (`table`, `output`) so only the
+        # user-visible loop state is written back to LoopStart's input. The
+        # loop metadata (counter, LoopStartId, LoopStartStateURI) is owned by
+        # the runtime and never lived in this dict.
+        for key in ("table", "output"):
+            state.pop(key, None)
+        writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
+        # The back-edge fires only after the matching LoopEnd consumed at
+        # loop_counter == 0, so the next iteration's input starts at depth 0.
+        writer.put_one(State(state).to_tuple(0))
+        writer.close()
+
     def complete(self) -> None:
         """
         Complete the DataProcessor, marking state to COMPLETED, and notify the
         controller.
         """
         # flush the buffered console prints
         self._check_and_report_console_messages(force_flush=True)
-        self.context.executor_manager.executor.close()
+        controller_interface = self._async_rpc_client.controller_stub()
+        executor = self.context.executor_manager.executor
+        if isinstance(executor, LoopEndOperator) and executor.condition():

Review Comment:
   Guarded it — fixed in 212687a8a6.
   
   You're right that `condition()` is the odd one out: `complete()` calls it on
   the main loop thread, before `close()` and the COMPLETED transition, outside
   `DataProcessor`'s guarded executor session. An exception there (a typo, an
   undefined name) propagated through `run()`'s `@logger.catch(reraise=True)`
   and killed the worker thread silently; the controller never learned of it.
   
   Now it's caught and reported the same way a UDF error is:
   - record it on the exception manager,
   - queue an **ERROR** console message,
   - flush it, then enter `EXCEPTION_PAUSE`, **skipping the loop-back edge and
     completion**, so the worker pauses with the error showing instead of dying
     or falsely reporting success.
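
   For readers skimming the thread, the control flow of the guard can be sketched roughly as below. This is a minimal, self-contained illustration and not the code in the commit: `SketchWorker`, `WorkerState`, and every attribute on them are hypothetical stand-ins for the main loop, the exception manager, and the console message queue. What happens after a successful jump is not visible in the quoted hunk, so marking the worker completed there is an assumption of the sketch.

```python
import sys
from enum import Enum, auto


class WorkerState(Enum):
    RUNNING = auto()
    EXCEPTION_PAUSE = auto()
    COMPLETED = auto()


class SketchWorker:
    """Hypothetical stand-in for the main loop; not the Texera classes."""

    def __init__(self, condition):
        self.condition = condition      # stands in for LoopEndOperator.condition
        self.state = WorkerState.RUNNING
        self.recorded_exc_info = None   # stands in for the exception manager
        self.console = []               # queued console messages
        self.flushed = []               # messages already sent out
        self.jumped = False             # whether the loop-back edge fired

    def report_exception(self, exc_info):
        # Record the error, queue an ERROR console message, then flush it.
        self.recorded_exc_info = exc_info
        self.console.append(("ERROR", repr(exc_info[1])))
        self.flushed.extend(self.console)
        self.console.clear()

    def complete(self):
        try:
            should_loop = self.condition()
        except Exception:
            # User code failed: report it and pause. Returning early skips
            # both the loop-back edge and the COMPLETED transition.
            self.report_exception(sys.exc_info())
            self.state = WorkerState.EXCEPTION_PAUSE
            return
        if should_loop:
            self.jumped = True
        # Assumption of this sketch: completion follows in both cases.
        self.state = WorkerState.COMPLETED
```

   With a `condition` that raises, the sketch ends in `EXCEPTION_PAUSE` with one flushed ERROR message and `jumped` still `False`; with a `condition` that returns `True`, it takes the loop-back branch.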
   
   To keep both paths reporting identically, I pulled the console-message build
   out of `DataProcessor._report_exception` into a shared
   `ConsoleMessageManager.report_exception(worker_id, exc_info)`; the data path
   and this main-loop path now call the same helper.
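
   A rough sketch of what such a shared helper might look like, assuming it takes a worker id and a `sys.exc_info()` triple as the signature above suggests. The `SketchConsoleMessage` fields, the `pending` queue, and the `capture` helper are hypothetical; the real message type and queueing in the PR may differ.

```python
import sys
import traceback
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchConsoleMessage:
    # Hypothetical message shape, for illustration only.
    worker_id: str
    level: str
    title: str
    detail: str


@dataclass
class SketchConsoleMessageManager:
    pending: List[SketchConsoleMessage] = field(default_factory=list)

    def report_exception(self, worker_id, exc_info):
        """Build one ERROR message from an exc_info triple and queue it."""
        exc_type, exc_value, tb = exc_info
        message = SketchConsoleMessage(
            worker_id=worker_id,
            level="ERROR",
            title=f"{exc_type.__name__}: {exc_value}",
            detail="".join(traceback.format_exception(exc_type, exc_value, tb)),
        )
        self.pending.append(message)
        return message


def capture(fn):
    """Run fn and return sys.exc_info() if it raises, else None."""
    try:
        fn()
    except Exception:
        return sys.exc_info()
    return None
```

   Because both call sites would go through one function, a UDF failure on the data path and a `condition()` failure on the main-loop path produce messages with the same shape.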
   
   Tests: added
   `test_complete_reports_loopend_condition_error_instead_of_crashing`. It
   checks that when a Loop End's `condition()` raises, the error is recorded,
   reported as an ERROR console message, and the worker pauses in
   `EXCEPTION_PAUSE`, with no loop-back and no completion. The full
   `main_loop`, `data_processor`, and loop-operator suites pass (47 tests), and
   ruff is clean.
   


