This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 0bfbf616d9 feat(engine): add jump-to-operator support (#4444)
0bfbf616d9 is described below
commit 0bfbf616d975c54e7a3e26f7667bd69d33b8cb9b
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 23:08:07 2026 -0700
feat(engine): add jump-to-operator support (#4444)
### What changes were proposed in this PR?
Add a generic controller-side primitive for jumping execution to the
region containing a target operator (`JumpToOperatorRegion`).
Design:
- `Schedule` stays a minimal data class — a `Map[Int, Set[Region]]`
keyed by contiguous `0..N-1` levels (enforced by a `require` invariant),
an `initialLevelIndex` that seeds the iteration cursor on construction
or `copy(...)`, and `Iterator[Set[Region]]` semantics so consumers walk
it directly. No factory methods on `Schedule` produce other Schedules;
`case class.copy(...)` is used at call sites instead.
- `WorkflowExecutionCoordinator` exposes its current schedule as a
public `var schedule` (no `getSchedule`/`replaceSchedule` wrappers).
Normal execution mutates the cursor in place; a jump replaces the var
with a new schedule via `copy(initialLevelIndex = targetLevel)`.
- The jump computation lives in `JumpToOperatorRegionHandler`: it scans
the coordinator's `schedule.levelSets` for the level containing the
target operator and assigns a fresh schedule back to the coordinator. No
precomputed operator→level map — each Schedule is single-use, so a map
would cost the same as one linear scan.
- `ControllerProcessor` constructs the coordinator with no schedule arg
(the coordinator defaults to `Schedule(Map.empty)` until populated).
`Controller.initState` assigns the real schedule into the coordinator
immediately after `WorkflowScheduler.updateSchedule(physicalPlan)`.
Out of scope (intentional): loop-operator logic, region-restart logic,
state materialization. The primitive is generic so those features can be
built on top.
### Any related issues, documentation, discussions?
Closes #4443
Precursor test coverage for related modules:
- #4562 — `ScheduleSpec` covering `Schedule` iterator semantics (open)
- #4564 — `WorkflowSchedulerSpec` covering `WorkflowScheduler` contract
(merged)
### How was this PR tested?
- `sbt "WorkflowExecutionService/compile"` — passes.
- `sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionCoordinatorSpec"`
— 8/8 tests pass (1 pre-existing kill-sync test plus 7 added by this PR
exercising jump behavior).
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)
---------
Signed-off-by: Xinyuan Lin <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../engine/architecture/rpc/controlcommands.proto | 7 +-
.../architecture/rpc/controllerservice.proto | 3 +-
.../amber/engine/architecture/rpc/__init__.py | 43 +++++
.../architecture/controller/Controller.scala | 1 +
.../ControllerAsyncRPCHandlerInitializer.scala | 1 +
.../controller/ControllerProcessor.scala | 1 -
.../JumpToOperatorRegionHandler.scala | 49 ++++++
.../engine/architecture/scheduling/Schedule.scala | 12 +-
.../scheduling/WorkflowExecutionCoordinator.scala | 5 +-
.../WorkflowExecutionCoordinatorSpec.scala | 180 ++++++++++++++++++++-
10 files changed, 290 insertions(+), 12 deletions(-)
diff --git
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index b22c1bdf7c..1f55927e4a 100644
---
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -45,6 +45,7 @@ message ControlRequest {
WorkerStateUpdatedRequest workerStateUpdatedRequest = 8;
LinkWorkersRequest linkWorkersRequest = 9;
WorkflowReconfigureRequest workflowReconfigureRequest = 10;
+ JumpToOperatorRegionRequest jumpToOperatorRegionRequest = 11;
// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
@@ -272,4 +273,8 @@ enum StatisticsUpdateTarget {
message QueryStatisticsRequest{
repeated core.ActorVirtualIdentity filterByWorkers = 1;
StatisticsUpdateTarget updateTarget = 2;
-}
\ No newline at end of file
+}
+
+message JumpToOperatorRegionRequest{
+ core.OperatorIdentity targetOperatorId = 1 [(scalapb.field).no_box = true];
+}
diff --git
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
index 27b4727ee9..0932a7b914 100644
---
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
+++
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
@@ -42,8 +42,9 @@ service ControllerService {
rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
rpc WorkerStateUpdated(WorkerStateUpdatedRequest) returns (EmptyReturn);
rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn);
+ rpc JumpToOperatorRegion(JumpToOperatorRegionRequest) returns (EmptyReturn);
rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns
(EmptyReturn);
rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn);
-}
\ No newline at end of file
+}
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
index 77d51933af..2bad2b0bfb 100644
---
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
@@ -101,6 +101,9 @@ class ControlRequest(betterproto.Message):
workflow_reconfigure_request: "WorkflowReconfigureRequest" = (
betterproto.message_field(10, group="sealed_value")
)
+ jump_to_operator_region_request: "JumpToOperatorRegionRequest" =
betterproto.message_field(
+ 11, group="sealed_value"
+ )
add_input_channel_request: "AddInputChannelRequest" =
betterproto.message_field(
50, group="sealed_value"
)
@@ -385,6 +388,11 @@ class QueryStatisticsRequest(betterproto.Message):
update_target: "StatisticsUpdateTarget" = betterproto.enum_field(2)
+@dataclass(eq=False, repr=False)
+class JumpToOperatorRegionRequest(betterproto.Message):
+ target_operator_id: "___core__.OperatorIdentity" =
betterproto.message_field(1)
+
+
@dataclass(eq=False, repr=False)
class ControlReturn(betterproto.Message):
"""The generic return message"""
@@ -1251,6 +1259,23 @@ class ControllerServiceStub(betterproto.ServiceStub):
metadata=metadata,
)
+ async def jump_to_operator_region(
+ self,
+ jump_to_operator_region_request: "JumpToOperatorRegionRequest",
+ *,
+ timeout: Optional[float] = None,
+ deadline: Optional["Deadline"] = None,
+ metadata: Optional["MetadataLike"] = None
+ ) -> "EmptyReturn":
+ return await self._unary_unary(
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion",
+ jump_to_operator_region_request,
+ EmptyReturn,
+ timeout=timeout,
+ deadline=deadline,
+ metadata=metadata,
+ )
+
async def link_workers(
self,
link_workers_request: "LinkWorkersRequest",
@@ -1923,6 +1948,11 @@ class ControllerServiceBase(ServiceBase):
) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+ async def jump_to_operator_region(
+ self, jump_to_operator_region_request: "JumpToOperatorRegionRequest"
+ ) -> "EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
async def link_workers(
self, link_workers_request: "LinkWorkersRequest"
) -> "EmptyReturn":
@@ -2032,6 +2062,13 @@ class ControllerServiceBase(ServiceBase):
response = await self.worker_execution_completed(request)
await stream.send_message(response)
+ async def __rpc_jump_to_operator_region(
+ self, stream: "grpclib.server.Stream[JumpToOperatorRegionRequest,
EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.jump_to_operator_region(request)
+ await stream.send_message(response)
+
async def __rpc_link_workers(
self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
) -> None:
@@ -2134,6 +2171,12 @@ class ControllerServiceBase(ServiceBase):
EmptyRequest,
EmptyReturn,
),
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion":
grpclib.const.Handler(
+ self.__rpc_jump_to_operator_region,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ JumpToOperatorRegionRequest,
+ EmptyReturn,
+ ),
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
grpclib.const.Handler(
self.__rpc_link_workers,
grpclib.const.Cardinality.UNARY_UNARY,
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
index daa977d857..512a3342ce 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
@@ -116,6 +116,7 @@ class Controller(
override def initState(): Unit = {
attachRuntimeServicesToCPState()
cp.workflowScheduler.updateSchedule(physicalPlan)
+ cp.workflowExecutionCoordinator.schedule = cp.workflowScheduler.getSchedule
val regions: List[(Long, List[String])] =
cp.workflowScheduler.getSchedule.getRegions.map { region =>
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 2902173364..7e5a904716 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -34,6 +34,7 @@ class ControllerAsyncRPCHandlerInitializer(
with AmberLogging
with LinkWorkersHandler
with WorkerExecutionCompletedHandler
+ with JumpToOperatorRegionHandler
with WorkerStateUpdatedHandler
with PauseHandler
with QueryWorkerStatisticsHandler
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index 7a8e94cf3a..3ff8e7d978 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -44,7 +44,6 @@ class ControllerProcessor(
val workflowScheduler: WorkflowScheduler =
new WorkflowScheduler(workflowContext, actorId)
val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new
WorkflowExecutionCoordinator(
- () => this.workflowScheduler.getNextRegions,
workflowExecution,
controllerConfig,
asyncRPCClient
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
new file mode 100644
index 0000000000..0047efe45f
--- /dev/null
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import
org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+ AsyncRPCContext,
+ JumpToOperatorRegionRequest
+}
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+
+trait JumpToOperatorRegionHandler {
+ this: ControllerAsyncRPCHandlerInitializer =>
+
+ override def jumpToOperatorRegion(
+ msg: JumpToOperatorRegionRequest,
+ ctx: AsyncRPCContext
+ ): Future[EmptyReturn] = {
+ val coordinator = cp.workflowExecutionCoordinator
+ coordinator.schedule.levelSets
+ .collectFirst {
+ case (level, regions)
+ if regions.exists(_.getOperators.exists(_.id.logicalOpId ==
msg.targetOperatorId)) =>
+ level
+ }
+ .foreach { targetLevel =>
+ coordinator.schedule = coordinator.schedule.copy(initialLevelIndex =
targetLevel)
+ }
+ EmptyReturn()
+ }
+}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 6f34c9ed1e..9c03d07a62 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -19,8 +19,16 @@
package org.apache.texera.amber.engine.architecture.scheduling
-case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends
Iterator[Set[Region]] {
- private var currentLevel = levelSets.keys.minOption.getOrElse(0)
+case class Schedule(
+ levelSets: Map[Int, Set[Region]],
+ initialLevelIndex: Int = 0
+) extends Iterator[Set[Region]] {
+ require(
+ levelSets.keys.toSet == (0 until levelSets.size).toSet,
+ s"Schedule level keys must be contiguous starting at 0, got:
${levelSets.keys.toSeq.sorted}"
+ )
+
+ private var currentLevel: Int = initialLevelIndex
def getRegions: List[Region] = levelSets.values.flatten.toList
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 4b639fc241..df504bf92d 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -35,12 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
class WorkflowExecutionCoordinator(
- getNextRegions: () => Set[Region],
workflowExecution: WorkflowExecution,
controllerConfig: ControllerConfig,
asyncRPCClient: AsyncRPCClient
) extends LazyLogging {
+ var schedule: Schedule = Schedule(Map.empty)
+
private val executedRegions: mutable.ListBuffer[Set[Region]] =
mutable.ListBuffer()
private val regionExecutionCoordinators
@@ -83,7 +84,7 @@ class WorkflowExecutionCoordinator(
}
// All existing regions are completed. Start the next region (if any).
- val nextRegions = getNextRegions()
+ val nextRegions = if (!schedule.hasNext) Set.empty[Region] else
schedule.next()
if (nextRegions.isEmpty) {
if (workflowExecution.isCompleted &&
completionNotified.compareAndSet(false, true)) {
asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState))
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
index f4372e8b57..f5fc17f8e0 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -21,6 +21,14 @@ package
org.apache.texera.amber.engine.architecture.scheduling
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.PhysicalOp
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
@@ -29,8 +37,6 @@ import org.apache.texera.amber.engine.common.AmberRuntime
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike
-import scala.collection.mutable
-
class WorkflowExecutionCoordinatorSpec
extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec",
AmberRuntime.akkaConfig))
with AnyFlatSpecLike
@@ -41,7 +47,61 @@ class WorkflowExecutionCoordinatorSpec
TestKit.shutdownActorSystem(system)
}
- "WorkflowExecutionCoordinator" should "start the next region only after
previous region termination succeeds" in {
+ // -- Helpers used only by the jump-to-operator-region tests --
+
+ private def jumpRegion(regionId: Long, opId: String): Region = {
+ val physicalOp = PhysicalOp(
+ PhysicalOpIdentity(OperatorIdentity(opId), "main"),
+ WorkflowIdentity(0),
+ ExecutionIdentity(0),
+ OpExecInitInfo.Empty
+ )
+ Region(RegionIdentity(regionId), Set(physicalOp), Set.empty)
+ }
+
+ private def threeLevelSchedule(): (Region, Region, Region, Schedule) = {
+ val first = jumpRegion(1, "first")
+ val second = jumpRegion(2, "second")
+ val third = jumpRegion(3, "third")
+ val schedule = Schedule(
+ Map(
+ 0 -> Set(first),
+ 1 -> Set(second),
+ 2 -> Set(third)
+ )
+ )
+ (first, second, third, schedule)
+ }
+
+ private def newJumpCoordinator(schedule: Schedule):
WorkflowExecutionCoordinator = {
+ val coordinator = new WorkflowExecutionCoordinator(WorkflowExecution(),
null, null)
+ coordinator.schedule = schedule
+ coordinator
+ }
+
+ private def nextRegions(coordinator: WorkflowExecutionCoordinator):
Set[Region] = {
+ val schedule = coordinator.schedule
+ if (schedule.hasNext) schedule.next() else Set.empty
+ }
+
+ // Mirrors what JumpToOperatorRegionHandler does: read the current schedule,
scan for the
+ // level containing the target operator, and replace the schedule with a
copy whose cursor is
+ // at that level.
+ private def jumpTo(coordinator: WorkflowExecutionCoordinator, opName:
String): Unit = {
+ val opId = OperatorIdentity(opName)
+ val schedule = coordinator.schedule
+ schedule.levelSets
+ .collectFirst {
+ case (level, regions) if
regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+ level
+ }
+ .foreach { targetLevel =>
+ coordinator.schedule = schedule.copy(initialLevelIndex = targetLevel)
+ }
+ }
+
+ "WorkflowExecutionCoordinator" should
+ "start the next region only after previous region termination succeeds" in
{
val firstOp = createSourceOp("first-op")
val firstWorkerId = createWorkerId(firstOp)
val firstRegion = createSingleWorkerRegion(1, firstOp, firstWorkerId)
@@ -64,13 +124,12 @@ class WorkflowExecutionCoordinatorSpec
registerLiveWorker(controller.actorRefService, firstWorkerId)
registerLiveWorker(controller.actorRefService, secondWorkerId)
- val nextRegionLevels = mutable.Queue(Set(firstRegion), Set(secondRegion))
val workflowCoordinator = new WorkflowExecutionCoordinator(
- () => if (nextRegionLevels.nonEmpty) nextRegionLevels.dequeue() else
Set.empty,
workflowExecution,
ControllerConfig(None, None, None, None),
rpcProbe.asyncRPCClient
)
+ workflowCoordinator.schedule = Schedule(Map(0 -> Set(firstRegion), 1 ->
Set(secondRegion)))
workflowCoordinator.setupActorRefService(controller.actorRefService)
await(workflowCoordinator.coordinateRegionExecutors(controller.actorService))
@@ -90,4 +149,115 @@ class WorkflowExecutionCoordinatorSpec
assert(rpcProbe.initializedWorkers.contains(secondWorkerId))
assert(rpcProbe.startedWorkers.contains(secondWorkerId))
}
+
+ "Jumping to an operator's region" should
+ "make the next scheduled region contain the target operator's region" in {
+ val (first, second, _, schedule) = threeLevelSchedule()
+ val coordinator = newJumpCoordinator(schedule)
+
+ assert(nextRegions(coordinator) == Set(first))
+ assert(nextRegions(coordinator) == Set(second))
+
+ jumpTo(coordinator, "first")
+
+ assert(nextRegions(coordinator) == Set(first))
+ }
+
+ it should "support multiple sequential jumps interleaved with region pulls"
in {
+ val (first, second, third, schedule) = threeLevelSchedule()
+ val coordinator = newJumpCoordinator(schedule)
+
+ assert(nextRegions(coordinator) == Set(first))
+ assert(nextRegions(coordinator) == Set(second))
+
+ jumpTo(coordinator, "first")
+ assert(nextRegions(coordinator) == Set(first))
+
+ jumpTo(coordinator, "second")
+ assert(nextRegions(coordinator) == Set(second))
+ assert(nextRegions(coordinator) == Set(third))
+
+ jumpTo(coordinator, "first")
+ assert(nextRegions(coordinator) == Set(first))
+ }
+
+ it should "be a no-op when the target operator is not in any scheduled
region" in {
+ val (first, second, _, schedule) = threeLevelSchedule()
+ val coordinator = newJumpCoordinator(schedule)
+
+ assert(nextRegions(coordinator) == Set(first))
+
+ jumpTo(coordinator, "does-not-exist")
+
+ // Iteration position must be unaffected by an unknown target.
+ assert(nextRegions(coordinator) == Set(second))
+ }
+
+ it should "leave the schedule untouched when called repeatedly with unknown
operators" in {
+ val (first, second, third, schedule) = threeLevelSchedule()
+ val coordinator = newJumpCoordinator(schedule)
+
+ jumpTo(coordinator, "ghost-1")
+ jumpTo(coordinator, "ghost-2")
+ jumpTo(coordinator, "ghost-3")
+
+ assert(nextRegions(coordinator) == Set(first))
+ assert(nextRegions(coordinator) == Set(second))
+ assert(nextRegions(coordinator) == Set(third))
+ }
+
+ it should "allow jumping back to the first region after the schedule is
exhausted" in {
+ val (first, second, third, schedule) = threeLevelSchedule()
+ val coordinator = newJumpCoordinator(schedule)
+
+ assert(nextRegions(coordinator) == Set(first))
+ assert(nextRegions(coordinator) == Set(second))
+ assert(nextRegions(coordinator) == Set(third))
+ assert(nextRegions(coordinator) == Set.empty)
+
+ jumpTo(coordinator, "first")
+ assert(nextRegions(coordinator) == Set(first))
+ }
+
+ it should "support jumping forward past regions that have not yet been
pulled" in {
+ val (first, _, third, schedule) = threeLevelSchedule()
+ val coordinator = newJumpCoordinator(schedule)
+
+ assert(nextRegions(coordinator) == Set(first))
+
+ jumpTo(coordinator, "third")
+ assert(nextRegions(coordinator) == Set(third))
+ assert(nextRegions(coordinator) == Set.empty)
+ }
+
+ it should "replay the target-onward range each time it jumps back" in {
+ // Schedule ABCDEF: jumping from E back to C yields the visible sequence
ABCDECDEF; jumping
+ // again from E back to C yields ABCDECDECDEF.
+ val a = jumpRegion(1, "a")
+ val b = jumpRegion(2, "b")
+ val c = jumpRegion(3, "c")
+ val d = jumpRegion(4, "d")
+ val e = jumpRegion(5, "e")
+ val f = jumpRegion(6, "f")
+ val schedule = Schedule(
+ Map(0 -> Set(a), 1 -> Set(b), 2 -> Set(c), 3 -> Set(d), 4 -> Set(e), 5
-> Set(f))
+ )
+ val coordinator = newJumpCoordinator(schedule)
+
+ Seq(a, b, c, d, e).foreach { region =>
+ assert(nextRegions(coordinator) == Set(region))
+ }
+
+ jumpTo(coordinator, "c")
+ Seq(c, d, e).foreach { region =>
+ assert(nextRegions(coordinator) == Set(region))
+ }
+
+ jumpTo(coordinator, "c")
+ Seq(c, d, e, f).foreach { region =>
+ assert(nextRegions(coordinator) == Set(region))
+ }
+
+ assert(nextRegions(coordinator) == Set.empty)
+ }
}