This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-scheduler-jump by this
push:
new 9872d8eb77 refactor(amber): lazy-init WorkflowExecutionCoordinator
with the real schedule
9872d8eb77 is described below
commit 9872d8eb77a10da10816226ce79430b504cf6c23
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 17:28:28 2026 -0700
refactor(amber): lazy-init WorkflowExecutionCoordinator with the real
schedule
`ControllerProcessor.workflowExecutionCoordinator` is now a `lazy val`
that constructs with `workflowScheduler.getSchedule` on first access,
removing the empty-schedule placeholder + handoff dance.
`Controller.initState` reorders so `updateSchedule(physicalPlan)` runs
before `attachRuntimeServicesToCPState()` — the latter triggers the
lazy coordinator via `setupActorRefService`, which must observe a
populated schedule. WorkflowScheduler.updateSchedule does not depend on
any of the runtime services, so the reorder is safe.
`updateExecutionSchedule` on ControllerProcessor is gone.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
.../architecture/controller/Controller.scala | 5 +++--
.../controller/ControllerProcessor.scala | 25 ++++++++++------------
2 files changed, 14 insertions(+), 16 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
index a838b1ae3c..2f0d9a9adb 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
@@ -114,9 +114,10 @@ class Controller(
)
override def initState(): Unit = {
- attachRuntimeServicesToCPState()
+ // updateSchedule must run before attachRuntimeServicesToCPState: the
latter triggers
+ // the lazy `workflowExecutionCoordinator`, which reads
`workflowScheduler.getSchedule`.
cp.workflowScheduler.updateSchedule(physicalPlan)
- cp.updateExecutionSchedule(cp.workflowScheduler.getSchedule)
+ attachRuntimeServicesToCPState()
val regions: List[(Long, List[String])] =
cp.workflowScheduler.getSchedule.getRegions.map { region =>
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index ea43787599..4b3d290f6e 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -29,10 +29,7 @@ import org.apache.texera.amber.engine.architecture.common.{
}
import
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
import org.apache.texera.amber.engine.architecture.logreplay.ReplayLogManager
-import org.apache.texera.amber.engine.architecture.scheduling.{
- Schedule,
- WorkflowExecutionCoordinator
-}
+import
org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionCoordinator
import
org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.MainThreadDelegateMessage
import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
@@ -46,16 +43,16 @@ class ControllerProcessor(
val workflowExecution: WorkflowExecution = WorkflowExecution()
val workflowScheduler: WorkflowScheduler =
new WorkflowScheduler(workflowContext, actorId)
- val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new
WorkflowExecutionCoordinator(
- Schedule(Map.empty),
- workflowExecution,
- controllerConfig,
- asyncRPCClient
- )
-
- def updateExecutionSchedule(schedule: Schedule): Unit = {
- workflowExecutionCoordinator.replaceSchedule(schedule)
- }
+
+ // Lazy: first access must happen *after*
`workflowScheduler.updateSchedule(...)` has produced
+ // a real schedule. Controller.initState enforces this order.
+ lazy val workflowExecutionCoordinator: WorkflowExecutionCoordinator =
+ new WorkflowExecutionCoordinator(
+ workflowScheduler.getSchedule,
+ workflowExecution,
+ controllerConfig,
+ asyncRPCClient
+ )
private val initializer = new ControllerAsyncRPCHandlerInitializer(this)