Yicong-Huang commented on code in PR #4444:
URL: https://github.com/apache/texera/pull/4444#discussion_r3149122383
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala:
##########
@@ -43,8 +43,36 @@ class ControllerProcessor(
val workflowExecution: WorkflowExecution = WorkflowExecution()
val workflowScheduler: WorkflowScheduler =
new WorkflowScheduler(workflowContext, actorId)
+ // The coordinator consumes regions through callbacks rather than reading Schedule directly.
+ // This cursor tracks the next ranked level to execute and can be reset when control flow
+ // requests jumping back to the region containing a target operator.
+ private var nextRegionLevel: Option[Int] = None
val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator(
- () => this.workflowScheduler.getNextRegions,
+ () => {
+ Option(this.workflowScheduler.getSchedule)
+ .map { schedule =>
+ if (nextRegionLevel.isEmpty) {
+ nextRegionLevel = Some(schedule.startingLevel)
+ }
+ nextRegionLevel
+ .filter(schedule.levelSets.contains)
+ .map { level =>
+ nextRegionLevel = Some(level + 1)
+ schedule.levelSets(level)
+ }
+ .getOrElse(Set.empty)
+ }
+ .getOrElse(Set.empty)
+ },
+ opId => {
+ Option(this.workflowScheduler.getSchedule).foreach { schedule =>
+ nextRegionLevel = schedule.levelSets.collectFirst {
+ case (level, regions)
+ if regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) =>
+ level
+ }
+ }
+ },
Review Comment:
Please also turn this lambda into a named method and document it.
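A minimal sketch of what the requested extraction could look like, assuming simplified stand-ins for `OperatorIdentity`, `Region` and `Schedule` (the real Texera types differ; `Region.operators` here replaces `getOperators` plus `id.logicalOpId`). The class `RegionCursor` and its `currentLevel` accessor are hypothetical and exist only to hold the cursor and make it observable. The method keeps the same linear scan over `levelSets` as the lambda in the PR.

```scala
// Hypothetical, simplified stand-ins for Texera's OperatorIdentity, Region and Schedule.
final case class OperatorIdentity(id: String)
final case class Region(id: Int, operators: Set[OperatorIdentity])
final case class Schedule(startingLevel: Int, levelSets: Map[Int, Set[Region]])

// Hypothetical holder for the cursor that the PR keeps in ControllerProcessor.
class RegionCursor(getSchedule: () => Schedule) {
  // Next ranked level to hand out; None means "not started yet".
  private var nextRegionLevel: Option[Int] = None

  // Exposed only so callers (and tests) can observe the cursor.
  def currentLevel: Option[Int] = nextRegionLevel

  /**
    * Moves the cursor to the level of the region that contains `opId`, so that
    * the next pull re-executes that region (e.g. the body of a loop).
    *
    * Uses the same linear scan as the lambda in the PR. If no level contains
    * the operator, the cursor is cleared; with the PR's pull lambda, the next
    * pull would then restart from `startingLevel`.
    */
  def jumpToRegionContainingOperator(opId: OperatorIdentity): Unit =
    Option(getSchedule()).foreach { schedule =>
      nextRegionLevel = schedule.levelSets.collectFirst {
        case (level, regions) if regions.exists(_.operators.contains(opId)) => level
      }
    }
}
```

Because it is now a method, it can be passed wherever the PR currently passes the lambda, via eta-expansion (`cursor.jumpToRegionContainingOperator`).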
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala:
##########
@@ -51,6 +53,8 @@ class WorkflowExecutionCoordinator(
this.actorRefService = actorRefService
}
+ private[scheduling] def pullNextRegions: Set[Region] = getNextRegions()
Review Comment:
Is this rename needed?
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala:
##########
@@ -43,8 +43,36 @@ class ControllerProcessor(
val workflowExecution: WorkflowExecution = WorkflowExecution()
val workflowScheduler: WorkflowScheduler =
new WorkflowScheduler(workflowContext, actorId)
+ // The coordinator consumes regions through callbacks rather than reading Schedule directly.
+ // This cursor tracks the next ranked level to execute and can be reset when control flow
+ // requests jumping back to the region containing a target operator.
+ private var nextRegionLevel: Option[Int] = None
val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator(
- () => this.workflowScheduler.getNextRegions,
+ () => {
+ Option(this.workflowScheduler.getSchedule)
+ .map { schedule =>
+ if (nextRegionLevel.isEmpty) {
+ nextRegionLevel = Some(schedule.startingLevel)
+ }
+ nextRegionLevel
+ .filter(schedule.levelSets.contains)
+ .map { level =>
+ nextRegionLevel = Some(level + 1)
+ schedule.levelSets(level)
+ }
+ .getOrElse(Set.empty)
+ }
+ .getOrElse(Set.empty)
+ },
Review Comment:
OK, I see the comment was added, but the purpose and the design choice are still
not very clear. Let's do the following:
1. Define this as a method instead of a lambda, document it, and then pass it in.
2. The current implementation always iterates over the entire schedule's
`levelSets` to find the matching level. How often are we going to do this? If we
are going to jump repeatedly (e.g., in the for-loop case), can we do it in O(1)
instead? I think that is now possible with your changes in `Schedule`?
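One way to get constant-time jumps, sketched under stated assumptions: the schedule is immutable once built, and each operator belongs to exactly one region. The operator-to-level index (`levelOfOperator`, a hypothetical name) is then built once per `Schedule`, and a jump becomes a single hash lookup instead of a scan over `levelSets`. All types here are simplified stand-ins, not Texera's real classes, and `getNextRegions` is written as a documented method per item 1, mirroring the PR's pull lambda.

```scala
// Hypothetical, simplified stand-ins for Texera's types.
final case class OperatorIdentity(id: String)
final case class Region(id: Int, operators: Set[OperatorIdentity])
final case class Schedule(startingLevel: Int, levelSets: Map[Int, Set[Region]]) {

  /** Built once per schedule: operator -> level of the region containing it.
    * Assumes each operator appears in exactly one region. */
  lazy val levelOfOperator: Map[OperatorIdentity, Int] =
    levelSets.toSeq.flatMap { case (level, regions) =>
      regions.toSeq.flatMap(_.operators.map(op => op -> level))
    }.toMap
}

class RegionCursor(getSchedule: () => Schedule) {
  private var nextRegionLevel: Option[Int] = None

  /** Returns the regions at the current level and advances the cursor.
    * Returns an empty set when there is no schedule or no region at that level,
    * leaving the cursor where it is (same behavior as the PR's lambda). */
  def getNextRegions: Set[Region] =
    Option(getSchedule()) match {
      case None => Set.empty
      case Some(schedule) =>
        val level = nextRegionLevel.getOrElse(schedule.startingLevel)
        schedule.levelSets.get(level) match {
          case Some(regions) =>
            nextRegionLevel = Some(level + 1)
            regions
          case None =>
            nextRegionLevel = Some(level)
            Set.empty
        }
    }

  /** Resets the cursor to the level containing `opId` with one map lookup. */
  def jumpToRegionContainingOperator(opId: OperatorIdentity): Unit =
    Option(getSchedule()).foreach { schedule =>
      nextRegionLevel = schedule.levelOfOperator.get(opId)
    }
}
```

The index costs one pass over the schedule when first used; after that, repeated jumps (for example, once per loop iteration) no longer depend on the number of levels.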
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala:
##########
@@ -116,4 +120,8 @@ class WorkflowExecutionCoordinator(
.toSet
}
+ def jumpToRegionContainingOperator(opId: OperatorIdentity): Unit = {
+ jumpToRegionContainingOperatorCallback(opId)
+ }
Review Comment:
I see it is hard to do this now.
How about this:
1. Let `WorkflowExecutionCoordinator` take a function `getNextRegion`. It will
only invoke it when the current regions finish.
2. When jumping to a certain region, the handler recalculates the level and
updates `getNextRegion` (basically resets the cursor).
This way, `WorkflowExecutionCoordinator` is kept unchanged.
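A rough sketch of this proposal, with hypothetical names throughout (`SketchCoordinator`, `ControllerSketch`, `onCurrentRegionsCompleted`, `handleJumpTo`). It assumes the ranked levels can be laid out as a `Vector` indexed from 0. The coordinator only ever calls the function it was given; all cursor state and the jump handling live on the controller side, so the coordinator needs no jump method.

```scala
// Hypothetical, simplified stand-ins for Texera's types.
final case class OperatorIdentity(id: String)
final case class Region(id: Int, operators: Set[OperatorIdentity])

/** Stand-in for the coordinator: it only knows how to pull the next regions. */
class SketchCoordinator(getNextRegions: () => Set[Region]) {
  /** Invoked only after every region in the current batch has completed. */
  def onCurrentRegionsCompleted(): Set[Region] = getNextRegions()
}

/** Controller side: owns the cursor and the jump handler. Assumes levels are 0-based. */
class ControllerSketch(levels: Vector[Set[Region]]) {
  private var cursor = 0

  // Operator -> level, computed once so that a jump is a single lookup.
  private val levelOf: Map[OperatorIdentity, Int] =
    levels.zipWithIndex.flatMap { case (regions, level) =>
      regions.toSeq.flatMap(_.operators.toSeq.map(op => op -> level))
    }.toMap

  private def nextRegions(): Set[Region] =
    if (cursor < levels.size) {
      val regions = levels(cursor)
      cursor += 1
      regions
    } else Set.empty

  val coordinator = new SketchCoordinator(() => nextRegions())

  /** Jump handler: recompute the target level and reset the cursor.
    * Unknown operators leave the cursor unchanged; the coordinator is untouched. */
  def handleJumpTo(opId: OperatorIdentity): Unit =
    levelOf.get(opId).foreach(level => cursor = level)
}
```

Because the coordinator holds only a `() => Set[Region]`, resetting the cursor inside the closure's owner changes what the next pull returns without any new coordinator API.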
--
This is an automated message from the Apache Git Service.