This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch xinyuan-region-restart in repository https://gitbox.apache.org/repos/asf/texera.git
commit 4c446c9e8b98513f63657d8da6c737f0d1728b96 Author: Xinyuan Lin <[email protected]> AuthorDate: Tue Apr 21 02:09:44 2026 -0700 Add support for restarting regions --- .../texera/amber/engine/architecture/common/AmberProcessor.scala | 5 ++--- .../architecture/controller/execution/WorkflowExecution.scala | 1 + .../engine/architecture/messaginglayer/NetworkInputGateway.scala | 4 ++++ .../engine/architecture/messaginglayer/NetworkOutputGateway.scala | 4 ++++ .../architecture/scheduling/RegionExecutionCoordinator.scala | 2 ++ .../apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala | 8 ++++++-- 6 files changed, 19 insertions(+), 5 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala index e776307323..f1c8136fd8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala @@ -21,7 +21,6 @@ package org.apache.texera.amber.engine.architecture.common import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.engine.architecture.messaginglayer.{ - InputGateway, NetworkInputGateway, NetworkOutputGateway } @@ -43,7 +42,7 @@ abstract class AmberProcessor( with Serializable { /** FIFO & exactly once */ - val inputGateway: InputGateway = new NetworkInputGateway(this.actorId) + val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId) // 1. Unified Output val outputGateway: NetworkOutputGateway = @@ -55,7 +54,7 @@ abstract class AmberProcessor( } ) // 2. RPC Layer - val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId) + val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId) val asyncRPCServer: AsyncRPCServer = new AsyncRPCServer(outputGateway, actorId) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index b806479b89..2de29f31fd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -44,6 +44,7 @@ case class WorkflowExecution() { * @throws AssertionError if the `RegionExecution` has already been initialized. */ def initRegionExecution(region: Region): RegionExecution = { + regionExecutions.remove(region.id) // ensure the region execution hasn't been initialized already. assert( !regionExecutions.contains(region.id), diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala index 5cfd8aabc0..1d3ee3cb72 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala @@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity) enforcers += enforcer } + def removeControlChannel(from: ActorVirtualIdentity): Unit = { + inputChannels.remove(ChannelIdentity(from, actorId, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala index 929a30f4ef..e35e819d41 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala @@ -94,4 +94,8 @@ class NetworkOutputGateway( idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement() } + def removeControlChannel(to: ActorVirtualIdentity): Unit = { + idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 5be5d942e5..85355b85c2 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -182,6 +182,8 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) + asyncRPCClient.inputGateway.removeControlChannel(workerId) + asyncRPCClient.outputGateway.removeControlChannel(workerId) gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala index 704ebd7f47..f7e26803b4 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala @@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{ EmbeddedControlMessageIdentity } import org.apache.texera.amber.engine.architecture.controller.ClientEvent -import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway +import org.apache.texera.amber.engine.architecture.messaginglayer.{ + NetworkInputGateway, + NetworkOutputGateway +} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ @@ -125,7 +128,8 @@ object AsyncRPCClient { } class AsyncRPCClient( - outputGateway: NetworkOutputGateway, + val inputGateway: NetworkInputGateway, + val outputGateway: NetworkOutputGateway, val actorId: ActorVirtualIdentity ) extends AmberLogging {
