Yicong-Huang commented on code in PR #4444:
URL: https://github.com/apache/texera/pull/4444#discussion_r3149122383


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala:
##########
@@ -43,8 +43,36 @@ class ControllerProcessor(
   val workflowExecution: WorkflowExecution = WorkflowExecution()
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
+  // The coordinator consumes regions through callbacks rather than reading Schedule directly.
+  // This cursor tracks the next ranked level to execute and can be reset when control flow
+  // requests jumping back to the region containing a target operator.
+  private var nextRegionLevel: Option[Int] = None
   val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator(
-    () => this.workflowScheduler.getNextRegions,
+    () => {
+      Option(this.workflowScheduler.getSchedule)
+        .map { schedule =>
+          if (nextRegionLevel.isEmpty) {
+            nextRegionLevel = Some(schedule.startingLevel)
+          }
+          nextRegionLevel
+            .filter(schedule.levelSets.contains)
+            .map { level =>
+              nextRegionLevel = Some(level + 1)
+              schedule.levelSets(level)
+            }
+            .getOrElse(Set.empty)
+        }
+        .getOrElse(Set.empty)
+    },
+    opId => {
+      Option(this.workflowScheduler.getSchedule).foreach { schedule =>
+        nextRegionLevel = schedule.levelSets.collectFirst {
+          case (level, regions)
+              if regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+            level
+        }
+      }
+    },

Review Comment:
   Please also extract this lambda into a method and document it.
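A minimal sketch of what that extraction could look like, assuming simplified stand-ins: `Region`, `Schedule`, `RegionCursor`, and the `logicalOpIds` field are hypothetical and are not Texera's actual types. Both lambdas from the diff become documented methods on a small cursor class, with bodies that mirror the diff's logic.

```scala
// Hypothetical, simplified stand-ins for Texera's Region and Schedule types.
final case class Region(id: Int, logicalOpIds: Set[String])
final case class Schedule(levelSets: Map[Int, Set[Region]], startingLevel: Int)

/** Hypothetical cursor over a schedule's ranked levels (sketch only). */
final class RegionCursor(getSchedule: () => Schedule) {
  // Next level to hand out; None until the first pull (or after a failed jump).
  private var nextRegionLevel: Option[Int] = None

  /** Returns the regions at the current level and advances the cursor.
    * Returns an empty set when there is no schedule or no level left.
    */
  def nextRegions(): Set[Region] =
    Option(getSchedule())
      .map { schedule =>
        if (nextRegionLevel.isEmpty) nextRegionLevel = Some(schedule.startingLevel)
        nextRegionLevel
          .filter(schedule.levelSets.contains)
          .map { level =>
            nextRegionLevel = Some(level + 1)
            schedule.levelSets(level)
          }
          .getOrElse(Set.empty[Region])
      }
      .getOrElse(Set.empty[Region])

  /** Moves the cursor back to the level whose regions contain `opId`, so the
    * next pull re-executes that level. Scans every level (linear time).
    * If no level contains `opId`, the cursor becomes None, so (as in the diff)
    * the next pull restarts from `startingLevel`.
    */
  def jumpToRegionContainingOperator(opId: String): Unit =
    Option(getSchedule()).foreach { schedule =>
      nextRegionLevel = schedule.levelSets.collectFirst {
        case (level, regions) if regions.exists(_.logicalOpIds.contains(opId)) => level
      }
    }
}

// Sample data: three single-region levels.
val r0 = Region(0, Set("source"))
val r1 = Region(1, Set("loopStart"))
val r2 = Region(2, Set("loopEnd"))
val schedule = Schedule(Map(0 -> Set(r0), 1 -> Set(r1), 2 -> Set(r2)), startingLevel = 0)
val cursor = new RegionCursor(() => schedule)
```

Assuming the coordinator keeps its two function parameters, the controller could then pass `() => cursor.nextRegions()` and `cursor.jumpToRegionContainingOperator` where the two lambdas are today.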



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala:
##########
@@ -51,6 +53,8 @@ class WorkflowExecutionCoordinator(
     this.actorRefService = actorRefService
   }
 
+  private[scheduling] def pullNextRegions: Set[Region] = getNextRegions()

Review Comment:
   Is this rename needed?



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala:
##########
@@ -43,8 +43,36 @@ class ControllerProcessor(
   val workflowExecution: WorkflowExecution = WorkflowExecution()
   val workflowScheduler: WorkflowScheduler =
     new WorkflowScheduler(workflowContext, actorId)
+  // The coordinator consumes regions through callbacks rather than reading Schedule directly.
+  // This cursor tracks the next ranked level to execute and can be reset when control flow
+  // requests jumping back to the region containing a target operator.
+  private var nextRegionLevel: Option[Int] = None
   val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator(
-    () => this.workflowScheduler.getNextRegions,
+    () => {
+      Option(this.workflowScheduler.getSchedule)
+        .map { schedule =>
+          if (nextRegionLevel.isEmpty) {
+            nextRegionLevel = Some(schedule.startingLevel)
+          }
+          nextRegionLevel
+            .filter(schedule.levelSets.contains)
+            .map { level =>
+              nextRegionLevel = Some(level + 1)
+              schedule.levelSets(level)
+            }
+            .getOrElse(Set.empty)
+        }
+        .getOrElse(Set.empty)
+    },

Review Comment:
   OK, I see the comment was added, but the purpose and the design choice are still not very clear. Let's do the following:
   1. Define this as a method instead of a lambda, document it, and then pass it in.
   2. The current implementation always iterates over the entire schedule's `levelSets` to find the matching level. How often are we going to do this? If we jump repeatedly (e.g., in the for-loop case), can we do it in O(1) instead? I think this may now be possible with your changes in `Schedule`.
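One way to get the O(1) jump asked about in point 2, sketched under assumptions: build an operator-to-level index once per schedule, so a jump is a single hash-map lookup (effectively constant time) instead of a scan over every level. `IndexedRegionCursor`, `levelOfOperator`, and the stand-in `Region`/`Schedule` types are hypothetical. What the PR actually changed in `Schedule` is not shown here, so this sketch keeps the index outside it.

```scala
// Hypothetical, simplified stand-ins for Texera's Region and Schedule types.
final case class Region(id: Int, logicalOpIds: Set[String])
final case class Schedule(levelSets: Map[Int, Set[Region]], startingLevel: Int)

/** Sketch: precompute operator -> level once so that jumps avoid a scan. */
final class IndexedRegionCursor(schedule: Schedule) {
  // Built once, linear in the total number of operators.
  // Assumes each logical operator appears in exactly one level.
  private val levelOfOperator: Map[String, Int] =
    (for {
      (level, regions) <- schedule.levelSets.toSeq
      region <- regions
      opId <- region.logicalOpIds
    } yield opId -> level).toMap

  private var nextLevel: Int = schedule.startingLevel

  /** Returns the regions at the current level and advances; empty past the last level. */
  def nextRegions(): Set[Region] =
    schedule.levelSets.get(nextLevel) match {
      case Some(regions) =>
        nextLevel += 1
        regions
      case None => Set.empty[Region]
    }

  /** Single map lookup; returns false and leaves the cursor unchanged for an unknown operator. */
  def jumpToRegionContainingOperator(opId: String): Boolean =
    levelOfOperator.get(opId) match {
      case Some(level) =>
        nextLevel = level
        true
      case None => false
    }
}

val r0 = Region(0, Set("source"))
val r1 = Region(1, Set("loopStart"))
val r2 = Region(2, Set("loopEnd"))
val schedule = Schedule(Map(0 -> Set(r0), 1 -> Set(r1), 2 -> Set(r2)), startingLevel = 0)
val cursor = new IndexedRegionCursor(schedule)
```

The index would have to be rebuilt whenever the schedule is regenerated. Unlike the diff, an unknown operator here leaves the cursor where it is rather than resetting it.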
   
   



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala:
##########
@@ -116,4 +120,8 @@ class WorkflowExecutionCoordinator(
       .toSet
   }
 
+  def jumpToRegionContainingOperator(opId: OperatorIdentity): Unit = {
+    jumpToRegionContainingOperatorCallback(opId)
+  }

Review Comment:
   I see it is hard to do this now.
   
   How about this:
   
   1. Let `WorkflowExecutionCoordinator` take a function `getNextRegion`. It will only invoke it when the current regions finish.
   2. When jumping to a certain region, the handler recalculates the level and updates `getNextRegion` (basically, it resets the cursor).
   
   This way, `WorkflowExecutionCoordinator` is kept unchanged. 
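A rough sketch of the proposed split, assuming hypothetical names (`Coordinator`, `LevelCursor`, `onRegionsCompleted`, `resetTo`) and simplifying the schedule to a 0-indexed `Vector` of levels. The coordinator only holds a `() => Set[Region]` supplier and calls it when the running regions finish; the controller-side jump handler resets the cursor behind that supplier, so the coordinator needs no jump-specific method.

```scala
// Hypothetical, simplified stand-in for Texera's Region type.
final case class Region(id: Int, logicalOpIds: Set[String])

/** Stand-in coordinator: it knows only a supplier of regions. */
final class Coordinator(getNextRegions: () => Set[Region]) {
  private var running: Set[Region] = Set.empty
  def currentRegions: Set[Region] = running

  /** Called once every running region has finished; pulls the next batch. */
  def onRegionsCompleted(): Unit = {
    running = getNextRegions()
  }
}

/** Controller-owned cursor over ranked levels (levels indexed from 0 here). */
final class LevelCursor(levels: Vector[Set[Region]]) {
  private var next = 0

  /** Supplier handed to the coordinator: returns the current level and advances. */
  def pull(): Set[Region] =
    if (next < levels.length) {
      val regions = levels(next)
      next += 1
      regions
    } else Set.empty[Region]

  /** Jump handler: recompute the level containing `opId` and reset the cursor there.
    * Unknown operators leave the cursor unchanged. Linear scan in this sketch.
    */
  def resetTo(opId: String): Unit = {
    val level = levels.indexWhere(_.exists(_.logicalOpIds.contains(opId)))
    if (level >= 0) next = level
  }
}

val r0 = Region(0, Set("source"))
val r1 = Region(1, Set("loopStart"))
val r2 = Region(2, Set("loopEnd"))
val cursor = new LevelCursor(Vector(Set(r0), Set(r1), Set(r2)))
// The coordinator only sees the supplier; jumps never go through it.
val coordinator = new Coordinator(() => cursor.pull())
```

Because a jump only moves the cursor, it takes effect at the next region boundary, when the coordinator next pulls. `resetTo` scans the levels linearly here; an operator-to-level index, as raised in the O(1) question, could replace that scan.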



-- 
This is an automated message from the Apache Git Service.