This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-scheduler-jump
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-scheduler-jump by this push:
     new 9872d8eb77 refactor(amber): lazy-init WorkflowExecutionCoordinator with the real schedule
9872d8eb77 is described below

commit 9872d8eb77a10da10816226ce79430b504cf6c23
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 1 17:28:28 2026 -0700

    refactor(amber): lazy-init WorkflowExecutionCoordinator with the real schedule
    
    `ControllerProcessor.workflowExecutionCoordinator` is now a `lazy val`
    that is constructed with `workflowScheduler.getSchedule` on first access.
    This removes the empty `Schedule(Map.empty)` placeholder and the
    follow-up `replaceSchedule` call that swapped in the real schedule.
    
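    `Controller.initState` is reordered so that `updateSchedule(physicalPlan)`
    runs before `attachRuntimeServicesToCPState()`. The latter triggers the
    lazy coordinator via `setupActorRefService`, which must observe a
    populated schedule. Because a `lazy val` is evaluated only once, any
    earlier access would permanently capture whatever `getSchedule` returned
    at that moment, so this ordering must be preserved.
    `WorkflowScheduler.updateSchedule` does not depend on any of the runtime
    services, so the reorder is safe.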
    
    `ControllerProcessor.updateExecutionSchedule` has been removed.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../architecture/controller/Controller.scala       |  5 +++--
 .../controller/ControllerProcessor.scala           | 25 ++++++++++------------
 2 files changed, 14 insertions(+), 16 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
index a838b1ae3c..2f0d9a9adb 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala
@@ -114,9 +114,10 @@ class Controller(
   )
 
   override def initState(): Unit = {
-    attachRuntimeServicesToCPState()
+    // updateSchedule must run before attachRuntimeServicesToCPState: the 
latter triggers
+    // the lazy `workflowExecutionCoordinator`, which reads 
`workflowScheduler.getSchedule`.
     cp.workflowScheduler.updateSchedule(physicalPlan)
-    cp.updateExecutionSchedule(cp.workflowScheduler.getSchedule)
+    attachRuntimeServicesToCPState()
 
     val regions: List[(Long, List[String])] =
       cp.workflowScheduler.getSchedule.getRegions.map { region =>
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
index ea43787599..4b3d290f6e 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala
@@ -29,10 +29,7 @@ import org.apache.texera.amber.engine.architecture.common.{
 }
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.texera.amber.engine.architecture.logreplay.ReplayLogManager
-import org.apache.texera.amber.engine.architecture.scheduling.{
-  Schedule,
-  WorkflowExecutionCoordinator
-}
+import 
org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionCoordinator
 import 
org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.MainThreadDelegateMessage
 import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
 
@@ -46,16 +43,16 @@ class ControllerProcessor(
   val workflowExecution: WorkflowExecution = WorkflowExecution()
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
-  val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new 
WorkflowExecutionCoordinator(
-    Schedule(Map.empty),
-    workflowExecution,
-    controllerConfig,
-    asyncRPCClient
-  )
-
-  def updateExecutionSchedule(schedule: Schedule): Unit = {
-    workflowExecutionCoordinator.replaceSchedule(schedule)
-  }
+
+  // Lazy: first access must happen *after* 
`workflowScheduler.updateSchedule(...)` has produced
+  // a real schedule. Controller.initState enforces this order.
+  lazy val workflowExecutionCoordinator: WorkflowExecutionCoordinator =
+    new WorkflowExecutionCoordinator(
+      workflowScheduler.getSchedule,
+      workflowExecution,
+      controllerConfig,
+      asyncRPCClient
+    )
 
   private val initializer = new ControllerAsyncRPCHandlerInitializer(this)
 

Reply via email to