This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch xinyuan-region-restart in repository https://gitbox.apache.org/repos/asf/texera.git
commit 49eba41b49be83905b1d56be3acae471cc97538a Author: Xinyuan Lin <[email protected]> AuthorDate: Tue Apr 21 01:42:04 2026 -0700 Add support for restarting regions --- .../architecture/common/AmberProcessor.scala | 5 +- .../controller/WorkflowScheduler.scala | 2 + .../controller/execution/WorkflowExecution.scala | 1 + .../WorkerExecutionCompletedHandler.scala | 6 ++- .../messaginglayer/NetworkInputGateway.scala | 4 ++ .../messaginglayer/NetworkOutputGateway.scala | 4 ++ .../scheduling/RegionExecutionCoordinator.scala | 57 ++++++++++++++++++---- .../scheduling/WorkflowExecutionCoordinator.scala | 50 +++++++++++++------ .../worker/promisehandlers/EndHandler.scala | 2 +- .../amber/engine/common/rpc/AsyncRPCClient.scala | 8 ++- 10 files changed, 106 insertions(+), 33 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala index e776307323..f1c8136fd8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala @@ -21,7 +21,6 @@ package org.apache.texera.amber.engine.architecture.common import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.engine.architecture.messaginglayer.{ - InputGateway, NetworkInputGateway, NetworkOutputGateway } @@ -43,7 +42,7 @@ abstract class AmberProcessor( with Serializable { /** FIFO & exactly once */ - val inputGateway: InputGateway = new NetworkInputGateway(this.actorId) + val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId) // 1. Unified Output val outputGateway: NetworkOutputGateway = @@ -55,7 +54,7 @@ abstract class AmberProcessor( } ) // 2. RPC Layer - val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId) + val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId) val asyncRPCServer: AsyncRPCServer = new AsyncRPCServer(outputGateway, actorId) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index c8a107e045..d7f6b06dfa 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -55,6 +55,8 @@ class WorkflowScheduler( def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() + def hasPendingRegions: Boolean = schedule != null && schedule.hasNext + def jumpToOperator(opId: OperatorIdentity): Unit = schedule.jumpToOperator(opId) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index b806479b89..2de29f31fd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -44,6 +44,7 @@ case class WorkflowExecution() { * @throws AssertionError if the `RegionExecution` has already been initialized. */ def initRegionExecution(region: Region): RegionExecution = { + regionExecutions.remove(region.id) // ensure the region execution hasn't been initialized already. assert( !regionExecutions.contains(region.id), diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala index d54a22f26b..c3b3ddb234 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala @@ -61,7 +61,11 @@ trait WorkerExecutionCompletedHandler { .collect(Seq(statsRequest)) .flatMap(_ => { // if entire workflow is completed, clean up - if (cp.workflowExecution.isCompleted) { + val isWorkflowTerminal = + cp.workflowExecution.isCompleted && + !cp.workflowScheduler.hasPendingRegions && + !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators + if (isWorkflowTerminal) { // after query result come back: send completed event, cleanup ,and kill workflow sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState)) cp.controllerTimerService.disableStatusUpdate() diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala index 5cfd8aabc0..1d3ee3cb72 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala @@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity) enforcers += enforcer } + def removeControlChannel(from: ActorVirtualIdentity): Unit = { + inputChannels.remove(ChannelIdentity(from, actorId, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala index 929a30f4ef..e35e819d41 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala @@ -94,4 +94,8 @@ class NetworkOutputGateway( idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement() } + def removeControlChannel(to: ActorVirtualIdentity): Unit = { + idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index e490cde3d9..428fcfd0ff 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -20,7 +20,7 @@ package org.apache.texera.amber.engine.architecture.scheduling import org.apache.pekko.pattern.gracefulStop -import com.twitter.util.{Future, Return, Throw} +import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, Return, Throw, Timer} import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity @@ -61,7 +61,7 @@ import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{Duration => ScalaDuration} /** * The executor of a region. @@ -109,10 +109,14 @@ class RegionExecutionCoordinator( private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference( Unexecuted ) + private val terminationFutureRef: AtomicReference[Future[Unit]] = new AtomicReference(null) + private val killRetryTimer: Timer = new JavaTimer(true) + private val killRetryDelay: TwitterDuration = TwitterDuration.fromMilliseconds(200) /** * Sync the status of `RegionExecution` and transition this coordinator's phase to `Completed` only when the - * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the ports of this region are completed. + * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and + * all workers in this region are terminated. * * Additionally, this method will also terminate all the workers of this region: * @@ -135,12 +139,22 @@ class RegionExecutionCoordinator( return Future.Unit } - // Set this coordinator's status to be completed so that subsequent regions can be started by - // WorkflowExecutionCoordinator. - setPhase(Completed) - - // Terminate all the workers in this region. - terminateWorkers(regionExecution) + val existingTerminationFuture = terminationFutureRef.get + if (existingTerminationFuture != null) { + existingTerminationFuture + } else { + val terminationFuture = terminateWorkersWithRetry(regionExecution).flatMap { _ => + // Set this coordinator's status to be completed so that subsequent regions can be started by + // WorkflowExecutionCoordinator. + setPhase(Completed) + Future.Unit + } + if (terminationFutureRef.compareAndSet(null, terminationFuture)) { + terminationFuture + } else { + terminationFutureRef.get + } + } } private def terminateWorkers(regionExecution: RegionExecution) = { @@ -167,7 +181,9 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) - gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter() + asyncRPCClient.inputGateway.removeControlChannel(workerId) + asyncRPCClient.outputGateway.removeControlChannel(workerId) + gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq @@ -191,8 +207,29 @@ class RegionExecutionCoordinator( } } + private def terminateWorkersWithRetry( + regionExecution: RegionExecution, + attempt: Int = 1 + ): Future[Unit] = { + terminateWorkers(regionExecution).rescue { case err => + logger.warn( + s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", + err + ) + Future + .sleep(killRetryDelay)(killRetryTimer) + .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) + } + } + def isCompleted: Boolean = currentPhaseRef.get == Completed + /** + * Returns the region termination future if termination has been initiated. + * This is only set by `tryCompleteRegionExecution()`. + */ + def getTerminationFutureOpt: Option[Future[Unit]] = Option(terminationFutureRef.get) + /** * This will sync and transition the region execution phase from one to another depending on its current phase: * diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 2b8e3ce145..37c687a27d 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -27,10 +27,15 @@ import org.apache.texera.amber.engine.architecture.common.{ AkkaActorRefMappingService, AkkaActorService } -import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, WorkflowScheduler} +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerConfig, + ExecutionStateUpdate, + WorkflowScheduler +} import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable class WorkflowExecutionCoordinator( @@ -45,6 +50,7 @@ class WorkflowExecutionCoordinator( private val regionExecutionCoordinators : mutable.HashMap[RegionIdentity, RegionExecutionCoordinator] = mutable.HashMap() + private val completionNotified: AtomicBoolean = new AtomicBoolean(false) @transient var actorRefService: AkkaActorRefMappingService = _ @@ -60,18 +66,19 @@ class WorkflowExecutionCoordinator( * After the syncs, if there are no running region(s), it will start new regions (if available). */ def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] = { - if (regionExecutionCoordinators.values.exists(!_.isCompleted)) { - // As this method is invoked by the completion of each port in a region, and regionExecutionCoordinator only - // lanuches each phase asynchronously, we need to let each current unfinished regionExecutionCoordinator - // sync its status and proceed with next phases if needed. - Future - .collect({ - regionExecutionCoordinators.values - .filter(!_.isCompleted) - .map(_.syncStatusAndTransitionRegionExecutionPhase()) - .toSeq - }) + val unfinishedRegionCoordinators = + regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq + + // Trigger sync for each unfinished region. + unfinishedRegionCoordinators.foreach(_.syncStatusAndTransitionRegionExecutionPhase()) + + // Wait only for region termination futures, then re-run coordination. + val terminationFutures = unfinishedRegionCoordinators.flatMap(_.getTerminationFutureOpt) + if (terminationFutures.nonEmpty) { + return Future + .collect(terminationFutures) .unit + .flatMap(_ => coordinateRegionExecutors(actorService)) } if (regionExecutionCoordinators.values.exists(!_.isCompleted)) { @@ -80,10 +87,17 @@ class WorkflowExecutionCoordinator( } // All existing regions are completed. Start the next region (if any). + val nextRegions = workflowScheduler.getNextRegions + if (nextRegions.isEmpty) { + if (workflowExecution.isCompleted && completionNotified.compareAndSet(false, true)) { + asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState)) + } + return Future.Unit + } + + executedRegions.append(nextRegions) Future - .collect({ - val nextRegions = workflowScheduler.getNextRegions - executedRegions.append(nextRegions) + .collect( nextRegions .map(region => { workflowExecution.initRegionExecution(region) @@ -99,7 +113,7 @@ class WorkflowExecutionCoordinator( }) .map(_.syncStatusAndTransitionRegionExecutionPhase()) .toSeq - }) + ) .unit } @@ -121,4 +135,8 @@ class WorkflowExecutionCoordinator( workflowScheduler.jumpToOperator(opId) } + def hasUnfinishedRegionCoordinators: Boolean = { + regionExecutionCoordinators.values.exists(!_.isCompleted) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala index 2a6a20b3d3..0504e66f52 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala @@ -48,8 +48,8 @@ trait EndHandler { s"Received EndHandler before all messages are processed. Unprocessed messages: " + s"${dp.inputManager.inputMessageQueue.peek()}" ) + return Future.exception(new IllegalStateException("worker still has unprocessed messages")) } - assert(dp.inputManager.inputMessageQueue.isEmpty) // Now we can safely acknowledge that this worker can be terminated. EmptyReturn() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala index 704ebd7f47..f7e26803b4 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala @@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{ EmbeddedControlMessageIdentity } import org.apache.texera.amber.engine.architecture.controller.ClientEvent -import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway +import org.apache.texera.amber.engine.architecture.messaginglayer.{ + NetworkInputGateway, + NetworkOutputGateway +} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ @@ -125,7 +128,8 @@ object AsyncRPCClient { } class AsyncRPCClient( - outputGateway: NetworkOutputGateway, + val inputGateway: NetworkInputGateway, + val outputGateway: NetworkOutputGateway, val actorId: ActorVirtualIdentity ) extends AmberLogging {
