This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-scheduler-jump by this
push:
new dd9e738ce1 refactor(amber): move target-operator search into
JumpToOperatorRegionHandler
dd9e738ce1 is described below
commit dd9e738ce158d28cd09ebc39b4d12884e7a77289
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 19:12:35 2026 -0700
refactor(amber): move target-operator search into
JumpToOperatorRegionHandler
The precomputed `operatorLevelIndices` map in `Schedule` was rebuilt every
time a Schedule was constructed (including the `copy(...)` performed on each
jump). Its up-front build cost, one pass over every operator of every region
at every level, was therefore the same as a single linear scan. The O(1)
lookup never paid off, because each schedule is looked up at most once
before it is replaced. Drop the precomputed map and
`getLevelIndexOfOperator`; the handler now scans `levelSets` inline
with `collectFirst`. `levelSets` becomes a public val so the handler
can reach it.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
.../promisehandlers/JumpToOperatorRegionHandler.scala | 6 +++++-
.../amber/engine/architecture/scheduling/Schedule.scala | 12 +-----------
.../scheduling/WorkflowExecutionCoordinatorSpec.scala | 12 ++++++++----
3 files changed, 14 insertions(+), 16 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
index 15663ef903..7eb841f8d0 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
@@ -36,7 +36,11 @@ trait JumpToOperatorRegionHandler {
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
val schedule = cp.workflowExecutionCoordinator.getSchedule
- schedule.getLevelIndexOfOperator(msg.targetOperatorId).foreach {
targetLevel =>
+ schedule.levelSets.collectFirst {
+ case (level, regions)
+ if regions.exists(_.getOperators.exists(_.id.logicalOpId ==
msg.targetOperatorId)) =>
+ level
+ }.foreach { targetLevel =>
cp.workflowExecutionCoordinator.replaceSchedule(
schedule.copy(initialLevelIndex = targetLevel)
)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 6bdd8e665b..9c03d07a62 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -19,10 +19,8 @@
package org.apache.texera.amber.engine.architecture.scheduling
-import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-
case class Schedule(
- private val levelSets: Map[Int, Set[Region]],
+ levelSets: Map[Int, Set[Region]],
initialLevelIndex: Int = 0
) extends Iterator[Set[Region]] {
require(
@@ -30,18 +28,10 @@ case class Schedule(
s"Schedule level keys must be contiguous starting at 0, got:
${levelSets.keys.toSeq.sorted}"
)
- private val operatorLevelIndices: Map[OperatorIdentity, Int] =
- levelSets.iterator.flatMap {
- case (level, regions) =>
- regions.iterator.flatMap(region =>
region.getOperators.map(_.id.logicalOpId -> level))
- }.toMap
-
private var currentLevel: Int = initialLevelIndex
def getRegions: List[Region] = levelSets.values.flatten.toList
- def getLevelIndexOfOperator(opId: OperatorIdentity): Option[Int] =
operatorLevelIndices.get(opId)
-
override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)
override def next(): Set[Region] = {
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
index 8ba381af3f..6ca9316abf 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -81,12 +81,16 @@ class WorkflowExecutionCoordinatorSpec
if (schedule.hasNext) schedule.next() else Set.empty
}
- // Mirrors what JumpToOperatorRegionHandler does: read the current schedule,
look up the level
- // containing the target operator, and replace the schedule with a copy
whose cursor is at
- // that level.
+ // Mirrors what JumpToOperatorRegionHandler does: read the current schedule,
scan for the
+ // level containing the target operator, and replace the schedule with a
copy whose cursor is
+ // at that level.
private def jumpTo(coordinator: WorkflowExecutionCoordinator, opName:
String): Unit = {
+ val opId = OperatorIdentity(opName)
val schedule = coordinator.getSchedule
- schedule.getLevelIndexOfOperator(OperatorIdentity(opName)).foreach {
targetLevel =>
+ schedule.levelSets.collectFirst {
+ case (level, regions) if
regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+ level
+ }.foreach { targetLevel =>
coordinator.replaceSchedule(schedule.copy(initialLevelIndex =
targetLevel))
}
}