This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5720-44df4f77013208248c0e5e59d7d7b505ec04ad56 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 2e0dd7ca45e58ae7a036591cd04aa5b156b55103 Author: Xinyuan Lin <[email protected]> AuthorDate: Sun Jun 21 23:13:09 2026 -0700 feat(execution): add PhysicalOp.requiresMaterializedExecution, consumed by the scheduler (#5720) ### What changes were proposed in this PR? Lets an operator declare it can only run under a fully-materialized schedule, and has the scheduler honor it: - `PhysicalOp` gains `requiresMaterializedExecution: Boolean = false` (+ a `withRequiresMaterializedExecution` builder). It is a physical-execution property, so it lives on the physical op. - `CostBasedScheduleGenerator` consumes it: when any physical op requires materialized execution it forces a fully-materialized schedule regardless of the requested execution mode; otherwise the existing PIPELINED/MATERIALIZED logic runs unchanged. Default `false` ⇒ dormant and behavior-preserving: no operator requires it yet, so the scheduler's effective mode is unchanged today. The loop operators will set the flag on their physical op once they land. ### Any related issues, documentation, discussions? Resolves #5719 (sub-issue of #4442 "Introduce for loop"). Split out of #5700. Reflects the review discussion with @Yicong-Huang: the property belongs on `PhysicalOp`, and it is consumed by the scheduler. ### How was this PR tested? `WorkflowCoreTypesSpec` covers the `PhysicalOp.requiresMaterializedExecution` default + builder. `CostBasedScheduleGeneratorSpec` unit-tests `CostBasedScheduleGenerator.effectiveExecutionMode` directly. `WorkflowExecutionService/Test/compile`, `scalafixAll --check`, and `scalafmtCheckAll` pass locally. The scheduler consumer is exercised end-to-end by the loop integration tests once the loop operators (which set the flag) land. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF. 
--- .../scheduling/CostBasedScheduleGenerator.scala | 23 +++++++++++++- .../CostBasedScheduleGeneratorSpec.scala | 37 ++++++++++++++++++++++ .../texera/amber/core/workflow/PhysicalOp.scala | 17 ++++++++++ .../core/workflow/WorkflowCoreTypesSpec.scala | 8 +++++ 4 files changed, 84 insertions(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index 44958718b2..85de480210 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -44,6 +44,23 @@ import scala.jdk.CollectionConverters._ import scala.util.control.Breaks.{break, breakable} import scala.util.{Failure, Success, Try} +object CostBasedScheduleGenerator { + + /** + * The execution mode to schedule under: MATERIALIZED when any operator in + * `physicalPlan` requires it (e.g. the loop operators, whose back-edge is a + * cross-region materialized state channel), otherwise the requested mode. 
+ */ + private[scheduling] def effectiveExecutionMode( + physicalPlan: PhysicalPlan, + requestedMode: ExecutionMode + ): ExecutionMode = + if (physicalPlan.operators.exists(_.requiresMaterializedExecution)) + ExecutionMode.MATERIALIZED + else + requestedMode +} + class CostBasedScheduleGenerator( workflowContext: WorkflowContext, initialPhysicalPlan: PhysicalPlan, @@ -304,7 +321,11 @@ class CostBasedScheduleGenerator( */ private def createRegionDAG(): DirectedAcyclicGraph[Region, RegionLink] = { val searchResultFuture: Future[SearchResult] = Future { - workflowContext.workflowSettings.executionMode match { + val effectiveMode = CostBasedScheduleGenerator.effectiveExecutionMode( + physicalPlan, + workflowContext.workflowSettings.executionMode + ) + effectiveMode match { case ExecutionMode.MATERIALIZED => getFullyMaterializedSearchState case ExecutionMode.PIPELINED => diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala index 7d5227c36b..bff2917daa 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGeneratorSpec.scala @@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling import org.apache.texera.amber.core.workflow.{ ExecutionMode, + PhysicalPlan, PortIdentity, WorkflowContext, WorkflowSettings @@ -515,4 +516,40 @@ class CostBasedScheduleGeneratorSpec extends AnyFlatSpec with MockFactory { ) } + "CostBasedScheduleGenerator.effectiveExecutionMode" should + "force MATERIALIZED when an operator requires it, even if PIPELINED is requested" in { + val workflow = buildWorkflow( + List(TestOperators.headerlessSmallCsvScanOpDesc()), + List(), + new WorkflowContext() + ) + val 
planRequiringMaterialization = PhysicalPlan( + workflow.physicalPlan.operators.map(_.withRequiresMaterializedExecution(true)), + workflow.physicalPlan.links + ) + assert( + CostBasedScheduleGenerator.effectiveExecutionMode( + planRequiringMaterialization, + ExecutionMode.PIPELINED + ) == ExecutionMode.MATERIALIZED + ) + } + + it should "keep the requested mode when no operator requires materialization" in { + val workflow = buildWorkflow( + List(TestOperators.headerlessSmallCsvScanOpDesc()), + List(), + new WorkflowContext() + ) + val plan = workflow.physicalPlan + assert( + CostBasedScheduleGenerator.effectiveExecutionMode(plan, ExecutionMode.PIPELINED) == + ExecutionMode.PIPELINED + ) + assert( + CostBasedScheduleGenerator.effectiveExecutionMode(plan, ExecutionMode.MATERIALIZED) == + ExecutionMode.MATERIALIZED + ) + } + } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala index 44125045c9..1fdda49a6a 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala @@ -198,6 +198,16 @@ case class PhysicalOp( // schema propagation function propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), isOneToManyOp: Boolean = false, + // Whether this operator can only run correctly under a fully-materialized + // schedule (e.g. a loop operator, whose back-edge is a cross-region + // materialized state channel that requires region-based re-execution). + // When ANY operator in the plan sets this, the schedule generator runs the + // WHOLE workflow fully materialized -- every link materialized, nothing + // pipelined -- not just this operator's own region boundaries. 
Whole-plan + // materialization is the minimal correct behavior for loops today; + // restricting it to only the requiring operator's regions is a possible + // future optimization. Default false. + requiresMaterializedExecution: Boolean = false, // hint for number of workers suggestedWorkerNum: Option[Int] = None, // name of the PVE to execute within @@ -316,6 +326,13 @@ case class PhysicalOp( def withIsOneToManyOp(isOneToManyOp: Boolean): PhysicalOp = this.copy(isOneToManyOp = isOneToManyOp) + /** + * creates a copy specifying whether this operator can only run correctly + * under a fully-materialized schedule (see the field doc) + */ + def withRequiresMaterializedExecution(requiresMaterializedExecution: Boolean): PhysicalOp = + this.copy(requiresMaterializedExecution = requiresMaterializedExecution) + /** * Creates a copy of the PhysicalOp with the schema of a specified input port updated. * The schema can either be a successful schema definition or an error represented as a Throwable. 
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala index 11f73013bf..2bf47489a3 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala @@ -142,6 +142,14 @@ class WorkflowCoreTypesSpec extends AnyFlatSpec { assert(op.parallelizable, "the original instance is immutable") } + "PhysicalOp.withRequiresMaterializedExecution" should "default to false and round-trip through copy" in { + val op = newPhysicalOp("a") + assert(!op.requiresMaterializedExecution, "defaults to false") + val flipped = op.withRequiresMaterializedExecution(true) + assert(flipped.requiresMaterializedExecution) + assert(!op.requiresMaterializedExecution, "the original instance is immutable") + } + "PhysicalOp.withSuggestedWorkerNum" should "set the suggested worker count" in { val op = newPhysicalOp("a").withSuggestedWorkerNum(7) assert(op.suggestedWorkerNum.contains(7))
