This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 2aa0bbc3bf fix fmt
2aa0bbc3bf is described below

commit 2aa0bbc3bf6983ea47513a0abf8bc23e9efdf51f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 16 03:09:36 2026 -0700

    fix fmt
---
 amber/src/main/python/core/runnables/main_loop.py           |  9 +++++++--
 .../scheduling/RegionExecutionCoordinator.scala             | 13 ++++++++-----
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index 614f68ef0a..d669a7456e 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -106,8 +106,10 @@ class MainLoop(StoppableQueueBlockingRunnable):
         controller_interface = self._async_rpc_client.controller_stub()
         executor = self.context.executor_manager.executor
         if isinstance(executor, LoopEndOperator) and executor.condition():
+            id = executor.loop_start_id()
+            print("rewgerger", id)
             controller_interface.iteration_completed(
-                IterationCompletedRequest(OperatorIdentity(executor.loop_start_id()))
+                IterationCompletedRequest(OperatorIdentity(id))
             )
             uri = executor.state["LoopStartStateURI"]
             del executor.state["LoopStartStateURI"]
@@ -208,7 +210,10 @@ class MainLoop(StoppableQueueBlockingRunnable):
         output_state = self.context.state_processing_manager.get_output_state()
         self._switch_context()
         if output_state is not None:
-            if isinstance(self.context.executor_manager.executor, LoopStartOperator):
+            if (
+                isinstance(self.context.executor_manager.executor, LoopStartOperator)
+                and "LoopStartId" not in output_state.to_dict()
+            ):
                 output_state.add(
                     "LoopStartId",
                     self.context.worker_id.split("-", 1)[1].rsplit("-main-0", 1)[0],
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 10e1dce0f6..1f70f9c510 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -567,6 +567,7 @@ class RegionExecutionCoordinator(
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
         val storageUriToAdd = portConfig.storageURI
+        val stateUriToAdd = StateMaterialization.stateUriFromResultUri(storageUriToAdd)
         val (_, eid, _, _) = decodeURI(storageUriToAdd)
         val schemaOptional =
           region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
@@ -580,15 +581,17 @@ class RegionExecutionCoordinator(
             case _: Exception =>
               DocumentFactory.createDocument(storageUriToAdd, schema)
           }
+          try {
+            DocumentFactory.openDocument(stateUriToAdd)
+          } catch {
+            case _: Exception =>
+              DocumentFactory.createDocument(stateUriToAdd, StateMaterialization.schema)
+          }
         } else {
           DocumentFactory.createDocument(storageUriToAdd, schema)
+          DocumentFactory.createDocument(stateUriToAdd, StateMaterialization.schema)
         }
 
-        DocumentFactory.createDocument(
-          StateMaterialization.stateUriFromResultUri(storageUriToAdd),
-          StateMaterialization.schema
-        )
-
         WorkflowExecutionsResource.insertOperatorPortResultUri(
           eid = eid,
           globalPortId = outputPortId,

Reply via email to