This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new 2aa0bbc3bf fix fmt
2aa0bbc3bf is described below
commit 2aa0bbc3bf6983ea47513a0abf8bc23e9efdf51f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 16 03:09:36 2026 -0700
fix fmt
---
amber/src/main/python/core/runnables/main_loop.py | 9 +++++++--
.../scheduling/RegionExecutionCoordinator.scala | 13 ++++++++-----
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index 614f68ef0a..d669a7456e 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -106,8 +106,10 @@ class MainLoop(StoppableQueueBlockingRunnable):
controller_interface = self._async_rpc_client.controller_stub()
executor = self.context.executor_manager.executor
if isinstance(executor, LoopEndOperator) and executor.condition():
+ id = executor.loop_start_id()
+ print("rewgerger", id)
controller_interface.iteration_completed(
- IterationCompletedRequest(OperatorIdentity(executor.loop_start_id()))
+ IterationCompletedRequest(OperatorIdentity(id))
)
uri = executor.state["LoopStartStateURI"]
del executor.state["LoopStartStateURI"]
@@ -208,7 +210,10 @@ class MainLoop(StoppableQueueBlockingRunnable):
output_state = self.context.state_processing_manager.get_output_state()
self._switch_context()
if output_state is not None:
- if isinstance(self.context.executor_manager.executor, LoopStartOperator):
+ if (
+ isinstance(self.context.executor_manager.executor, LoopStartOperator)
+ and "LoopStartId" not in output_state.to_dict()
+ ):
output_state.add(
"LoopStartId",
self.context.worker_id.split("-", 1)[1].rsplit("-main-0", 1)[0],
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 10e1dce0f6..1f70f9c510 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -567,6 +567,7 @@ class RegionExecutionCoordinator(
portConfigs.foreach {
case (outputPortId, portConfig) =>
val storageUriToAdd = portConfig.storageURI
+ val stateUriToAdd = StateMaterialization.stateUriFromResultUri(storageUriToAdd)
val (_, eid, _, _) = decodeURI(storageUriToAdd)
val schemaOptional =
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
@@ -580,15 +581,17 @@ class RegionExecutionCoordinator(
case _: Exception =>
DocumentFactory.createDocument(storageUriToAdd, schema)
}
+ try {
+ DocumentFactory.openDocument(stateUriToAdd)
+ } catch {
+ case _: Exception =>
+ DocumentFactory.createDocument(stateUriToAdd, StateMaterialization.schema)
+ }
} else {
DocumentFactory.createDocument(storageUriToAdd, schema)
+ DocumentFactory.createDocument(stateUriToAdd, StateMaterialization.schema)
}
- DocumentFactory.createDocument(
- StateMaterialization.stateUriFromResultUri(storageUriToAdd),
- StateMaterialization.schema
- )
-
WorkflowExecutionsResource.insertOperatorPortResultUri(
eid = eid,
globalPortId = outputPortId,