This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new da99a35945 feat(scheduling): reuse output storage across region re-executions (#5707)
da99a35945 is described below
commit da99a359458bb285908752779b4c2dc3f11ecd89
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 16 17:11:35 2026 -0700
feat(scheduling): reuse output storage across region re-executions (#5707)
### What changes were proposed in this PR?
Adds an opt-in mechanism for an output port to **reuse** its storage
when the owning operator's region re-executes, instead of recreating the
document each time. Dormant and behavior-preserving — no operator sets
the flag in this PR.
- `OutputPort` gains a `reuseStorage: Boolean` proto field (alongside
`blocking` / `mode`). It marks a port whose output accumulates across
region re-executions — e.g. a Loop End port whose result builds up over
the iterations of its own loop.
- `DocumentFactory.createOrReuseDocument(uri, schema, reuseExisting, …)`
is the create-or-reuse decision: when reuse is requested and a document
already exists it opens and returns that one; otherwise it creates a
fresh one. It always returns the document, so the call site does not
branch.
- `RegionExecutionCoordinator` reads each output port's `reuseStorage`
flag while provisioning that port's result/state documents and routes
through `createOrReuseDocument`.
| port flag | region re-run behavior |
|---|---|
| `false` (every operator today) | recreate output/state documents — unchanged |
| `true` (set by Loop End in the loop PR) | keep and reopen the existing documents |
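
The two rows of the table above can be illustrated with a small, self-contained sketch. It does not use Texera's `DocumentFactory` or `VirtualDocument`; `ReuseSketch`, `MemDoc`, and `rowsAfterReruns` are hypothetical names, and the in-memory map only assumes what this PR states, namely that creating a document overrides any existing one at the same URI.

```scala
import scala.collection.mutable

object ReuseSketch {
  // Hypothetical in-memory document; stands in for a stored result document.
  final class MemDoc(val uri: String) {
    val rows: mutable.Buffer[String] = mutable.Buffer.empty[String]
  }

  // Hypothetical store keyed by URI string (not Texera's storage layer).
  private val docs = mutable.Map.empty[String, MemDoc]

  def exists(uri: String): Boolean = docs.contains(uri)
  def open(uri: String): MemDoc = docs(uri)

  // Assumption taken from the PR text: create replaces any existing document.
  def create(uri: String): MemDoc = {
    val doc = new MemDoc(uri)
    docs(uri) = doc
    doc
  }

  // Reuse only when requested and a document is already there; otherwise create.
  def createOrReuse(uri: String, reuseExisting: Boolean): MemDoc =
    if (reuseExisting && exists(uri)) open(uri) else create(uri)

  // Simulate `runs` re-executions of a region that writes one row per run,
  // and report how many rows the document holds afterwards.
  def rowsAfterReruns(uri: String, reuseExisting: Boolean, runs: Int): Int = {
    (1 to runs).foreach { i =>
      createOrReuse(uri, reuseExisting).rows += s"iteration-$i"
    }
    open(uri).rows.size
  }
}
```

Under these assumptions, three re-runs with `reuseExisting = true` leave three rows in the document, while three re-runs with `reuseExisting = false` leave only the row written by the last run.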
A runtime guard in `RegionExecutionCoordinator` asserts no port sets
`reuseStorage` for now: the flag activates only with the loop operators,
which are not yet on `main`. The guard keeps the dormant reuse path from
being silently exercised before its consumer exists, and is removed when
the loop operators land.
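
As a rough illustration of how such a guard behaves, the sketch below uses Scala's standard `require`, which throws an `IllegalArgumentException` when its condition is false. `GuardSketch`, `Port`, and `provision` are hypothetical stand-ins, not the generated `OutputPort` class or the coordinator's actual method.

```scala
object GuardSketch {
  // Hypothetical port model; the real OutputPort is generated from workflow.proto.
  final case class Port(id: Int, reuseStorage: Boolean = false)

  // Fails loudly if a port opts into reuse before the feature is enabled.
  def provision(port: Port): String = {
    require(
      !port.reuseStorage,
      s"Output port ${port.id} set reuseStorage, which is not supported yet"
    )
    s"provisioned port ${port.id}"
  }
}
```

A port with the default flag provisions normally, while a port with `reuseStorage = true` aborts provisioning with an exception instead of silently taking the reuse path.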
### Any related issues, documentation, discussions?
Resolves #5709 (sub-issue of #4442 "Introduce for loop"). Split out of
#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
[review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715).
### How was this PR tested?
- `DocumentFactorySpec` — pins the create-or-reuse decision (the reuse ×
exists matrix plus the "no-reuse never probes existence" short-circuit)
with injected document stubs, no iceberg backend.
- `OutputPortReuseFlagSpec` — guards that no registered operator enables
`reuseStorage` on any output port.
- `WorkflowCore` / `WorkflowOperator` / `WorkflowExecutionService`
compile; scalafmt + scalafix clean.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.
---
.../scheduling/RegionExecutionCoordinator.scala | 29 ++++++-
.../org/apache/texera/amber/core/workflow.proto | 6 ++
.../amber/core/storage/DocumentFactory.scala | 22 +++++
.../amber/core/storage/DocumentFactorySpec.scala | 96 ++++++++++++++++++++++
.../metadata/OutputPortReuseFlagSpec.scala | 48 +++++++++++
5 files changed, 199 insertions(+), 2 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 5a9df11b58..4497d7c4ae 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -576,8 +576,33 @@ class RegionExecutionCoordinator(
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
- DocumentFactory.createDocument(resultURI, schema)
- DocumentFactory.createDocument(stateURI, State.schema)
+ // An output port whose storage accumulates across region re-executions
+ // (e.g. a LoopEnd port, whose output builds up over the iterations of
+ // its own loop) sets `reuseStorage`. When set, the port's existing
+ // document is kept and reopened on each re-run; when unset, a fresh one
+ // is created. Read per output port -- storage behavior is port-specific.
+ // (The inner LoopEnd of a nested loop additionally drops its output
+ // once per outer iteration on the Python worker side in
+ // MainLoop._process_state_frame, which is orthogonal to this.)
+ val reuseStorage =
+ region
+ .getOperator(outputPortId.opId)
+ .outputPorts(outputPortId.portId)
+ ._1
+ .reuseStorage
+ // Guard: no operator enables reuseStorage in production yet -- it
+ // activates with the loop operators, which aren't on main. Until then
+ // this fails loudly so the dormant reuse path is never silently
+ // exercised. Remove/relax this guard when introducing the loop operators.
+ require(
+ !reuseStorage,
+ s"Output port $outputPortId set reuseStorage, which is not " +
+ "supported in production yet (it activates with the loop operators)."
+ )
+ Seq((resultURI, schema), (stateURI, State.schema)).foreach {
+ case (uri, sch) =>
+ DocumentFactory.createOrReuseDocument(uri, sch, reuseStorage)
+ }
if (!isRestart) {
val (_, eid, _, _) = decodeURI(resultURI)
WorkflowExecutionsResource.insertOperatorPortResultUri(
diff --git a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
index c5b2cb248f..b3b72ed924 100644
--- a/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
+++ b/common/workflow-core/src/main/protobuf/org/apache/texera/amber/core/workflow.proto
@@ -62,6 +62,12 @@ message OutputPort {
string displayName = 2;
bool blocking = 3;
OutputMode mode = 4;
+ // Whether storage at this port persists across the owning operator's region
+ // re-executions: when set, the existing document is kept and appended to on
+ // each re-run; when unset, it is recreated. Set e.g. on a LoopEnd port whose
+ // output accumulates across the iterations of its own loop. The region
+ // scheduler reads this when provisioning the port's output document.
+ bool reuseStorage = 5;
}
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index d3fcae868f..a26340e79c 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -133,6 +133,28 @@ object DocumentFactory {
}
}
+ /**
+ * Return the document at `uri`: when `reuseExisting` is set and a document
+ * already exists there, open and return the existing one -- so a caller whose
+ * output accumulates across re-runs (e.g. a LoopEnd port whose region
+ * re-executes once per loop iteration) keeps the already-populated document
+ * instead of clobbering it, since `createDocument` overrides any existing
+ * document. Otherwise create it.
+ *
+ * `exists` / `open` / `create` default to this object's own `documentExists`
+ * / `openDocument` / `createDocument`; they are parameterized only so the
+ * create-or-reuse decision can be unit-tested without an iceberg backend.
+ */
+ def createOrReuseDocument(
+ uri: URI,
+ schema: Schema,
+ reuseExisting: Boolean,
+ exists: URI => Boolean = documentExists,
+ open: URI => VirtualDocument[_] = (u: URI) => openDocument(u)._1,
+ create: (URI, Schema) => VirtualDocument[_] = createDocument
+ ): VirtualDocument[_] =
+ if (reuseExisting && exists(uri)) open(uri) else create(uri, schema)
+
/**
* Open a document specified by the uri.
* If the document is storing structural data, the schema will also be returned
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala
new file mode 100644
index 0000000000..d67b90f363
--- /dev/null
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/DocumentFactorySpec.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.storage
+
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.{Schema, Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.net.URI
+
+/**
+ * Unit tests for `DocumentFactory.createOrReuseDocument`, the create-or-reuse
+ * decision behind output-port storage provisioning. It always returns the
+ * document (opened when reused, created otherwise) so the call site doesn't
+ * branch.
+ *
+ * `exists` / `open` / `create` are injected so the decision can be pinned with
+ * trivial document stubs -- no iceberg backend, no live region.
+ */
+class DocumentFactorySpec extends AnyFlatSpec with Matchers {
+
+ private val uri = new URI("vfs:///wf/result/loop-end")
+ private val schema = Schema()
+
+ private def stubDoc: VirtualDocument[Tuple] =
+ new VirtualDocument[Tuple] {
+ override def getURI: URI = uri
+ override def clear(): Unit = ()
+ }
+ private val opened: VirtualDocument[_] = stubDoc
+ private val created: VirtualDocument[_] = stubDoc
+
+ /** Run with spies; return (document handed back, which path was taken). */
+ private def run(reuseExisting: Boolean, exists: Boolean): (VirtualDocument[_], String) = {
+ var path = ""
+ val doc = DocumentFactory.createOrReuseDocument(
+ uri,
+ schema,
+ reuseExisting,
+ _ => exists,
+ _ => { path = "open"; opened },
+ (_, _) => { path = "create"; created }
+ )
+ (doc, path)
+ }
+
+ "createOrReuseDocument" should
+ "open and return the existing document when the port reuses storage and one exists" in {
+ val (doc, path) = run(reuseExisting = true, exists = true)
+ path shouldBe "open"
+ doc should be theSameInstanceAs opened
+ }
+
+ it should "create when the port reuses storage but none exists yet" in {
+ val (doc, path) = run(reuseExisting = true, exists = false)
+ path shouldBe "create"
+ doc should be theSameInstanceAs created
+ }
+
+ it should "always create when the port does not reuse storage, even if one exists" in {
+ val (doc, path) = run(reuseExisting = false, exists = true)
+ path shouldBe "create"
+ doc should be theSameInstanceAs created
+ }
+
+ it should "not probe existence when the port does not reuse storage" in {
+ var probed = false
+ DocumentFactory.createOrReuseDocument(
+ uri,
+ schema,
+ reuseExisting = false,
+ _ => { probed = true; true },
+ _ => opened,
+ (_, _) => created
+ )
+ probed shouldBe false
+ }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala
new file mode 100644
index 0000000000..7850d2b98f
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/metadata/OutputPortReuseFlagSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.metadata
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Guard for the `OutputPort.reuseStorage` flag.
+ *
+ * The flag tells the region scheduler to reuse (append to) a port's storage
+ * across region re-executions instead of recreating it. Only an operator whose
+ * output accumulates across re-executions should set it -- today that is no
+ * operator on `main` (the only one that will, Loop End, is not yet merged).
+ *
+ * This pins the flag off for every registered operator so it can't be turned
+ * on unexpectedly. When the loop operators land, update this to allow Loop
+ * End's output port (and only it).
+ */
+class OutputPortReuseFlagSpec extends AnyFlatSpec with Matchers {
+
+ "No registered operator" should "enable OutputPort.reuseStorage on any of its output ports" in {
+ OperatorMetadataGenerator.operatorTypeMap.keys.foreach { opClass =>
+ opClass.getConstructor().newInstance().operatorInfo.outputPorts.foreach { port =>
+ withClue(s"${opClass.getSimpleName} / output port ${port.id}: ") {
+ port.reuseStorage shouldBe false
+ }
+ }
+ }
+ }
+}