This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-only by this
push:
new 42201cba49 fix fmt
42201cba49 is described below
commit 42201cba499562faedd34a498ad6c1501f920928
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 19 16:13:37 2026 -0700
fix fmt
---
.../architecture/controller/WorkflowScheduler.scala | 2 ++
.../WorkerExecutionCompletedHandler.scala | 6 +++++-
.../scheduling/RegionExecutionCoordinator.scala | 19 ++-----------------
3 files changed, 9 insertions(+), 18 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..b1acb3c065 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -54,4 +54,6 @@ class WorkflowScheduler(
def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else
schedule.next()
+ def hasPendingRegions: Boolean = schedule != null && schedule.hasNext
+
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index d54a22f26b..c3b3ddb234 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -61,7 +61,11 @@ trait WorkerExecutionCompletedHandler {
.collect(Seq(statsRequest))
.flatMap(_ => {
// if entire workflow is completed, clean up
- if (cp.workflowExecution.isCompleted) {
+ val isWorkflowTerminal =
+ cp.workflowExecution.isCompleted &&
+ !cp.workflowScheduler.hasPendingRegions &&
+ !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
+ if (isWorkflowTerminal) {
// after query result come back: send completed event, cleanup ,and
kill workflow
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
cp.controllerTimerService.disableStatusUpdate()
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index a384f383e1..85c03081f6 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -573,23 +573,8 @@ class RegionExecutionCoordinator(
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is
missing"))
- if
(region.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))
{
- try {
- DocumentFactory.openDocument(storageUriToAdd)
- } catch {
- case _: Exception =>
- DocumentFactory.createDocument(storageUriToAdd, schema)
- }
- try {
- DocumentFactory.openDocument(stateUriToAdd)
- } catch {
- case _: Exception =>
- DocumentFactory.createDocument(stateUriToAdd, State.schema)
- }
- } else {
- DocumentFactory.createDocument(storageUriToAdd, schema)
- DocumentFactory.createDocument(stateUriToAdd, State.schema)
- }
+ DocumentFactory.createDocument(storageUriToAdd, schema)
+ DocumentFactory.createDocument(stateUriToAdd, State.schema)
WorkflowExecutionsResource.insertOperatorPortResultUri(
eid = eid,