This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-scheduler-jump by this push:
     new dd9e738ce1 refactor(amber): move target-operator search into JumpToOperatorRegionHandler
dd9e738ce1 is described below

commit dd9e738ce158d28cd09ebc39b4d12884e7a77289
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 19:12:35 2026 -0700

    refactor(amber): move target-operator search into JumpToOperatorRegionHandler
    
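    The precomputed `operatorLevelIndices` map in `Schedule` was rebuilt
    every time a Schedule was constructed, including by the `copy(...)` on
    each jump. Building it walks every operator in every region of every
    level, which costs the same as one linear scan. Its O(1) lookup
    therefore never paid off, because each schedule is queried at most once
    before it is replaced. Drop the precomputed map and
    `getLevelIndexOfOperator`; the handler now scans `levelSets` inline
    with `collectFirst`. `levelSets` becomes a public val so the handler
    can reach it.

    Note: if the target operator appears at more than one level,
    `collectFirst` returns the first matching level in `Map` iteration
    order. The old `toMap` kept the last one seen instead. Neither is
    guaranteed to be the lowest level.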
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../promisehandlers/JumpToOperatorRegionHandler.scala        |  6 +++++-
 .../amber/engine/architecture/scheduling/Schedule.scala      | 12 +-----------
 .../scheduling/WorkflowExecutionCoordinatorSpec.scala        | 12 ++++++++----
 3 files changed, 14 insertions(+), 16 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
index 15663ef903..7eb841f8d0 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
@@ -36,7 +36,11 @@ trait JumpToOperatorRegionHandler {
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
     val schedule = cp.workflowExecutionCoordinator.getSchedule
-    schedule.getLevelIndexOfOperator(msg.targetOperatorId).foreach { 
targetLevel =>
+    schedule.levelSets.collectFirst {
+      case (level, regions)
+          if regions.exists(_.getOperators.exists(_.id.logicalOpId == 
msg.targetOperatorId)) =>
+        level
+    }.foreach { targetLevel =>
       cp.workflowExecutionCoordinator.replaceSchedule(
         schedule.copy(initialLevelIndex = targetLevel)
       )
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 6bdd8e665b..9c03d07a62 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -19,10 +19,8 @@
 
 package org.apache.texera.amber.engine.architecture.scheduling
 
-import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-
 case class Schedule(
-    private val levelSets: Map[Int, Set[Region]],
+    levelSets: Map[Int, Set[Region]],
     initialLevelIndex: Int = 0
 ) extends Iterator[Set[Region]] {
   require(
@@ -30,18 +28,10 @@ case class Schedule(
     s"Schedule level keys must be contiguous starting at 0, got: 
${levelSets.keys.toSeq.sorted}"
   )
 
-  private val operatorLevelIndices: Map[OperatorIdentity, Int] =
-    levelSets.iterator.flatMap {
-      case (level, regions) =>
-        regions.iterator.flatMap(region => 
region.getOperators.map(_.id.logicalOpId -> level))
-    }.toMap
-
   private var currentLevel: Int = initialLevelIndex
 
   def getRegions: List[Region] = levelSets.values.flatten.toList
 
-  def getLevelIndexOfOperator(opId: OperatorIdentity): Option[Int] = 
operatorLevelIndices.get(opId)
-
   override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)
 
   override def next(): Set[Region] = {
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
index 8ba381af3f..6ca9316abf 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -81,12 +81,16 @@ class WorkflowExecutionCoordinatorSpec
     if (schedule.hasNext) schedule.next() else Set.empty
   }
 
-  // Mirrors what JumpToOperatorRegionHandler does: read the current schedule, 
look up the level
-  // containing the target operator, and replace the schedule with a copy 
whose cursor is at
-  // that level.
+  // Mirrors what JumpToOperatorRegionHandler does: read the current schedule, 
scan for the
+  // level containing the target operator, and replace the schedule with a 
copy whose cursor is
+  // at that level.
   private def jumpTo(coordinator: WorkflowExecutionCoordinator, opName: 
String): Unit = {
+    val opId = OperatorIdentity(opName)
     val schedule = coordinator.getSchedule
-    schedule.getLevelIndexOfOperator(OperatorIdentity(opName)).foreach { 
targetLevel =>
+    schedule.levelSets.collectFirst {
+      case (level, regions) if 
regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+        level
+    }.foreach { targetLevel =>
       coordinator.replaceSchedule(schedule.copy(initialLevelIndex = 
targetLevel))
     }
   }

Reply via email to