This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 555e55df5483a103e4282b503e33ee3b0dcc1bb2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 21 01:20:16 2026 -0700

    Add scheduler jump-to-operator support
---
 .../engine/architecture/rpc/controlcommands.proto  |  7 +++-
 .../architecture/rpc/controllerservice.proto       |  3 +-
 .../amber/engine/architecture/rpc/__init__.py      | 43 ++++++++++++++++++++++
 .../ControllerAsyncRPCHandlerInitializer.scala     |  1 +
 .../controller/ControllerProcessor.scala           |  2 +-
 .../controller/WorkflowScheduler.scala             |  3 ++
 .../promisehandlers/JumpToOperatorHandler.scala}   | 27 +++++++++-----
 .../engine/architecture/scheduling/Schedule.scala  | 11 ++++++
 .../scheduling/WorkflowExecutionCoordinator.scala  | 11 ++++--
 9 files changed, 92 insertions(+), 16 deletions(-)

diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index d714f64a15..d8d8b512d0 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -46,6 +46,7 @@ message ControlRequest {
     PortCompletedRequest portCompletedRequest = 9;
     WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
     LinkWorkersRequest linkWorkersRequest = 11;
+    JumpToOperatorRequest jumpToOperatorRequest = 12;
 
     // request for worker
     AddInputChannelRequest addInputChannelRequest = 50;
@@ -278,4 +279,8 @@ enum StatisticsUpdateTarget {
 message QueryStatisticsRequest{
   repeated core.ActorVirtualIdentity filterByWorkers = 1;
   StatisticsUpdateTarget updateTarget = 2;
-}
\ No newline at end of file
+}
+
+message JumpToOperatorRequest{
+  core.OperatorIdentity targetOperatorId = 1 [(scalapb.field).no_box = true];
+}
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
index 70d189a341..25c90e3e93 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
@@ -42,7 +42,8 @@ service ControllerService {
   rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
   rpc WorkerStateUpdated(WorkerStateUpdatedRequest) returns (EmptyReturn);
   rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn);
+  rpc JumpToOperator(JumpToOperatorRequest) returns (EmptyReturn);
   rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
   rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns 
(EmptyReturn);
   rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
-}
\ No newline at end of file
+}
diff --git 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
index b7522a696a..f946e7123e 100644
--- 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
@@ -102,6 +102,9 @@ class ControlRequest(betterproto.Message):
     link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
         11, group="sealed_value"
     )
+    jump_to_operator_request: "JumpToOperatorRequest" = 
betterproto.message_field(
+        12, group="sealed_value"
+    )
     add_input_channel_request: "AddInputChannelRequest" = 
betterproto.message_field(
         50, group="sealed_value"
     )
@@ -394,6 +397,11 @@ class QueryStatisticsRequest(betterproto.Message):
     update_target: "StatisticsUpdateTarget" = betterproto.enum_field(2)
 
 
+@dataclass(eq=False, repr=False)
+class JumpToOperatorRequest(betterproto.Message):
+    target_operator_id: "___core__.OperatorIdentity" = 
betterproto.message_field(1)
+
+
 @dataclass(eq=False, repr=False)
 class ControlReturn(betterproto.Message):
     """The generic return message"""
@@ -1243,6 +1251,23 @@ class ControllerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
+    async def jump_to_operator(
+        self,
+        jump_to_operator_request: "JumpToOperatorRequest",
+        *,
+        timeout: Optional[float] = None,
+        deadline: Optional["Deadline"] = None,
+        metadata: Optional["MetadataLike"] = None
+    ) -> "EmptyReturn":
+        return await self._unary_unary(
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperator",
+            jump_to_operator_request,
+            EmptyReturn,
+            timeout=timeout,
+            deadline=deadline,
+            metadata=metadata,
+        )
+
     async def link_workers(
         self,
         link_workers_request: "LinkWorkersRequest",
@@ -1880,6 +1905,11 @@ class ControllerServiceBase(ServiceBase):
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
+    async def jump_to_operator(
+        self, jump_to_operator_request: "JumpToOperatorRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
     async def link_workers(
         self, link_workers_request: "LinkWorkersRequest"
     ) -> "EmptyReturn":
@@ -1984,6 +2014,13 @@ class ControllerServiceBase(ServiceBase):
         response = await self.worker_execution_completed(request)
         await stream.send_message(response)
 
+    async def __rpc_jump_to_operator(
+        self, stream: "grpclib.server.Stream[JumpToOperatorRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.jump_to_operator(request)
+        await stream.send_message(response)
+
     async def __rpc_link_workers(
         self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
     ) -> None:
@@ -2079,6 +2116,12 @@ class ControllerServiceBase(ServiceBase):
                 EmptyRequest,
                 EmptyReturn,
             ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperator":
 grpclib.const.Handler(
+                self.__rpc_jump_to_operator,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                JumpToOperatorRequest,
+                EmptyReturn,
+            ),
             
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
 grpclib.const.Handler(
                 self.__rpc_link_workers,
                 grpclib.const.Cardinality.UNARY_UNARY,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 4d9a36bab4..03a4d47968 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -34,6 +34,7 @@ class ControllerAsyncRPCHandlerInitializer(
     with AmberLogging
     with LinkWorkersHandler
     with WorkerExecutionCompletedHandler
+    with JumpToOperatorHandler
     with WorkerStateUpdatedHandler
     with PauseHandler
     with QueryWorkerStatisticsHandler
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index 7a8e94cf3a..3461619cb3 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -44,7 +44,7 @@ class ControllerProcessor(
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
   val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new 
WorkflowExecutionCoordinator(
-    () => this.workflowScheduler.getNextRegions,
+    workflowScheduler,
     workflowExecution,
     controllerConfig,
     asyncRPCClient
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..c8a107e045 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -20,6 +20,7 @@
 package org.apache.texera.amber.engine.architecture.controller
 
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowContext}
 import org.apache.texera.amber.engine.architecture.scheduling.{
   CostBasedScheduleGenerator,
@@ -54,4 +55,11 @@ class WorkflowScheduler(
 
   def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
 
+  /** Moves the schedule cursor so that the next `getNextRegions` call returns the
+    * level containing a region with logical operator `opId` (delegates to
+    * `Schedule.jumpToOperator`). No-op if no region contains `opId`.
+    * NOTE(review): assumes `schedule` is already initialized when this is called — confirm.
+    */
+  def jumpToOperator(opId: OperatorIdentity): Unit = schedule.jumpToOperator(opId)
+
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorHandler.scala
similarity index 50%
copy from 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
copy to 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorHandler.scala
index 6f34c9ed1e..aad72f08e9 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorHandler.scala
@@ -17,18 +17,32 @@
  * under the License.
  */
 
-package org.apache.texera.amber.engine.architecture.scheduling
+package org.apache.texera.amber.engine.architecture.controller.promisehandlers
 
-case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] {
-  private var currentLevel = levelSets.keys.minOption.getOrElse(0)
-
-  def getRegions: List[Region] = levelSets.values.flatten.toList
+import com.twitter.util.Future
+import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  AsyncRPCContext,
+  JumpToOperatorRequest
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 
-  override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)
+/** Controller-side handler for `JumpToOperatorRequest`.
+  *
+  * Forwards `targetOperatorId` to the workflow execution coordinator, which moves
+  * the scheduler's cursor to the level containing a region with that logical
+  * operator. Regions that are already running are not stopped; the jump applies
+  * to the next set of regions the coordinator requests. An operator that appears
+  * in no region leaves the schedule unchanged. Replies with `EmptyReturn`.
+  */
+trait JumpToOperatorHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
 
-  override def next(): Set[Region] = {
-    val regions = levelSets(currentLevel)
-    currentLevel += 1
-    regions
+  override def jumpToOperator(
+      msg: JumpToOperatorRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    cp.workflowExecutionCoordinator.jumpToOperator(msg.targetOperatorId)
+    EmptyReturn()
   }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 6f34c9ed1e..fa8121cad5 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -19,6 +19,8 @@
 
 package org.apache.texera.amber.engine.architecture.scheduling
 
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+
 case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends 
Iterator[Set[Region]] {
   private var currentLevel = levelSets.keys.minOption.getOrElse(0)
 
@@ -31,4 +33,20 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat
     currentLevel += 1
     regions
   }
+
+  /** Moves the cursor to the lowest level that contains a region with logical
+    * operator `opId`, so the next call to `next()` returns that level.
+    * `levelSets` is a `Map`, whose iteration order is unspecified, so the smallest
+    * matching level is selected explicitly rather than the first one encountered.
+    * If no region contains `opId`, the cursor is left unchanged.
+    */
+  def jumpToOperator(opId: OperatorIdentity): Unit =
+    levelSets
+      .collect {
+        case (level, regions)
+            if regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+          level
+      }
+      .minOption
+      .foreach(currentLevel = _)
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 05585f88d8..2b8e3ce145 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -21,19 +21,20 @@ package 
org.apache.texera.amber.engine.architecture.scheduling
 
 import com.twitter.util.Future
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink}
 import org.apache.texera.amber.engine.architecture.common.{
   AkkaActorRefMappingService,
   AkkaActorService
 }
-import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
+import 
org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, 
WorkflowScheduler}
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
 import scala.collection.mutable
 
 class WorkflowExecutionCoordinator(
-    getNextRegions: () => Set[Region],
+    workflowScheduler: WorkflowScheduler,
     workflowExecution: WorkflowExecution,
     controllerConfig: ControllerConfig,
     asyncRPCClient: AsyncRPCClient
@@ -81,7 +82,7 @@ class WorkflowExecutionCoordinator(
     // All existing regions are completed. Start the next region (if any).
     Future
       .collect({
-        val nextRegions = getNextRegions()
+        val nextRegions = workflowScheduler.getNextRegions
         executedRegions.append(nextRegions)
         nextRegions
           .map(region => {
@@ -116,4 +117,13 @@ class WorkflowExecutionCoordinator(
       .toSet
   }
 
+  /** Redirects region scheduling to the level containing logical operator `opId`.
+    * Only the scheduler's cursor moves: regions that are already running are not
+    * affected, and the jump takes effect the next time this coordinator requests
+    * regions from `workflowScheduler`. No-op if no region contains `opId`.
+    */
+  def jumpToOperator(opId: OperatorIdentity): Unit = {
+    workflowScheduler.jumpToOperator(opId)
+  }
+
 }

Reply via email to