This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-only by this
push:
new 67c4e24f54 fix fmt
67c4e24f54 is described below
commit 67c4e24f54a667183d2e009058dad3d03a9deca3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 19 15:59:01 2026 -0700
fix fmt
---
amber/src/main/python/core/runnables/main_loop.py | 10 +++-------
.../promisehandlers/WorkerExecutionCompletedHandler.scala | 6 +-----
2 files changed, 4 insertions(+), 12 deletions(-)
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index e0104a755d..844ce95f65 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -102,15 +102,12 @@ class MainLoop(StoppableQueueBlockingRunnable):
"""
# flush the buffered console prints
self._check_and_report_console_messages(force_flush=True)
- controller_interface = self._async_rpc_client.controller_stub()
- executor = self.context.executor_manager.executor
- if isinstance(executor, LoopEndOperator) and executor.condition():
- self._next_iteration(executor, controller_interface)
- executor.close()
+ self.context.executor_manager.executor.close()
# stop the data processing thread
self.data_processor.stop()
self.context.state_manager.transit_to(WorkerState.COMPLETED)
self.context.statistics_manager.update_total_execution_time(time.time_ns())
+ controller_interface = self._async_rpc_client.controller_stub()
controller_interface.worker_execution_completed(EmptyRequest())
self.context.close()
@@ -252,7 +249,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
def _process_state(self, state_: State) -> None:
self.context.state_processing_manager.current_input_state = state_
- self._switch_context()
self.process_input_state()
self._check_and_process_control()
@@ -341,7 +337,7 @@ class MainLoop(StoppableQueueBlockingRunnable):
if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT:
self.context.pause_manager.resume(PauseType.ECM_PAUSE)
- self._switch_context()
+
if self.context.tuple_processing_manager.current_internal_marker:
{
StartChannel: self._process_start_channel,
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index c3b3ddb234..d54a22f26b 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -61,11 +61,7 @@ trait WorkerExecutionCompletedHandler {
.collect(Seq(statsRequest))
.flatMap(_ => {
// if entire workflow is completed, clean up
- val isWorkflowTerminal =
- cp.workflowExecution.isCompleted &&
- !cp.workflowScheduler.hasPendingRegions &&
- !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
- if (isWorkflowTerminal) {
+ if (cp.workflowExecution.isCompleted) {
// after query result come back: send completed event, cleanup ,and
kill workflow
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
cp.controllerTimerService.disableStatusUpdate()