Yicong-Huang commented on code in PR #4444:
URL: https://github.com/apache/texera/pull/4444#discussion_r3144609324


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala:
##########
@@ -43,8 +45,35 @@ class ControllerProcessor(
   val workflowExecution: WorkflowExecution = WorkflowExecution()
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
+  private val nextRegionLevel: mutable.ArrayBuffer[Option[Int]] = 
mutable.ArrayBuffer(None)
   val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new 
WorkflowExecutionCoordinator(
-    () => this.workflowScheduler.getNextRegions,
+    () => {
+      val schedule = this.workflowScheduler.getSchedule
+      if (schedule == null) {

Review Comment:
   nit: let's avoid null (use Option)?



##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/BatchSizePropagationSpec.scala:
##########
@@ -54,11 +55,10 @@ class BatchSizePropagationSpec
   }
 
   def verifyBatchSizeInPartitioning(
-      workflowScheduler: WorkflowScheduler,
+      schedule: Schedule,

Review Comment:
   I am not familiar with this Batch size propagation. maybe @Xiao-zhen-Liu can 
review?



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala:
##########
@@ -43,8 +45,35 @@ class ControllerProcessor(
   val workflowExecution: WorkflowExecution = WorkflowExecution()
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
+  private val nextRegionLevel: mutable.ArrayBuffer[Option[Int]] = 
mutable.ArrayBuffer(None)

Review Comment:
   why is it `mutable.ArrayBuffer[Option[Int]]`? do we need multiple levels? 



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala:
##########
@@ -43,8 +45,35 @@ class ControllerProcessor(
   val workflowExecution: WorkflowExecution = WorkflowExecution()
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
+  private val nextRegionLevel: mutable.ArrayBuffer[Option[Int]] = 
mutable.ArrayBuffer(None)
   val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new 
WorkflowExecutionCoordinator(
-    () => this.workflowScheduler.getNextRegions,
+    () => {
+      val schedule = this.workflowScheduler.getSchedule
+      if (schedule == null) {
+        Set.empty
+      } else {
+        if (nextRegionLevel(0).isEmpty) {
+          nextRegionLevel(0) = Some(schedule.startingLevel)
+        }
+        nextRegionLevel(0)

Review Comment:
   I don't get it, why we need an array and use the first element only? maybe 
you just  need an Option?



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala:
##########
@@ -43,8 +45,35 @@ class ControllerProcessor(
   val workflowExecution: WorkflowExecution = WorkflowExecution()
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
+  private val nextRegionLevel: mutable.ArrayBuffer[Option[Int]] = 
mutable.ArrayBuffer(None)
   val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new 
WorkflowExecutionCoordinator(
-    () => this.workflowScheduler.getNextRegions,
+    () => {

Review Comment:
   can we add some comments to document this behavior (in code)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to