This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 0bfbf616d9 feat(engine): add jump-to-operator support (#4444)
0bfbf616d9 is described below

commit 0bfbf616d975c54e7a3e26f7667bd69d33b8cb9b
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 23:08:07 2026 -0700

    feat(engine): add jump-to-operator support (#4444)
    
    ### What changes were proposed in this PR?
    
    Add a generic controller-side primitive for jumping execution to the
    region containing a target operator (`JumpToOperatorRegion`).
    
    Design:
    - `Schedule` stays a minimal data class — a `Map[Int, Set[Region]]`
    keyed by contiguous `0..N-1` levels (enforced by a `require` invariant),
    an `initialLevelIndex` that seeds the iteration cursor on construction
    or `copy(...)`, and `Iterator[Set[Region]]` semantics so consumers walk
    it directly. `Schedule` has no factory methods that produce other schedules;
    call sites use the case class's generated `copy(...)` instead.
    - `WorkflowExecutionCoordinator` exposes its current schedule as a
    public `var schedule` (no `getSchedule`/`replaceSchedule` wrappers).
    Normal execution mutates the cursor in place; a jump replaces the var
    with a new schedule via `copy(initialLevelIndex = targetLevel)`.
    - The jump computation lives in `JumpToOperatorRegionHandler`: it scans
    the coordinator's `schedule.levelSets` for the level containing the
    target operator and assigns a fresh schedule back to the coordinator.
    There is no precomputed operator→level map: each Schedule is single-use,
    so building a map would cost the same as the one linear scan.
    - `ControllerProcessor` constructs the coordinator with no schedule arg
    (the coordinator defaults to `Schedule(Map.empty)` until populated).
    `Controller.initState` assigns the real schedule into the coordinator
    immediately after `WorkflowScheduler.updateSchedule(physicalPlan)`.
    
    Out of scope (intentional): loop-operator logic, region-restart logic,
    state materialization. The primitive is generic so those features can be
    built on top.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4443
    
    Precursor test coverage for related modules:
    - #4562 — `ScheduleSpec` covering `Schedule` iterator semantics (open)
    - #4564 — `WorkflowSchedulerSpec` covering `WorkflowScheduler` contract
    (merged)
    
    ### How was this PR tested?
    
    - `sbt "WorkflowExecutionService/compile"` — passes.
    - `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionCoordinatorSpec"`
    — 8/8 tests pass (1 pre-existing kill-sync test plus 7 added by this PR
    exercising jump behavior).
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)
    
    ---------
    
    Signed-off-by: Xinyuan Lin <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../engine/architecture/rpc/controlcommands.proto  |   7 +-
 .../architecture/rpc/controllerservice.proto       |   3 +-
 .../amber/engine/architecture/rpc/__init__.py      |  43 +++++
 .../architecture/controller/Controller.scala       |   1 +
 .../ControllerAsyncRPCHandlerInitializer.scala     |   1 +
 .../controller/ControllerProcessor.scala           |   1 -
 .../JumpToOperatorRegionHandler.scala              |  49 ++++++
 .../engine/architecture/scheduling/Schedule.scala  |  12 +-
 .../scheduling/WorkflowExecutionCoordinator.scala  |   5 +-
 .../WorkflowExecutionCoordinatorSpec.scala         | 180 ++++++++++++++++++++-
 10 files changed, 290 insertions(+), 12 deletions(-)

diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index b22c1bdf7c..1f55927e4a 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -45,6 +45,7 @@ message ControlRequest {
     WorkerStateUpdatedRequest workerStateUpdatedRequest = 8;
     LinkWorkersRequest linkWorkersRequest = 9;
     WorkflowReconfigureRequest workflowReconfigureRequest = 10;
+    JumpToOperatorRegionRequest jumpToOperatorRegionRequest = 11;
 
     // request for worker
     AddInputChannelRequest addInputChannelRequest = 50;
@@ -272,4 +273,8 @@ enum StatisticsUpdateTarget {
 message QueryStatisticsRequest{
   repeated core.ActorVirtualIdentity filterByWorkers = 1;
   StatisticsUpdateTarget updateTarget = 2;
-}
\ No newline at end of file
+}
+
+message JumpToOperatorRegionRequest{
+  core.OperatorIdentity targetOperatorId = 1 [(scalapb.field).no_box = true];
+}
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
index 27b4727ee9..0932a7b914 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
@@ -42,8 +42,9 @@ service ControllerService {
   rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
   rpc WorkerStateUpdated(WorkerStateUpdatedRequest) returns (EmptyReturn);
   rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn);
+  rpc JumpToOperatorRegion(JumpToOperatorRegionRequest) returns (EmptyReturn);
   rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
   rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns 
(EmptyReturn);
   rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
   rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn);
-}
\ No newline at end of file
+}
diff --git 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
index 77d51933af..2bad2b0bfb 100644
--- 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
@@ -101,6 +101,9 @@ class ControlRequest(betterproto.Message):
     workflow_reconfigure_request: "WorkflowReconfigureRequest" = (
         betterproto.message_field(10, group="sealed_value")
     )
+    jump_to_operator_region_request: "JumpToOperatorRegionRequest" = 
betterproto.message_field(
+        11, group="sealed_value"
+    )
     add_input_channel_request: "AddInputChannelRequest" = 
betterproto.message_field(
         50, group="sealed_value"
     )
@@ -385,6 +388,11 @@ class QueryStatisticsRequest(betterproto.Message):
     update_target: "StatisticsUpdateTarget" = betterproto.enum_field(2)
 
 
+@dataclass(eq=False, repr=False)
+class JumpToOperatorRegionRequest(betterproto.Message):
+    target_operator_id: "___core__.OperatorIdentity" = 
betterproto.message_field(1)
+
+
 @dataclass(eq=False, repr=False)
 class ControlReturn(betterproto.Message):
     """The generic return message"""
@@ -1251,6 +1259,23 @@ class ControllerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
+    async def jump_to_operator_region(
+        self,
+        jump_to_operator_region_request: "JumpToOperatorRegionRequest",
+        *,
+        timeout: Optional[float] = None,
+        deadline: Optional["Deadline"] = None,
+        metadata: Optional["MetadataLike"] = None
+    ) -> "EmptyReturn":
+        return await self._unary_unary(
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion",
+            jump_to_operator_region_request,
+            EmptyReturn,
+            timeout=timeout,
+            deadline=deadline,
+            metadata=metadata,
+        )
+
     async def link_workers(
         self,
         link_workers_request: "LinkWorkersRequest",
@@ -1923,6 +1948,11 @@ class ControllerServiceBase(ServiceBase):
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
+    async def jump_to_operator_region(
+        self, jump_to_operator_region_request: "JumpToOperatorRegionRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
     async def link_workers(
         self, link_workers_request: "LinkWorkersRequest"
     ) -> "EmptyReturn":
@@ -2032,6 +2062,13 @@ class ControllerServiceBase(ServiceBase):
         response = await self.worker_execution_completed(request)
         await stream.send_message(response)
 
+    async def __rpc_jump_to_operator_region(
+        self, stream: "grpclib.server.Stream[JumpToOperatorRegionRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.jump_to_operator_region(request)
+        await stream.send_message(response)
+
     async def __rpc_link_workers(
         self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
     ) -> None:
@@ -2134,6 +2171,12 @@ class ControllerServiceBase(ServiceBase):
                 EmptyRequest,
                 EmptyReturn,
             ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion":
 grpclib.const.Handler(
+                self.__rpc_jump_to_operator_region,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                JumpToOperatorRegionRequest,
+                EmptyReturn,
+            ),
             
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
 grpclib.const.Handler(
                 self.__rpc_link_workers,
                 grpclib.const.Cardinality.UNARY_UNARY,
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
index daa977d857..512a3342ce 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
@@ -116,6 +116,7 @@ class Controller(
   override def initState(): Unit = {
     attachRuntimeServicesToCPState()
     cp.workflowScheduler.updateSchedule(physicalPlan)
+    cp.workflowExecutionCoordinator.schedule = cp.workflowScheduler.getSchedule
 
     val regions: List[(Long, List[String])] =
       cp.workflowScheduler.getSchedule.getRegions.map { region =>
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 2902173364..7e5a904716 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -34,6 +34,7 @@ class ControllerAsyncRPCHandlerInitializer(
     with AmberLogging
     with LinkWorkersHandler
     with WorkerExecutionCompletedHandler
+    with JumpToOperatorRegionHandler
     with WorkerStateUpdatedHandler
     with PauseHandler
     with QueryWorkerStatisticsHandler
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index 7a8e94cf3a..3ff8e7d978 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -44,7 +44,6 @@ class ControllerProcessor(
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
   val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new 
WorkflowExecutionCoordinator(
-    () => this.workflowScheduler.getNextRegions,
     workflowExecution,
     controllerConfig,
     asyncRPCClient
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
new file mode 100644
index 0000000000..0047efe45f
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import 
org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  AsyncRPCContext,
+  JumpToOperatorRegionRequest
+}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+
+trait JumpToOperatorRegionHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
+
+  override def jumpToOperatorRegion(
+      msg: JumpToOperatorRegionRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    val coordinator = cp.workflowExecutionCoordinator
+    coordinator.schedule.levelSets
+      .collectFirst {
+        case (level, regions)
+            if regions.exists(_.getOperators.exists(_.id.logicalOpId == 
msg.targetOperatorId)) =>
+          level
+      }
+      .foreach { targetLevel =>
+        coordinator.schedule = coordinator.schedule.copy(initialLevelIndex = 
targetLevel)
+      }
+    EmptyReturn()
+  }
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 6f34c9ed1e..9c03d07a62 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -19,8 +19,16 @@
 
 package org.apache.texera.amber.engine.architecture.scheduling
 
-case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends 
Iterator[Set[Region]] {
-  private var currentLevel = levelSets.keys.minOption.getOrElse(0)
+case class Schedule(
+    levelSets: Map[Int, Set[Region]],
+    initialLevelIndex: Int = 0
+) extends Iterator[Set[Region]] {
+  require(
+    levelSets.keys.toSet == (0 until levelSets.size).toSet,
+    s"Schedule level keys must be contiguous starting at 0, got: 
${levelSets.keys.toSeq.sorted}"
+  )
+
+  private var currentLevel: Int = initialLevelIndex
 
   def getRegions: List[Region] = levelSets.values.flatten.toList
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 4b639fc241..df504bf92d 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -35,12 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
 
 class WorkflowExecutionCoordinator(
-    getNextRegions: () => Set[Region],
     workflowExecution: WorkflowExecution,
     controllerConfig: ControllerConfig,
     asyncRPCClient: AsyncRPCClient
 ) extends LazyLogging {
 
+  var schedule: Schedule = Schedule(Map.empty)
+
   private val executedRegions: mutable.ListBuffer[Set[Region]] = 
mutable.ListBuffer()
 
   private val regionExecutionCoordinators
@@ -83,7 +84,7 @@ class WorkflowExecutionCoordinator(
     }
 
     // All existing regions are completed. Start the next region (if any).
-    val nextRegions = getNextRegions()
+    val nextRegions = if (!schedule.hasNext) Set.empty[Region] else 
schedule.next()
     if (nextRegions.isEmpty) {
       if (workflowExecution.isCompleted && 
completionNotified.compareAndSet(false, true)) {
         
asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState))
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
index f4372e8b57..f5fc17f8e0 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -21,6 +21,14 @@ package 
org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.actor.ActorSystem
 import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.virtualidentity.{
+  ExecutionIdentity,
+  OperatorIdentity,
+  PhysicalOpIdentity,
+  WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.PhysicalOp
 import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
@@ -29,8 +37,6 @@ import org.apache.texera.amber.engine.common.AmberRuntime
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpecLike
 
-import scala.collection.mutable
-
 class WorkflowExecutionCoordinatorSpec
     extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", 
AmberRuntime.akkaConfig))
     with AnyFlatSpecLike
@@ -41,7 +47,61 @@ class WorkflowExecutionCoordinatorSpec
     TestKit.shutdownActorSystem(system)
   }
 
-  "WorkflowExecutionCoordinator" should "start the next region only after 
previous region termination succeeds" in {
+  // -- Helpers used only by the jump-to-operator-region tests --
+
+  private def jumpRegion(regionId: Long, opId: String): Region = {
+    val physicalOp = PhysicalOp(
+      PhysicalOpIdentity(OperatorIdentity(opId), "main"),
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      OpExecInitInfo.Empty
+    )
+    Region(RegionIdentity(regionId), Set(physicalOp), Set.empty)
+  }
+
+  private def threeLevelSchedule(): (Region, Region, Region, Schedule) = {
+    val first = jumpRegion(1, "first")
+    val second = jumpRegion(2, "second")
+    val third = jumpRegion(3, "third")
+    val schedule = Schedule(
+      Map(
+        0 -> Set(first),
+        1 -> Set(second),
+        2 -> Set(third)
+      )
+    )
+    (first, second, third, schedule)
+  }
+
+  private def newJumpCoordinator(schedule: Schedule): 
WorkflowExecutionCoordinator = {
+    val coordinator = new WorkflowExecutionCoordinator(WorkflowExecution(), 
null, null)
+    coordinator.schedule = schedule
+    coordinator
+  }
+
+  private def nextRegions(coordinator: WorkflowExecutionCoordinator): 
Set[Region] = {
+    val schedule = coordinator.schedule
+    if (schedule.hasNext) schedule.next() else Set.empty
+  }
+
+  // Mirrors what JumpToOperatorRegionHandler does: read the current schedule, 
scan for the
+  // level containing the target operator, and replace the schedule with a 
copy whose cursor is
+  // at that level.
+  private def jumpTo(coordinator: WorkflowExecutionCoordinator, opName: 
String): Unit = {
+    val opId = OperatorIdentity(opName)
+    val schedule = coordinator.schedule
+    schedule.levelSets
+      .collectFirst {
+        case (level, regions) if 
regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+          level
+      }
+      .foreach { targetLevel =>
+        coordinator.schedule = schedule.copy(initialLevelIndex = targetLevel)
+      }
+  }
+
+  "WorkflowExecutionCoordinator" should
+    "start the next region only after previous region termination succeeds" in 
{
     val firstOp = createSourceOp("first-op")
     val firstWorkerId = createWorkerId(firstOp)
     val firstRegion = createSingleWorkerRegion(1, firstOp, firstWorkerId)
@@ -64,13 +124,12 @@ class WorkflowExecutionCoordinatorSpec
     registerLiveWorker(controller.actorRefService, firstWorkerId)
     registerLiveWorker(controller.actorRefService, secondWorkerId)
 
-    val nextRegionLevels = mutable.Queue(Set(firstRegion), Set(secondRegion))
     val workflowCoordinator = new WorkflowExecutionCoordinator(
-      () => if (nextRegionLevels.nonEmpty) nextRegionLevels.dequeue() else 
Set.empty,
       workflowExecution,
       ControllerConfig(None, None, None, None),
       rpcProbe.asyncRPCClient
     )
+    workflowCoordinator.schedule = Schedule(Map(0 -> Set(firstRegion), 1 -> 
Set(secondRegion)))
     workflowCoordinator.setupActorRefService(controller.actorRefService)
 
     
await(workflowCoordinator.coordinateRegionExecutors(controller.actorService))
@@ -90,4 +149,115 @@ class WorkflowExecutionCoordinatorSpec
     assert(rpcProbe.initializedWorkers.contains(secondWorkerId))
     assert(rpcProbe.startedWorkers.contains(secondWorkerId))
   }
+
+  "Jumping to an operator's region" should
+    "make the next scheduled region contain the target operator's region" in {
+    val (first, second, _, schedule) = threeLevelSchedule()
+    val coordinator = newJumpCoordinator(schedule)
+
+    assert(nextRegions(coordinator) == Set(first))
+    assert(nextRegions(coordinator) == Set(second))
+
+    jumpTo(coordinator, "first")
+
+    assert(nextRegions(coordinator) == Set(first))
+  }
+
+  it should "support multiple sequential jumps interleaved with region pulls" 
in {
+    val (first, second, third, schedule) = threeLevelSchedule()
+    val coordinator = newJumpCoordinator(schedule)
+
+    assert(nextRegions(coordinator) == Set(first))
+    assert(nextRegions(coordinator) == Set(second))
+
+    jumpTo(coordinator, "first")
+    assert(nextRegions(coordinator) == Set(first))
+
+    jumpTo(coordinator, "second")
+    assert(nextRegions(coordinator) == Set(second))
+    assert(nextRegions(coordinator) == Set(third))
+
+    jumpTo(coordinator, "first")
+    assert(nextRegions(coordinator) == Set(first))
+  }
+
+  it should "be a no-op when the target operator is not in any scheduled 
region" in {
+    val (first, second, _, schedule) = threeLevelSchedule()
+    val coordinator = newJumpCoordinator(schedule)
+
+    assert(nextRegions(coordinator) == Set(first))
+
+    jumpTo(coordinator, "does-not-exist")
+
+    // Iteration position must be unaffected by an unknown target.
+    assert(nextRegions(coordinator) == Set(second))
+  }
+
+  it should "leave the schedule untouched when called repeatedly with unknown 
operators" in {
+    val (first, second, third, schedule) = threeLevelSchedule()
+    val coordinator = newJumpCoordinator(schedule)
+
+    jumpTo(coordinator, "ghost-1")
+    jumpTo(coordinator, "ghost-2")
+    jumpTo(coordinator, "ghost-3")
+
+    assert(nextRegions(coordinator) == Set(first))
+    assert(nextRegions(coordinator) == Set(second))
+    assert(nextRegions(coordinator) == Set(third))
+  }
+
+  it should "allow jumping back to the first region after the schedule is 
exhausted" in {
+    val (first, second, third, schedule) = threeLevelSchedule()
+    val coordinator = newJumpCoordinator(schedule)
+
+    assert(nextRegions(coordinator) == Set(first))
+    assert(nextRegions(coordinator) == Set(second))
+    assert(nextRegions(coordinator) == Set(third))
+    assert(nextRegions(coordinator) == Set.empty)
+
+    jumpTo(coordinator, "first")
+    assert(nextRegions(coordinator) == Set(first))
+  }
+
+  it should "support jumping forward past regions that have not yet been 
pulled" in {
+    val (first, _, third, schedule) = threeLevelSchedule()
+    val coordinator = newJumpCoordinator(schedule)
+
+    assert(nextRegions(coordinator) == Set(first))
+
+    jumpTo(coordinator, "third")
+    assert(nextRegions(coordinator) == Set(third))
+    assert(nextRegions(coordinator) == Set.empty)
+  }
+
+  it should "replay the target-onward range each time it jumps back" in {
+    // Schedule ABCDEF: jumping from E back to C yields the visible sequence 
ABCDECDEF; jumping
+    // again from E back to C yields ABCDECDECDEF.
+    val a = jumpRegion(1, "a")
+    val b = jumpRegion(2, "b")
+    val c = jumpRegion(3, "c")
+    val d = jumpRegion(4, "d")
+    val e = jumpRegion(5, "e")
+    val f = jumpRegion(6, "f")
+    val schedule = Schedule(
+      Map(0 -> Set(a), 1 -> Set(b), 2 -> Set(c), 3 -> Set(d), 4 -> Set(e), 5 
-> Set(f))
+    )
+    val coordinator = newJumpCoordinator(schedule)
+
+    Seq(a, b, c, d, e).foreach { region =>
+      assert(nextRegions(coordinator) == Set(region))
+    }
+
+    jumpTo(coordinator, "c")
+    Seq(c, d, e).foreach { region =>
+      assert(nextRegions(coordinator) == Set(region))
+    }
+
+    jumpTo(coordinator, "c")
+    Seq(c, d, e, f).foreach { region =>
+      assert(nextRegions(coordinator) == Set(region))
+    }
+
+    assert(nextRegions(coordinator) == Set.empty)
+  }
 }

Reply via email to