This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-materialization
by this push:
new 4223cf5640 refactor(storage): make port URIs symmetric over a base URI
4223cf5640 is described below
commit 4223cf56408e51283280fdc61bdb7008f5f6f2e7
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed May 6 17:56:10 2026 -0700
refactor(storage): make port URIs symmetric over a base URI
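Drop the result-as-primary asymmetry in VFSURIFactory: each port now gets
a single base URI (createPortBaseURI / create_port_base_uri), and the
result and state URIs are derived from it as equal first-class siblings
via resultURI(base) / stateURI(base) (result_uri / state_uri in Python).
Replaces createResultURI / create_result_uri and removes the
substring-replace State.uriFromResultUri / State.uri_from_result_uri
helpers, which rewrote every "/result" occurrence in the URI to "/state".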
---
.../core/architecture/packaging/output_manager.py | 7 +++--
amber/src/main/python/core/models/state.py | 4 ---
.../input_port_materialization_reader_runnable.py | 5 ++--
.../main/python/core/storage/vfs_uri_factory.py | 16 +++++++++--
.../messaginglayer/OutputManager.scala | 8 +++---
.../scheduling/CostBasedScheduleGenerator.scala | 6 ++--
.../ExpansionGreedyScheduleGenerator.scala | 4 +--
.../scheduling/RegionExecutionCoordinator.scala | 15 +++++-----
.../InputPortMaterializationReaderThread.scala | 6 ++--
.../core/storage/iceberg/test_iceberg_document.py | 32 ++++++++++++----------
.../org/apache/texera/amber/core/state/State.scala | 4 ---
.../texera/amber/core/storage/VFSURIFactory.scala | 21 +++++++++-----
.../amber/core/storage/VFSURIFactorySpec.scala | 23 ++++++++++------
.../result/iceberg/IcebergDocumentSpec.scala | 8 ++++--
.../result/iceberg/IcebergTableStatsSpec.scala | 22 ++++++++-------
15 files changed, 105 insertions(+), 76 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/packaging/output_manager.py
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index cea76904fe..8521b98ef4 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -45,6 +45,7 @@ from core.models import Tuple, Schema, StateFrame
from core.models.payload import DataPayload, DataFrame
from core.models.state import State
from core.storage.document_factory import DocumentFactory
+from core.storage.vfs_uri_factory import VFSURIFactory
from core.storage.runnables.port_storage_writer import (
PortStorageWriter,
PortStorageWriterElement,
@@ -131,7 +132,9 @@ class OutputManager:
to storage in batch, and open a long-lived buffered writer for
state materialization on the same port.
"""
- document, _ = DocumentFactory.open_document(storage_uri)
+ document, _ = DocumentFactory.open_document(
+ VFSURIFactory.result_uri(storage_uri)
+ )
buffered_item_writer =
document.writer(str(get_worker_index(self.worker_id)))
writer_queue = Queue()
port_storage_writer = PortStorageWriter(
@@ -150,7 +153,7 @@ class OutputManager:
)
state_document, _ = DocumentFactory.open_document(
- State.uri_from_result_uri(storage_uri)
+ VFSURIFactory.state_uri(storage_uri)
)
state_buffered_item_writer = state_document.writer(
str(get_worker_index(self.worker_id))
diff --git a/amber/src/main/python/core/models/state.py
b/amber/src/main/python/core/models/state.py
index 3ce610bbee..003aaa212a 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -41,10 +41,6 @@ class State(dict):
def from_tuple(cls, row: Tuple) -> "State":
return cls.from_json(row[cls.CONTENT])
- @staticmethod
- def uri_from_result_uri(result_uri: str) -> str:
- return result_uri.replace("/result", "/state")
-
_TYPE_MARKER = "__texera_type__"
_PAYLOAD_MARKER = "payload"
diff --git
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index bdb678aac2..692bff6555 100644
---
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -37,6 +37,7 @@ from core.architecture.sendsemantics.round_robin_partitioner
import (
from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State,
StateFrame
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
+from core.storage.vfs_uri_factory import VFSURIFactory
from core.util import Stoppable, get_one_of
from core.util.runnable.runnable import Runnable
from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
@@ -143,12 +144,12 @@ class InputPortMaterializationReaderRunnable(Runnable,
Stoppable):
"""
try:
self.materialization, self.tuple_schema =
DocumentFactory.open_document(
- self.uri
+ VFSURIFactory.result_uri(self.uri)
)
self.emit_ecm("StartChannel",
EmbeddedControlMessageType.NO_ALIGNMENT)
state_document, _ = DocumentFactory.open_document(
- State.uri_from_result_uri(self.uri)
+ VFSURIFactory.state_uri(self.uri)
)
for state_row in state_document.get():
self.emit_payload(StateFrame(State.from_tuple(state_row)))
diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py
b/amber/src/main/python/core/storage/vfs_uri_factory.py
index 0e23e60705..883450abf2 100644
--- a/amber/src/main/python/core/storage/vfs_uri_factory.py
+++ b/amber/src/main/python/core/storage/vfs_uri_factory.py
@@ -89,12 +89,22 @@ class VFSURIFactory:
)
@staticmethod
- def create_result_uri(workflow_id, execution_id, global_port_id) -> str:
- """Creates a URI pointing to a result storage."""
- base_uri = (
+ def create_port_base_uri(workflow_id, execution_id, global_port_id) -> str:
+ """Base URI for a port. Result and state URIs derive from it via
+ `result_uri` / `state_uri`.
+ """
+ return (
f"{VFSURIFactory.VFS_FILE_URI_SCHEME}:///wid/{workflow_id.id}"
f"/eid/{execution_id.id}/globalportid/"
f"{serialize_global_port_identity(global_port_id)}"
)
+ @staticmethod
+ def result_uri(base_uri: str) -> str:
+ """The result-resource URI under a port base URI."""
return f"{base_uri}/{VFSResourceType.RESULT.value}"
+
+ @staticmethod
+ def state_uri(base_uri: str) -> str:
+ """The state-resource URI under a port base URI."""
+ return f"{base_uri}/{VFSResourceType.STATE.value}"
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index d76a76be33..3b233d8cf7 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -20,7 +20,7 @@
package org.apache.texera.amber.engine.architecture.messaginglayer
import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.model.BufferedItemWriter
import org.apache.texera.amber.core.tuple._
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
@@ -295,9 +295,9 @@ class OutputManager(
ports.head._1
}
- private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri:
URI): Unit = {
+ private def setupOutputStorageWriterThread(portId: PortIdentity,
portBaseURI: URI): Unit = {
val bufferedItemWriter = DocumentFactory
- .openDocument(storageUri)
+ .openDocument(VFSURIFactory.resultURI(portBaseURI))
._1
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]
@@ -308,7 +308,7 @@ class OutputManager(
// The state document is provisioned alongside the result document
// by RegionExecutionCoordinator, so it is always present.
val stateWriter = DocumentFactory
- .openDocument(State.uriFromResultUri(storageUri))
+ .openDocument(VFSURIFactory.stateURI(portBaseURI))
._1
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 401ccddc0a..d2c3175368 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -20,7 +20,7 @@
package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.core.storage.VFSURIFactory.createResultURI
+import org.apache.texera.amber.core.storage.VFSURIFactory.createPortBaseURI
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
PhysicalOpIdentity}
import org.apache.texera.amber.core.workflow._
import
org.apache.texera.amber.engine.architecture.scheduling.SchedulingUtils.replaceVertex
@@ -174,12 +174,12 @@ class CostBasedScheduleGenerator(
// Allocate an URI for each of these output ports
val outputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] =
outputPortIdsNeedingStorage.map { gpid =>
- val outputWriterURI = createResultURI(
+ val portBaseURI = createPortBaseURI(
workflowId = workflowContext.workflowId,
executionId = workflowContext.executionId,
globalPortId = gpid
)
- gpid -> OutputPortConfig(outputWriterURI)
+ gpid -> OutputPortConfig(portBaseURI)
}.toMap
val resourceConfig = ResourceConfig(portConfigs = outputPortConfigs)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
index 4bb8933896..304e1496f8 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
import com.typesafe.scalalogging.LazyLogging
import org.apache.texera.amber.core.WorkflowRuntimeException
-import org.apache.texera.amber.core.storage.VFSURIFactory.createResultURI
+import org.apache.texera.amber.core.storage.VFSURIFactory.createPortBaseURI
import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
import org.apache.texera.amber.core.workflow.{
GlobalPortIdentity,
@@ -331,7 +331,7 @@ class ExpansionGreedyScheduleGenerator(
private def getStorageURIFromGlobalOutputPortId(outputPortId:
GlobalPortIdentity) = {
assert(!outputPortId.input)
- createResultURI(
+ createPortBaseURI(
workflowId = workflowContext.workflowId,
executionId = workflowContext.executionId,
globalPortId = outputPortId
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 58fdf9f242..9262fcee18 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -22,7 +22,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.pekko.pattern.gracefulStop
import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer,
Return, Throw, Timer}
import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity,
PhysicalLink, PhysicalOp}
@@ -569,20 +569,21 @@ class RegionExecutionCoordinator(
): Unit = {
portConfigs.foreach {
case (outputPortId, portConfig) =>
- val storageUriToAdd = portConfig.storageURI
- val stateUriToAdd = State.uriFromResultUri(storageUriToAdd)
- val (_, eid, _, _) = decodeURI(storageUriToAdd)
+ val portBaseURI = portConfig.storageURI
+ val resultURI = VFSURIFactory.resultURI(portBaseURI)
+ val stateURI = VFSURIFactory.stateURI(portBaseURI)
val schemaOptional =
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is
missing"))
- DocumentFactory.createDocument(storageUriToAdd, schema)
- DocumentFactory.createDocument(stateUriToAdd, State.schema)
+ DocumentFactory.createDocument(resultURI, schema)
+ DocumentFactory.createDocument(stateURI, State.schema)
if (!isRestart) {
+ val (_, eid, _, _) = decodeURI(resultURI)
WorkflowExecutionsResource.insertOperatorPortResultUri(
eid = eid,
globalPortId = outputPortId,
- uri = storageUriToAdd
+ uri = resultURI
)
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index e06453b1a3..a7d259c37e 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -22,7 +22,7 @@ package
org.apache.texera.amber.engine.architecture.worker.managers
import io.grpc.MethodDescriptor
import org.apache.texera.amber.config.ApplicationConfig
import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.{
@@ -99,7 +99,7 @@ class InputPortMaterializationReaderThread(
try {
val stateDocument =
DocumentFactory
- .openDocument(State.uriFromResultUri(uri))
+ .openDocument(VFSURIFactory.stateURI(uri))
._1
.asInstanceOf[VirtualDocument[Tuple]]
val stateReadIterator = stateDocument.get()
@@ -111,7 +111,7 @@ class InputPortMaterializationReaderThread(
}
val materialization: VirtualDocument[Tuple] = DocumentFactory
- .openDocument(uri)
+ .openDocument(VFSURIFactory.resultURI(uri))
._1
.asInstanceOf[VirtualDocument[Tuple]]
val storageReadIterator = materialization.get()
diff --git
a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index 44e62bbb8a..381f8e5ff6 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -83,17 +83,21 @@ class TestIcebergDocument:
with a random operator id
"""
operator_uuid = str(uuid.uuid4()).replace("-", "")
- uri = VFSURIFactory.create_result_uri(
- WorkflowIdentity(id=0),
- ExecutionIdentity(id=0),
- GlobalPortIdentity(
- op_id=PhysicalOpIdentity(
-
logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"),
- layer_name="main",
+ uri = VFSURIFactory.result_uri(
+ VFSURIFactory.create_port_base_uri(
+ WorkflowIdentity(id=0),
+ ExecutionIdentity(id=0),
+ GlobalPortIdentity(
+ op_id=PhysicalOpIdentity(
+ logical_op_id=OperatorIdentity(
+ id=f"test_table_{operator_uuid}"
+ ),
+ layer_name="main",
+ ),
+ port_id=PortIdentity(id=0),
+ input=False,
),
- port_id=PortIdentity(id=0),
- input=False,
- ),
+ )
)
DocumentFactory.create_document(uri, amber_schema)
document, _ = DocumentFactory.open_document(uri)
@@ -327,7 +331,7 @@ class TestIcebergDocument:
def test_state_materialization_round_trip(self):
operator_uuid = str(uuid.uuid4()).replace("-", "")
- result_uri = VFSURIFactory.create_result_uri(
+ base_uri = VFSURIFactory.create_port_base_uri(
WorkflowIdentity(id=0),
ExecutionIdentity(id=0),
GlobalPortIdentity(
@@ -339,7 +343,7 @@ class TestIcebergDocument:
input=False,
),
)
- state_uri = State.uri_from_result_uri(result_uri)
+ state_uri = VFSURIFactory.state_uri(base_uri)
DocumentFactory.create_document(state_uri, State.SCHEMA)
document, _ = DocumentFactory.open_document(state_uri)
@@ -363,7 +367,7 @@ class TestIcebergDocument:
def test_multiple_states_materialize_as_rows_in_one_table(self):
operator_uuid = str(uuid.uuid4()).replace("-", "")
- result_uri = VFSURIFactory.create_result_uri(
+ base_uri = VFSURIFactory.create_port_base_uri(
WorkflowIdentity(id=0),
ExecutionIdentity(id=0),
GlobalPortIdentity(
@@ -377,7 +381,7 @@ class TestIcebergDocument:
input=False,
),
)
- state_uri = State.uri_from_result_uri(result_uri)
+ state_uri = VFSURIFactory.state_uri(base_uri)
DocumentFactory.create_document(state_uri, State.SCHEMA)
document, _ = DocumentFactory.open_document(state_uri)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 532f355c17..ba146f1d57 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.JsonNode
import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema,
Tuple}
import org.apache.texera.amber.util.JSONUtils.objectMapper
-import java.net.URI
import java.util.Base64
import scala.jdk.CollectionConverters.IteratorHasAsScala
@@ -58,9 +57,6 @@ object State {
def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))
- def uriFromResultUri(resultUri: URI): URI =
- new URI(resultUri.toString.replace("/result", "/state"))
-
private def toJsonValue(value: Any): Any =
value match {
case null => null
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
index e687b28a29..291c31896b 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
@@ -84,18 +84,25 @@ object VFSURIFactory {
}
/**
- * Create a URI pointing to a result storage
+ * Create the base URI for a port. Result and state URIs are derived
+ * from this base via `resultURI` / `stateURI`.
*/
- def createResultURI(
+ def createPortBaseURI(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity,
globalPortId: GlobalPortIdentity
- ): URI = {
- val baseUri =
-
s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}/globalportid/${globalPortId.serializeAsString}"
+ ): URI =
+ new URI(
+ s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}" +
+ s"/globalportid/${globalPortId.serializeAsString}"
+ )
- new URI(s"$baseUri/${VFSResourceType.RESULT.toString.toLowerCase}")
- }
+ def resultURI(baseURI: URI): URI = appendResource(baseURI,
VFSResourceType.RESULT)
+
+ def stateURI(baseURI: URI): URI = appendResource(baseURI,
VFSResourceType.STATE)
+
+ private def appendResource(baseURI: URI, resourceType:
VFSResourceType.Value): URI =
+ new URI(s"$baseURI/${resourceType.toString.toLowerCase}")
/**
* Create a URI pointing to runtime statistics
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
index 6fbe35873a..0b8ae4a19c 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
@@ -42,23 +42,30 @@ class VFSURIFactorySpec extends AnyFlatSpec {
input = true
)
- "VFSURIFactory.createResultURI" should "include workflow, execution, port,
and the result resource type" in {
- val uri = VFSURIFactory.createResultURI(workflowId, executionId, portId)
- assert(uri.getScheme == VFSURIFactory.VFS_FILE_URI_SCHEME)
- val path = uri.getPath
+ "VFSURIFactory.createPortBaseURI" should "include workflow, execution, and
port segments without a resource type" in {
+ val baseURI = VFSURIFactory.createPortBaseURI(workflowId, executionId,
portId)
+ assert(baseURI.getScheme == VFSURIFactory.VFS_FILE_URI_SCHEME)
+ val path = baseURI.getPath
assert(path.contains("/wid/7"))
assert(path.contains("/eid/11"))
assert(path.contains("/globalportid/"))
- assert(path.endsWith("/result"))
+ assert(!path.endsWith("/result"))
+ assert(!path.endsWith("/state"))
}
- it should "round-trip through decodeURI" in {
- val uri = VFSURIFactory.createResultURI(workflowId, executionId, portId)
- val (wid, eid, globalPortIdOpt, resourceType) =
VFSURIFactory.decodeURI(uri)
+ "VFSURIFactory.resultURI / stateURI" should "append the resource segment and
round-trip through decodeURI" in {
+ val baseURI = VFSURIFactory.createPortBaseURI(workflowId, executionId,
portId)
+ val resultURI = VFSURIFactory.resultURI(baseURI)
+ val stateURI = VFSURIFactory.stateURI(baseURI)
+ assert(resultURI.getPath.endsWith("/result"))
+ assert(stateURI.getPath.endsWith("/state"))
+
+ val (wid, eid, globalPortIdOpt, resourceType) =
VFSURIFactory.decodeURI(resultURI)
assert(wid == workflowId)
assert(eid == executionId)
assert(globalPortIdOpt.contains(portId))
assert(resourceType == VFSResourceType.RESULT)
+ assert(VFSURIFactory.decodeURI(stateURI)._4 == VFSResourceType.STATE)
}
"VFSURIFactory.createRuntimeStatisticsURI" should "produce a
runtimeStatistics URI without an opid segment" in {
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 7f1d8573c2..b865fff94d 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -52,6 +52,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple]
with BeforeAndAfter
var deserde: (IcebergSchema, Record) => Tuple = _
var catalog: Catalog = _
val tableNamespace = "test_namespace"
+ var baseURI: URI = _
var uri: URI = _
override def beforeAll(): Unit = {
@@ -80,7 +81,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple]
with BeforeAndAfter
override def beforeEach(): Unit = {
// Generate a unique table name for each test
- uri = VFSURIFactory.createResultURI(
+ baseURI = VFSURIFactory.createPortBaseURI(
WorkflowIdentity(0),
ExecutionIdentity(0),
GlobalPortIdentity(
@@ -92,6 +93,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple]
with BeforeAndAfter
PortIdentity()
)
)
+ uri = VFSURIFactory.resultURI(baseURI)
DocumentFactory.createDocument(uri, amberSchema)
super.beforeEach()
}
@@ -143,7 +145,7 @@ class IcebergDocumentSpec extends
VirtualDocumentSpec[Tuple] with BeforeAndAfter
}
it should "round trip materialized state documents" in {
- val stateUri = State.uriFromResultUri(uri)
+ val stateUri = VFSURIFactory.stateURI(baseURI)
DocumentFactory.createDocument(stateUri, State.schema)
val stateDocument =
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
@@ -172,7 +174,7 @@ class IcebergDocumentSpec extends
VirtualDocumentSpec[Tuple] with BeforeAndAfter
}
it should "materialize multiple states as rows in one state table" in {
- val stateUri = State.uriFromResultUri(uri)
+ val stateUri = VFSURIFactory.stateURI(baseURI)
DocumentFactory.createDocument(stateUri, State.schema)
val stateDocument =
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
index 175ebc2c01..b7611f6f77 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
@@ -50,16 +50,18 @@ class IcebergTableStatsSpec extends AnyFlatSpec with
BeforeAndAfterAll with Suit
var deserde: (IcebergSchema, Record) => Tuple = _
var catalog: Catalog = _
val tableNamespace = "test_namespace"
- var uri: URI = VFSURIFactory.createResultURI(
- WorkflowIdentity(0),
- ExecutionIdentity(0),
- GlobalPortIdentity(
- PhysicalOpIdentity(
- logicalOpId =
-
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
- layerName = "main"
- ),
- PortIdentity()
+ var uri: URI = VFSURIFactory.resultURI(
+ VFSURIFactory.createPortBaseURI(
+ WorkflowIdentity(0),
+ ExecutionIdentity(0),
+ GlobalPortIdentity(
+ PhysicalOpIdentity(
+ logicalOpId =
+
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+ layerName = "main"
+ ),
+ PortIdentity()
+ )
)
)