This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-region-restart
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 4c446c9e8b98513f63657d8da6c737f0d1728b96
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 21 02:09:44 2026 -0700

    Add support for restarting regions
---
 .../texera/amber/engine/architecture/common/AmberProcessor.scala  | 5 ++---
 .../architecture/controller/execution/WorkflowExecution.scala     | 1 +
 .../engine/architecture/messaginglayer/NetworkInputGateway.scala  | 4 ++++
 .../engine/architecture/messaginglayer/NetworkOutputGateway.scala | 4 ++++
 .../architecture/scheduling/RegionExecutionCoordinator.scala      | 2 ++
 .../apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala    | 8 ++++++--
 6 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
index e776307323..f1c8136fd8 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala
@@ -21,7 +21,6 @@ package org.apache.texera.amber.engine.architecture.common
 
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
 import org.apache.texera.amber.engine.architecture.messaginglayer.{
-  InputGateway,
   NetworkInputGateway,
   NetworkOutputGateway
 }
@@ -43,7 +42,7 @@ abstract class AmberProcessor(
     with Serializable {
 
   /** FIFO & exactly once */
-  val inputGateway: InputGateway = new NetworkInputGateway(this.actorId)
+  val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId)
 
   // 1. Unified Output
   val outputGateway: NetworkOutputGateway =
@@ -55,7 +54,7 @@ abstract class AmberProcessor(
       }
     )
   // 2. RPC Layer
-  val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId)
+  val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId)
   val asyncRPCServer: AsyncRPCServer =
     new AsyncRPCServer(outputGateway, actorId)
 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index b806479b89..2de29f31fd 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -44,6 +44,7 @@ case class WorkflowExecution() {
     * @throws AssertionError if the `RegionExecution` has already been initialized.
     */
   def initRegionExecution(region: Region): RegionExecution = {
+    regionExecutions.remove(region.id)
     // ensure the region execution hasn't been initialized already.
     assert(
       !regionExecutions.contains(region.id),
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
index 5cfd8aabc0..1d3ee3cb72 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala
@@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
     enforcers += enforcer
   }
 
+  def removeControlChannel(from: ActorVirtualIdentity): Unit = {
+    inputChannels.remove(ChannelIdentity(from, actorId, isControl = true))
+  }
+
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
index 929a30f4ef..e35e819d41 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala
@@ -94,4 +94,8 @@ class NetworkOutputGateway(
     idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement()
   }
 
+  def removeControlChannel(to: ActorVirtualIdentity): Unit = {
+    idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true))
+  }
+
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 5be5d942e5..85355b85c2 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -182,6 +182,8 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the worker and send messages.
                 actorRefService.removeActorRef(workerId)
+                asyncRPCClient.inputGateway.removeControlChannel(workerId)
+                asyncRPCClient.outputGateway.removeControlChannel(workerId)
                 gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter()
               }
           }.toSeq
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
index 704ebd7f47..f7e26803b4 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala
@@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{
   EmbeddedControlMessageIdentity
 }
 import org.apache.texera.amber.engine.architecture.controller.ClientEvent
-import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.architecture.messaginglayer.{
+  NetworkInputGateway,
+  NetworkOutputGateway
+}
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
 import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
@@ -125,7 +128,8 @@ object AsyncRPCClient {
 }
 
 class AsyncRPCClient(
-    outputGateway: NetworkOutputGateway,
+    val inputGateway: NetworkInputGateway,
+    val outputGateway: NetworkOutputGateway,
     val actorId: ActorVirtualIdentity
 ) extends AmberLogging {
 

Reply via email to