This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-only by this 
push:
     new 42201cba49 fix fmt
42201cba49 is described below

commit 42201cba499562faedd34a498ad6c1501f920928
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 19 16:13:37 2026 -0700

    fix fmt
---
 .../architecture/controller/WorkflowScheduler.scala   |  2 ++
 .../WorkerExecutionCompletedHandler.scala             |  6 +++++-
 .../scheduling/RegionExecutionCoordinator.scala       | 19 ++-----------------
 3 files changed, 9 insertions(+), 18 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..b1acb3c065 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -54,4 +54,6 @@ class WorkflowScheduler(
 
   def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else 
schedule.next()
 
+  def hasPendingRegions: Boolean = schedule != null && schedule.hasNext
+
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index d54a22f26b..c3b3ddb234 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -61,7 +61,11 @@ trait WorkerExecutionCompletedHandler {
       .collect(Seq(statsRequest))
       .flatMap(_ => {
         // if entire workflow is completed, clean up
-        if (cp.workflowExecution.isCompleted) {
+        val isWorkflowTerminal =
+          cp.workflowExecution.isCompleted &&
+            !cp.workflowScheduler.hasPendingRegions &&
+            !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
+        if (isWorkflowTerminal) {
           // after query result come back: send completed event, cleanup ,and 
kill workflow
           sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
           cp.controllerTimerService.disableStatusUpdate()
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index a384f383e1..85c03081f6 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -573,23 +573,8 @@ class RegionExecutionCoordinator(
           
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is 
missing"))
-        if 
(region.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))
 {
-          try {
-            DocumentFactory.openDocument(storageUriToAdd)
-          } catch {
-            case _: Exception =>
-              DocumentFactory.createDocument(storageUriToAdd, schema)
-          }
-          try {
-            DocumentFactory.openDocument(stateUriToAdd)
-          } catch {
-            case _: Exception =>
-              DocumentFactory.createDocument(stateUriToAdd, State.schema)
-          }
-        } else {
-          DocumentFactory.createDocument(storageUriToAdd, schema)
-          DocumentFactory.createDocument(stateUriToAdd, State.schema)
-        }
+        DocumentFactory.createDocument(storageUriToAdd, schema)
+        DocumentFactory.createDocument(stateUriToAdd, State.schema)
 
         WorkflowExecutionsResource.insertOperatorPortResultUri(
           eid = eid,

Reply via email to