This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-scheduler-jump by this push:
new 0eafd12ad4 fix fmt
0eafd12ad4 is described below
commit 0eafd12ad48aafb4bf63873dc4f23a7fc8ca014b
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 22:16:24 2026 -0700
fix fmt
---
.../promisehandlers/JumpToOperatorRegionHandler.scala | 16 +++++++++-------
.../scheduling/WorkflowExecutionCoordinatorSpec.scala | 17 +++++++++--------
2 files changed, 18 insertions(+), 15 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
index 5b15a082b2..0047efe45f 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala
@@ -35,13 +35,15 @@ trait JumpToOperatorRegionHandler {
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
val coordinator = cp.workflowExecutionCoordinator
- coordinator.schedule.levelSets.collectFirst {
- case (level, regions)
- if regions.exists(_.getOperators.exists(_.id.logicalOpId == msg.targetOperatorId)) =>
- level
- }.foreach { targetLevel =>
- coordinator.schedule = coordinator.schedule.copy(initialLevelIndex = targetLevel)
- }
+ coordinator.schedule.levelSets
+ .collectFirst {
+ case (level, regions)
+ if regions.exists(_.getOperators.exists(_.id.logicalOpId == msg.targetOperatorId)) =>
+ level
+ }
+ .foreach { targetLevel =>
+ coordinator.schedule = coordinator.schedule.copy(initialLevelIndex = targetLevel)
+ }
EmptyReturn()
}
}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
index 7547f4aa7e..f5fc17f8e0 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
@@ -90,12 +90,14 @@ class WorkflowExecutionCoordinatorSpec
private def jumpTo(coordinator: WorkflowExecutionCoordinator, opName: String): Unit = {
val opId = OperatorIdentity(opName)
val schedule = coordinator.schedule
- schedule.levelSets.collectFirst {
- case (level, regions) if regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
- level
- }.foreach { targetLevel =>
- coordinator.schedule = schedule.copy(initialLevelIndex = targetLevel)
- }
+ schedule.levelSets
+ .collectFirst {
+ case (level, regions) if regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+ level
+ }
+ .foreach { targetLevel =>
+ coordinator.schedule = schedule.copy(initialLevelIndex = targetLevel)
+ }
}
"WorkflowExecutionCoordinator" should
@@ -127,8 +129,7 @@ class WorkflowExecutionCoordinatorSpec
ControllerConfig(None, None, None, None),
rpcProbe.asyncRPCClient
)
- workflowCoordinator.schedule =
- Schedule(Map(0 -> Set(firstRegion), 1 -> Set(secondRegion)))
+ workflowCoordinator.schedule = Schedule(Map(0 -> Set(firstRegion), 1 -> Set(secondRegion)))
workflowCoordinator.setupActorRefService(controller.actorRefService)
await(workflowCoordinator.coordinateRegionExecutors(controller.actorService))