This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-scheduler-jump by this push:
     new 560b670a67 update
560b670a67 is described below

commit 560b670a6716021b623aba7cb9d34343d9d2c36e
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 27 22:22:12 2026 -0700

    update
---
 .../controller/ControllerProcessor.scala           | 64 +++++++++++++---------
 .../engine/architecture/scheduling/Schedule.scala  |  7 +++
 2 files changed, 44 insertions(+), 27 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index c0dd2c7ce8..094bbb876c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -43,36 +43,46 @@ class ControllerProcessor(
   val workflowExecution: WorkflowExecution = WorkflowExecution()
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
-  // The coordinator consumes regions through callbacks rather than reading 
Schedule directly.
-  // This cursor tracks the next ranked level to execute and can be reset when 
control flow
-  // requests jumping back to the region containing a target operator.
   private var nextRegionLevel: Option[Int] = None
-  val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new 
WorkflowExecutionCoordinator(
-    () => {
-      Option(this.workflowScheduler.getSchedule)
-        .map { schedule =>
-          if (nextRegionLevel.isEmpty) {
-            nextRegionLevel = Some(schedule.startingLevel)
-          }
-          nextRegionLevel
-            .filter(schedule.levelSets.contains)
-            .map { level =>
-              nextRegionLevel = Some(level + 1)
-              schedule.levelSets(level)
-            }
-            .getOrElse(Set.empty)
-        }
-        .getOrElse(Set.empty)
-    },
-    opId => {
-      Option(this.workflowScheduler.getSchedule).foreach { schedule =>
-        nextRegionLevel = schedule.levelSets.collectFirst {
-          case (level, regions)
-              if regions.exists(_.getOperators.exists(_.id.logicalOpId == 
opId)) =>
-            level
+
+  /**
+    * The coordinator consumes regions through this callback rather than 
reading the schedule directly.
+    * The controller owns the cursor so it can reset the next schedule level 
when control flow requests
+    * jumping back to the region containing a target operator.
+    */
+  private def getNextScheduledRegions(): 
Set[org.apache.texera.amber.engine.architecture.scheduling.Region] = {
+    Option(this.workflowScheduler.getSchedule)
+      .map { schedule =>
+        if (nextRegionLevel.isEmpty) {
+          nextRegionLevel = Some(schedule.startingLevel)
         }
+        nextRegionLevel
+          .filter(schedule.levelSets.contains)
+          .map { level =>
+            nextRegionLevel = Some(level + 1)
+            schedule.levelSets(level)
+          }
+          .getOrElse(Set.empty)
       }
-    },
+      .getOrElse(Set.empty)
+  }
+
+  /**
+    * Resets the schedule cursor so the next coordinator pull starts from the 
region containing the
+    * given operator. Schedule precomputes the operator-to-level mapping 
because loop control flow may
+    * jump repeatedly and should avoid rescanning all level sets on each jump.
+    */
+  private def jumpToRegionContainingOperator(
+      opId: org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+  ): Unit = {
+    Option(this.workflowScheduler.getSchedule).foreach { schedule =>
+      nextRegionLevel = schedule.getLevelOfOperator(opId)
+    }
+  }
+
+  val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new 
WorkflowExecutionCoordinator(
+    getNextScheduledRegions,
+    jumpToRegionContainingOperator,
     workflowExecution,
     controllerConfig,
     asyncRPCClient
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
index 65ed3f1fca..f0e0c5b21d 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala
@@ -19,11 +19,18 @@
 
 package org.apache.texera.amber.engine.architecture.scheduling
 
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+
 case class Schedule(levelSets: Map[Int, Set[Region]]) extends 
Iterable[Set[Region]] {
   val startingLevel: Int = levelSets.keys.minOption.getOrElse(0)
+  private val operatorLevels = levelSets.iterator.flatMap { case (level, 
regions) =>
+    regions.iterator.flatMap(region => 
region.getOperators.map(_.id.logicalOpId -> level))
+  }.toMap
 
   def getRegions: List[Region] = levelSets.values.flatten.toList
 
+  def getLevelOfOperator(opId: OperatorIdentity): Option[Int] = 
operatorLevels.get(opId)
+
   override def iterator: Iterator[Set[Region]] =
     levelSets.keys.toSeq.sorted.iterator.map(levelSets)
 }

Reply via email to