This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-only by this push:
     new 67c4e24f54 fix fmt
67c4e24f54 is described below

commit 67c4e24f54a667183d2e009058dad3d03a9deca3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 19 15:59:01 2026 -0700

    fix fmt
---
 amber/src/main/python/core/runnables/main_loop.py              | 10 +++-------
 .../promisehandlers/WorkerExecutionCompletedHandler.scala      |  6 +-----
 2 files changed, 4 insertions(+), 12 deletions(-)

diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index e0104a755d..844ce95f65 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -102,15 +102,12 @@ class MainLoop(StoppableQueueBlockingRunnable):
         """
         # flush the buffered console prints
         self._check_and_report_console_messages(force_flush=True)
-        controller_interface = self._async_rpc_client.controller_stub()
-        executor = self.context.executor_manager.executor
-        if isinstance(executor, LoopEndOperator) and executor.condition():
-            self._next_iteration(executor, controller_interface)
-        executor.close()
+        self.context.executor_manager.executor.close()
         # stop the data processing thread
         self.data_processor.stop()
         self.context.state_manager.transit_to(WorkerState.COMPLETED)
         self.context.statistics_manager.update_total_execution_time(time.time_ns())
+        controller_interface = self._async_rpc_client.controller_stub()
         controller_interface.worker_execution_completed(EmptyRequest())
         self.context.close()
 
@@ -252,7 +249,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
 
     def _process_state(self, state_: State) -> None:
         self.context.state_processing_manager.current_input_state = state_
-        self._switch_context()
         self.process_input_state()
         self._check_and_process_control()
 
@@ -341,7 +337,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
 
             if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT:
                 self.context.pause_manager.resume(PauseType.ECM_PAUSE)
-            self._switch_context()
+
             if self.context.tuple_processing_manager.current_internal_marker:
                 {
                     StartChannel: self._process_start_channel,
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index c3b3ddb234..d54a22f26b 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -61,11 +61,7 @@ trait WorkerExecutionCompletedHandler {
       .collect(Seq(statsRequest))
       .flatMap(_ => {
         // if entire workflow is completed, clean up
-        val isWorkflowTerminal =
-          cp.workflowExecution.isCompleted &&
-            !cp.workflowScheduler.hasPendingRegions &&
-            !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
-        if (isWorkflowTerminal) {
+        if (cp.workflowExecution.isCompleted) {
           // after query result come back: send completed event, cleanup ,and kill workflow
           sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
           cp.controllerTimerService.disableStatusUpdate()

Reply via email to