This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-scheduler-jump by this push:
     new 0eafd12ad4 fix fmt
0eafd12ad4 is described below

commit 0eafd12ad48aafb4bf63873dc4f23a7fc8ca014b
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 22:16:24 2026 -0700

    fix fmt
---
 .../promisehandlers/JumpToOperatorRegionHandler.scala   | 16 +++++++++-------
 .../scheduling/WorkflowExecutionCoordinatorSpec.scala   | 17 +++++++++--------
 2 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
index 5b15a082b2..0047efe45f 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
@@ -35,13 +35,15 @@ trait JumpToOperatorRegionHandler {
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
     val coordinator = cp.workflowExecutionCoordinator
-    coordinator.schedule.levelSets.collectFirst {
-      case (level, regions)
-          if regions.exists(_.getOperators.exists(_.id.logicalOpId == 
msg.targetOperatorId)) =>
-        level
-    }.foreach { targetLevel =>
-      coordinator.schedule = coordinator.schedule.copy(initialLevelIndex = 
targetLevel)
-    }
+    coordinator.schedule.levelSets
+      .collectFirst {
+        case (level, regions)
+            if regions.exists(_.getOperators.exists(_.id.logicalOpId == 
msg.targetOperatorId)) =>
+          level
+      }
+      .foreach { targetLevel =>
+        coordinator.schedule = coordinator.schedule.copy(initialLevelIndex = 
targetLevel)
+      }
     EmptyReturn()
   }
 }
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
index 7547f4aa7e..f5fc17f8e0 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -90,12 +90,14 @@ class WorkflowExecutionCoordinatorSpec
   private def jumpTo(coordinator: WorkflowExecutionCoordinator, opName: 
String): Unit = {
     val opId = OperatorIdentity(opName)
     val schedule = coordinator.schedule
-    schedule.levelSets.collectFirst {
-      case (level, regions) if 
regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
-        level
-    }.foreach { targetLevel =>
-      coordinator.schedule = schedule.copy(initialLevelIndex = targetLevel)
-    }
+    schedule.levelSets
+      .collectFirst {
+        case (level, regions) if 
regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+          level
+      }
+      .foreach { targetLevel =>
+        coordinator.schedule = schedule.copy(initialLevelIndex = targetLevel)
+      }
   }
 
   "WorkflowExecutionCoordinator" should
@@ -127,8 +129,7 @@ class WorkflowExecutionCoordinatorSpec
       ControllerConfig(None, None, None, None),
       rpcProbe.asyncRPCClient
     )
-    workflowCoordinator.schedule =
-      Schedule(Map(0 -> Set(firstRegion), 1 -> Set(secondRegion)))
+    workflowCoordinator.schedule = Schedule(Map(0 -> Set(firstRegion), 1 -> 
Set(secondRegion)))
     workflowCoordinator.setupActorRefService(controller.actorRefService)
 
     
await(workflowCoordinator.coordinateRegionExecutors(controller.actorService))

Reply via email to