This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-region-restart
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 49eba41b49be83905b1d56be3acae471cc97538a
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 21 01:42:04 2026 -0700

    Add support for restarting regions
---
 .../architecture/common/AmberProcessor.scala       |  5 +-
 .../controller/WorkflowScheduler.scala             |  2 +
 .../controller/execution/WorkflowExecution.scala   |  1 +
 .../WorkerExecutionCompletedHandler.scala          |  6 ++-
 .../messaginglayer/NetworkInputGateway.scala       |  4 ++
 .../messaginglayer/NetworkOutputGateway.scala      |  4 ++
 .../scheduling/RegionExecutionCoordinator.scala    | 57 ++++++++++++++++++----
 .../scheduling/WorkflowExecutionCoordinator.scala  | 50 +++++++++++++------
 .../worker/promisehandlers/EndHandler.scala        |  2 +-
 .../amber/engine/common/rpc/AsyncRPCClient.scala   |  8 ++-
 10 files changed, 106 insertions(+), 33 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
index e776307323..f1c8136fd8 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
@@ -21,7 +21,6 @@ package org.apache.texera.amber.engine.architecture.common
 
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
 import org.apache.texera.amber.engine.architecture.messaginglayer.{
-  InputGateway,
   NetworkInputGateway,
   NetworkOutputGateway
 }
@@ -43,7 +42,7 @@ abstract class AmberProcessor(
     with Serializable {
 
   /** FIFO & exactly once */
-  val inputGateway: InputGateway = new NetworkInputGateway(this.actorId)
+  val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId)
 
   // 1. Unified Output
   val outputGateway: NetworkOutputGateway =
@@ -55,7 +54,7 @@ abstract class AmberProcessor(
       }
     )
   // 2. RPC Layer
-  val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId)
+  val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId)
   val asyncRPCServer: AsyncRPCServer =
     new AsyncRPCServer(outputGateway, actorId)
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index c8a107e045..d7f6b06dfa 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -55,6 +55,8 @@ class WorkflowScheduler(
 
   def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else 
schedule.next()
 
+  def hasPendingRegions: Boolean = schedule != null && schedule.hasNext
+
   def jumpToOperator(opId: OperatorIdentity): Unit = 
schedule.jumpToOperator(opId)
 
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index b806479b89..2de29f31fd 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -44,6 +44,7 @@ case class WorkflowExecution() {
     * @throws AssertionError if the `RegionExecution` has already been 
initialized.
     */
   def initRegionExecution(region: Region): RegionExecution = {
+    regionExecutions.remove(region.id)
     // ensure the region execution hasn't been initialized already.
     assert(
       !regionExecutions.contains(region.id),
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index d54a22f26b..c3b3ddb234 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -61,7 +61,11 @@ trait WorkerExecutionCompletedHandler {
       .collect(Seq(statsRequest))
       .flatMap(_ => {
         // if entire workflow is completed, clean up
-        if (cp.workflowExecution.isCompleted) {
+        val isWorkflowTerminal =
+          cp.workflowExecution.isCompleted &&
+            !cp.workflowScheduler.hasPendingRegions &&
+            !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
+        if (isWorkflowTerminal) {
           // after query result come back: send completed event, cleanup ,and 
kill workflow
           sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
           cp.controllerTimerService.disableStatusUpdate()
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
index 5cfd8aabc0..1d3ee3cb72 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
@@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
     enforcers += enforcer
   }
 
+  def removeControlChannel(from: ActorVirtualIdentity): Unit = {
+    inputChannels.remove(ChannelIdentity(from, actorId, isControl = true))
+  }
+
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
index 929a30f4ef..e35e819d41 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
@@ -94,4 +94,8 @@ class NetworkOutputGateway(
     idToSequenceNums.getOrElseUpdate(channelId, new 
AtomicLong()).getAndIncrement()
   }
 
+  def removeControlChannel(to: ActorVirtualIdentity): Unit = {
+    idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true))
+  }
+
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index e490cde3d9..428fcfd0ff 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -20,7 +20,7 @@
 package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
-import com.twitter.util.{Future, Return, Throw}
+import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, 
Return, Throw, Timer}
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -61,7 +61,7 @@ import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions
 
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration => ScalaDuration}
 
 /**
   * The executor of a region.
@@ -109,10 +109,14 @@ class RegionExecutionCoordinator(
   private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new 
AtomicReference(
     Unexecuted
   )
+  private val terminationFutureRef: AtomicReference[Future[Unit]] = new 
AtomicReference(null)
+  private val killRetryTimer: Timer = new JavaTimer(true)
+  private val killRetryDelay: TwitterDuration = 
TwitterDuration.fromMilliseconds(200)
 
   /**
     * Sync the status of `RegionExecution` and transition this coordinator's 
phase to `Completed` only when the
-    * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the 
ports of this region are completed.
+    * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the 
ports of this region are completed, and
+    * all workers in this region are terminated.
     *
     * Additionally, this method will also terminate all the workers of this 
region:
     *
@@ -135,12 +139,22 @@ class RegionExecutionCoordinator(
       return Future.Unit
     }
 
-    // Set this coordinator's status to be completed so that subsequent 
regions can be started by
-    // WorkflowExecutionCoordinator.
-    setPhase(Completed)
-
-    // Terminate all the workers in this region.
-    terminateWorkers(regionExecution)
+    val existingTerminationFuture = terminationFutureRef.get
+    if (existingTerminationFuture != null) {
+      existingTerminationFuture
+    } else {
+      val terminationFuture = 
terminateWorkersWithRetry(regionExecution).flatMap { _ =>
+        // Set this coordinator's status to be completed so that subsequent 
regions can be started by
+        // WorkflowExecutionCoordinator.
+        setPhase(Completed)
+        Future.Unit
+      }
+      if (terminationFutureRef.compareAndSet(null, terminationFuture)) {
+        terminationFuture
+      } else {
+        terminationFutureRef.get
+      }
+    }
   }
 
   private def terminateWorkers(regionExecution: RegionExecution) = {
@@ -167,7 +181,9 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the 
worker and send messages.
                 actorRefService.removeActorRef(workerId)
-                gracefulStop(actorRef, Duration(5, 
TimeUnit.SECONDS)).asTwitter()
+                asyncRPCClient.inputGateway.removeControlChannel(workerId)
+                asyncRPCClient.outputGateway.removeControlChannel(workerId)
+                gracefulStop(actorRef, ScalaDuration(5, 
TimeUnit.SECONDS)).asTwitter()
               }
           }.toSeq
 
@@ -191,8 +207,29 @@ class RegionExecutionCoordinator(
     }
   }
 
+  private def terminateWorkersWithRetry(
+      regionExecution: RegionExecution,
+      attempt: Int = 1
+  ): Future[Unit] = {
+    terminateWorkers(regionExecution).rescue { case err =>
+      logger.warn(
+        s"Failed to terminate region ${region.id.id} on attempt $attempt. 
Retrying in ${killRetryDelay.inMilliseconds} ms.",
+        err
+      )
+      Future
+        .sleep(killRetryDelay)(killRetryTimer)
+        .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))
+    }
+  }
+
   def isCompleted: Boolean = currentPhaseRef.get == Completed
 
+  /**
+    * Returns the region termination future if termination has been initiated.
+    * This is only set by `tryCompleteRegionExecution()`.
+    */
+  def getTerminationFutureOpt: Option[Future[Unit]] = 
Option(terminationFutureRef.get)
+
   /**
     * This will sync and transition the region execution phase from one to 
another depending on its current phase:
     *
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 2b8e3ce145..37c687a27d 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -27,10 +27,15 @@ import org.apache.texera.amber.engine.architecture.common.{
   AkkaActorRefMappingService,
   AkkaActorService
 }
-import 
org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, 
WorkflowScheduler}
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerConfig,
+  ExecutionStateUpdate,
+  WorkflowScheduler
+}
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
 
 class WorkflowExecutionCoordinator(
@@ -45,6 +50,7 @@ class WorkflowExecutionCoordinator(
   private val regionExecutionCoordinators
       : mutable.HashMap[RegionIdentity, RegionExecutionCoordinator] =
     mutable.HashMap()
+  private val completionNotified: AtomicBoolean = new AtomicBoolean(false)
 
   @transient var actorRefService: AkkaActorRefMappingService = _
 
@@ -60,18 +66,19 @@ class WorkflowExecutionCoordinator(
     * After the syncs, if there are no running region(s), it will start new 
regions (if available).
     */
   def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] 
= {
-    if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
-      // As this method is invoked by the completion of each port in a region, 
and regionExecutionCoordinator only
-      // lanuches each phase asynchronously, we need to let each current 
unfinished regionExecutionCoordinator
-      // sync its status and proceed with next phases if needed.
-      Future
-        .collect({
-          regionExecutionCoordinators.values
-            .filter(!_.isCompleted)
-            .map(_.syncStatusAndTransitionRegionExecutionPhase())
-            .toSeq
-        })
+    val unfinishedRegionCoordinators =
+      regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
+
+    // Trigger sync for each unfinished region.
+    
unfinishedRegionCoordinators.foreach(_.syncStatusAndTransitionRegionExecutionPhase())
+
+    // Wait only for region termination futures, then re-run coordination.
+    val terminationFutures = 
unfinishedRegionCoordinators.flatMap(_.getTerminationFutureOpt)
+    if (terminationFutures.nonEmpty) {
+      return Future
+        .collect(terminationFutures)
         .unit
+        .flatMap(_ => coordinateRegionExecutors(actorService))
     }
 
     if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
@@ -80,10 +87,17 @@ class WorkflowExecutionCoordinator(
     }
 
     // All existing regions are completed. Start the next region (if any).
+    val nextRegions = workflowScheduler.getNextRegions
+    if (nextRegions.isEmpty) {
+      if (workflowExecution.isCompleted && 
completionNotified.compareAndSet(false, true)) {
+        
asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState))
+      }
+      return Future.Unit
+    }
+
+    executedRegions.append(nextRegions)
     Future
-      .collect({
-        val nextRegions = workflowScheduler.getNextRegions
-        executedRegions.append(nextRegions)
+      .collect(
         nextRegions
           .map(region => {
             workflowExecution.initRegionExecution(region)
@@ -99,7 +113,7 @@ class WorkflowExecutionCoordinator(
           })
           .map(_.syncStatusAndTransitionRegionExecutionPhase())
           .toSeq
-      })
+      )
       .unit
   }
 
@@ -121,4 +135,8 @@ class WorkflowExecutionCoordinator(
     workflowScheduler.jumpToOperator(opId)
   }
 
+  def hasUnfinishedRegionCoordinators: Boolean = {
+    regionExecutionCoordinators.values.exists(!_.isCompleted)
+  }
+
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
index 2a6a20b3d3..0504e66f52 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
@@ -48,8 +48,8 @@ trait EndHandler {
         s"Received EndHandler before all messages are processed. Unprocessed 
messages: " +
           s"${dp.inputManager.inputMessageQueue.peek()}"
       )
+      return Future.exception(new IllegalStateException("worker still has 
unprocessed messages"))
     }
-    assert(dp.inputManager.inputMessageQueue.isEmpty)
     // Now we can safely acknowledge that this worker can be terminated.
     EmptyReturn()
   }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
index 704ebd7f47..f7e26803b4 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
@@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{
   EmbeddedControlMessageIdentity
 }
 import org.apache.texera.amber.engine.architecture.controller.ClientEvent
-import 
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+  NetworkInputGateway,
+  NetworkOutputGateway
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
 import 
org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
@@ -125,7 +128,8 @@ object AsyncRPCClient {
 }
 
 class AsyncRPCClient(
-    outputGateway: NetworkOutputGateway,
+    val inputGateway: NetworkInputGateway,
+    val outputGateway: NetworkOutputGateway,
     val actorId: ActorVirtualIdentity
 ) extends AmberLogging {
 

Reply via email to