This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new e27b98aa08 feat(storage): add DocumentFactory.documentExists (#5085)
e27b98aa08 is described below
commit e27b98aa0850013697403b8c66a529e5eb67ad66
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 15 21:35:08 2026 -0700
feat(storage): add DocumentFactory.documentExists (#5085)
### What changes were proposed in this PR?
Adds a `documentExists`-style helper to `DocumentFactory` in both the
Scala and Python code paths, so callers can check whether an
iceberg-backed document already exists at a `vfs://` URI without
catching exceptions from `openDocument` / `open_document`.
- Scala: new `DocumentFactory.documentExists(uri: URI): Boolean`.
Resolves the `VFSResourceType` to its iceberg namespace, then probes the
catalog via
`IcebergCatalogInstance.getInstance().tableExists(TableIdentifier.of(namespace, storageKey))`.
Throws `UnsupportedOperationException` for non-`vfs` URI schemes and
`IllegalArgumentException` for unsupported resource types.
- Python: new `DocumentFactory.document_exists(uri: str) -> bool`. Same
shape: probes via `catalog.table_exists(f"{namespace}.{storage_key}")` and
raises the Python counterparts of the Scala errors: `NotImplementedError`
for non-`vfs` URI schemes and `ValueError` for unsupported resource types.
- Refactor: extracted a private `resolveNamespace` (Scala) and
`_resolve_namespace` (Python) so `createDocument`, `openDocument`, and
the new helper share one resource-type → namespace mapping in each
language.
- Why `Catalog.tableExists` rather than `loadTableMetadata`:
`loadTableMetadata` catches every exception and returns `None`, so a
transient catalog error would have surfaced as a false-negative "doesn't
exist" answer. `Catalog.tableExists` only returns `false` on actual
not-found, and lets unexpected errors propagate.
- The change in `open_document` from a hard-coded `"vfs"` literal to
`VFSURIFactory.VFS_FILE_URI_SCHEME` makes all three Python methods
(document creation, `open_document`, and `document_exists`) check the URI
scheme against the same constant.
### Any related issues, documentation, discussions?
Closes: #5089
### How was this PR tested?
- `sbt "WorkflowCore/Test/compile"` — clean.
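- `sbt "WorkflowCore/testOnly *IcebergDocumentSpec"` — 14/14 pass.
The new cases assert that `documentExists`:
  - returns true after `createDocument`;
  - returns false for fresh RESULT, CONSOLE_MESSAGES, and RUNTIME_STATISTICS URIs;
  - throws `UnsupportedOperationException` for an unsupported scheme.
  A reflection-based case also asserts that the private `resolveNamespace`
  throws `IllegalArgumentException` for an unmapped resource type.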
- `sbt "WorkflowCore/testOnly *IcebergUtilSpec"` — 13/13 pass (refactor
did not touch `IcebergUtil`).
- `pytest amber/src/test/python/core/storage/test_document_factory.py` —
11/11 pass, including four new cases covering `document_exists`
returning true/false based on `catalog.table_exists`, raising
`ValueError` on an unsupported resource type, and raising
`NotImplementedError` on an unsupported scheme.
- `ruff check` clean on `document_factory.py` and
`test_document_factory.py`.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF.
---------
Signed-off-by: Xinyuan Lin <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
Co-authored-by: Meng Wang <[email protected]>
---
.../main/python/core/storage/document_factory.py | 60 ++++++++++++++-------
.../python/core/storage/test_document_factory.py | 38 +++++++++++++
.../amber/core/storage/DocumentFactory.scala | 58 +++++++++++++-------
.../result/iceberg/IcebergDocumentSpec.scala | 62 +++++++++++++++++++++-
4 files changed, 181 insertions(+), 37 deletions(-)
diff --git a/amber/src/main/python/core/storage/document_factory.py
b/amber/src/main/python/core/storage/document_factory.py
index bd690ceb59..5078ca7dd5 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -43,6 +43,20 @@ class DocumentFactory:
ICEBERG = "iceberg"
+ @staticmethod
+ def _resolve_namespace(resource_type: VFSResourceType) -> str:
+ # Only RESULT and STATE are mapped here: the Python runtime writes
those
+ # tables. CONSOLE_MESSAGES and RUNTIME_STATISTICS are produced
exclusively
+ # from the Scala side (see DocumentFactory.scala), so they are
intentionally
+ # absent from this mapping and fall through to the unsupported branch.
+ match resource_type:
+ case VFSResourceType.RESULT:
+ return StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+ case VFSResourceType.STATE:
+ return StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+ case _:
+ raise ValueError(f"Resource type {resource_type} is not
supported")
+
@staticmethod
def sanitize_uri_path(uri):
"""
@@ -60,15 +74,7 @@ class DocumentFactory:
parsed_uri = urlparse(uri)
if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)
-
- match resource_type:
- case VFSResourceType.RESULT:
- namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
- case VFSResourceType.STATE:
- namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
- case _:
- raise ValueError(f"Resource type {resource_type} is not
supported")
-
+ namespace = DocumentFactory._resolve_namespace(resource_type)
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
# Convert Amber Schema to Iceberg Schema with LARGE_BINARY
# field name encoding
@@ -96,19 +102,37 @@ class DocumentFactory:
)
@staticmethod
- def open_document(uri: str) -> typing.Tuple[VirtualDocument,
Optional[Schema]]:
+ def document_exists(uri: str) -> bool:
+ """
+ Check whether a document exists at the given URI without opening it.
+
+ Returns True iff the underlying storage already has an entry for this
+ URI (e.g., an iceberg table at the resolved namespace + storage key).
+
+ Raises:
+ NotImplementedError: if the URI scheme is not ``vfs``.
+ ValueError: if the resolved resource type has no iceberg namespace
+ mapping (see ``_resolve_namespace``).
+ """
parsed_uri = urlparse(uri)
- if parsed_uri.scheme == "vfs":
+ if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)
+ namespace = DocumentFactory._resolve_namespace(resource_type)
+ storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+ return IcebergCatalogInstance.get_instance().table_exists(
+ f"{namespace}.{storage_key}"
+ )
- match resource_type:
- case VFSResourceType.RESULT:
- namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
- case VFSResourceType.STATE:
- namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
- case _:
- raise ValueError(f"Resource type {resource_type} is not
supported")
+ raise NotImplementedError(
+ f"Unsupported URI scheme: {parsed_uri.scheme} for checking
document existence"
+ )
+ @staticmethod
+ def open_document(uri: str) -> typing.Tuple[VirtualDocument,
Optional[Schema]]:
+ parsed_uri = urlparse(uri)
+ if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
+ _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
+ namespace = DocumentFactory._resolve_namespace(resource_type)
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
table = load_table_metadata(
diff --git a/amber/src/test/python/core/storage/test_document_factory.py
b/amber/src/test/python/core/storage/test_document_factory.py
index 859c004024..359b0c0aed 100644
--- a/amber/src/test/python/core/storage/test_document_factory.py
+++ b/amber/src/test/python/core/storage/test_document_factory.py
@@ -132,3 +132,41 @@ class TestOpenDocumentNamespaceRouting:
with pytest.raises(ValueError, match="No storage is found"):
DocumentFactory.open_document(VFS_URI)
+
+
+@patch("core.storage.document_factory.IcebergCatalogInstance")
+@patch("core.storage.document_factory.VFSURIFactory")
+class TestDocumentExists:
+ def test_returns_true_when_table_exists(self, mock_vfs, mock_icb):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ mock_vfs.decode_uri.side_effect =
_decode_returning(VFSResourceType.RESULT)
+ catalog = MagicMock()
+ catalog.table_exists.return_value = True
+ mock_icb.get_instance.return_value = catalog
+
+ assert DocumentFactory.document_exists(VFS_URI) is True
+ identifier = catalog.table_exists.call_args.args[0]
+ assert
identifier.startswith(f"{StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE}.")
+
+ def test_returns_false_when_table_missing(self, mock_vfs, mock_icb):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ mock_vfs.decode_uri.side_effect =
_decode_returning(VFSResourceType.RESULT)
+ catalog = MagicMock()
+ catalog.table_exists.return_value = False
+ mock_icb.get_instance.return_value = catalog
+
+ assert DocumentFactory.document_exists(VFS_URI) is False
+
+ def test_unsupported_resource_type_raises_value_error(self, mock_vfs,
_icb):
+ mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+ mock_vfs.decode_uri.side_effect = _decode_returning(
+ VFSResourceType.CONSOLE_MESSAGES
+ )
+
+ with pytest.raises(ValueError, match="not supported"):
+ DocumentFactory.document_exists(VFS_URI)
+
+
+def test_document_exists_rejects_non_vfs_scheme():
+ with pytest.raises(NotImplementedError, match="Unsupported URI scheme"):
+ DocumentFactory.document_exists("file:///tmp/x")
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 00f6c70ba7..cc67ab84ce 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -27,6 +27,7 @@ import org.apache.texera.amber.core.storage.model._
import org.apache.texera.amber.core.storage.result.iceberg.IcebergDocument
import org.apache.texera.amber.core.tuple.{Schema, Tuple}
import org.apache.texera.amber.util.IcebergUtil
+import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}
@@ -39,6 +40,16 @@ object DocumentFactory {
private def sanitizeURIPath(uri: URI): String =
uri.getPath.stripPrefix("/").replace("/", "_")
+ private def resolveNamespace(resourceType: VFSResourceType.Value): String =
+ resourceType match {
+ case RESULT => StorageConfig.icebergTableResultNamespace
+ case CONSOLE_MESSAGES =>
StorageConfig.icebergTableConsoleMessagesNamespace
+ case RUNTIME_STATISTICS =>
StorageConfig.icebergTableRuntimeStatisticsNamespace
+ case STATE => StorageConfig.icebergTableStateNamespace
+ case _ =>
+ throw new IllegalArgumentException(s"Resource type $resourceType is
not supported")
+ }
+
/**
* Open a document specified by the uri for read purposes only.
* @param fileUri the uri of the document
@@ -67,15 +78,7 @@ object DocumentFactory {
case VFS_FILE_URI_SCHEME =>
val (_, _, _, resourceType) = decodeURI(uri)
val storageKey = sanitizeURIPath(uri)
-
- val namespace = resourceType match {
- case RESULT => StorageConfig.icebergTableResultNamespace
- case CONSOLE_MESSAGES =>
StorageConfig.icebergTableConsoleMessagesNamespace
- case RUNTIME_STATISTICS =>
StorageConfig.icebergTableRuntimeStatisticsNamespace
- case STATE => StorageConfig.icebergTableStateNamespace
- case _ =>
- throw new IllegalArgumentException(s"Resource type $resourceType
is not supported")
- }
+ val namespace = resolveNamespace(resourceType)
val icebergSchema = IcebergUtil.toIcebergSchema(schema)
IcebergUtil.createTable(
@@ -103,6 +106,33 @@ object DocumentFactory {
}
}
+ /**
+ * Check whether a document exists at the given URI without opening it.
+ *
+ * Returns true iff the underlying storage already has an entry for this
+ * URI (e.g., an iceberg table at the resolved namespace + storage key).
+ *
+ * @throws UnsupportedOperationException if the URI scheme is not `vfs`.
+ * @throws IllegalArgumentException if the resolved resource type has no
+ * iceberg namespace mapping.
+ */
+ def documentExists(uri: URI): Boolean = {
+ uri.getScheme match {
+ case VFS_FILE_URI_SCHEME =>
+ val (_, _, _, resourceType) = decodeURI(uri)
+ val storageKey = sanitizeURIPath(uri)
+ val namespace = resolveNamespace(resourceType)
+ IcebergCatalogInstance
+ .getInstance()
+ .tableExists(TableIdentifier.of(namespace, storageKey))
+
+ case unsupportedScheme =>
+ throw new UnsupportedOperationException(
+ s"Unsupported URI scheme: $unsupportedScheme for checking document
existence"
+ )
+ }
+ }
+
/**
* Open a document specified by the uri.
* If the document is storing structural data, the schema will also be
returned
@@ -115,15 +145,7 @@ object DocumentFactory {
case VFS_FILE_URI_SCHEME =>
val (_, _, _, resourceType) = decodeURI(uri)
val storageKey = sanitizeURIPath(uri)
-
- val namespace = resourceType match {
- case RESULT => StorageConfig.icebergTableResultNamespace
- case CONSOLE_MESSAGES =>
StorageConfig.icebergTableConsoleMessagesNamespace
- case RUNTIME_STATISTICS =>
StorageConfig.icebergTableRuntimeStatisticsNamespace
- case STATE => StorageConfig.icebergTableStateNamespace
- case _ =>
- throw new IllegalArgumentException(s"Resource type $resourceType
is not supported")
- }
+ val namespace = resolveNamespace(resourceType)
val table = IcebergUtil
.loadTableMetadata(
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index b92562eeb7..6184ce8dcd 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -38,7 +38,7 @@ import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}
import org.scalatest.BeforeAndAfterAll
-import java.lang.reflect.{InvocationHandler, Method, Proxy}
+import java.lang.reflect.{InvocationHandler, InvocationTargetException,
Method, Proxy}
import java.net.URI
import java.sql.Timestamp
import java.util.UUID
@@ -144,6 +144,66 @@ class IcebergDocumentSpec extends
VirtualDocumentSpec[Tuple] with BeforeAndAfter
}
}
+ it should "report documentExists=true for a URI that was created via
createDocument" in {
+ assert(DocumentFactory.documentExists(uri))
+ }
+
+ it should "report documentExists=false for a URI that was never created" in {
+ val freshBase = VFSURIFactory.createPortBaseURI(
+ WorkflowIdentity(0),
+ ExecutionIdentity(0),
+ GlobalPortIdentity(
+ PhysicalOpIdentity(
+ logicalOpId =
OperatorIdentity(s"fresh-${UUID.randomUUID().toString.replace("-", "")}"),
+ layerName = "main"
+ ),
+ PortIdentity()
+ )
+ )
+ val freshUri = VFSURIFactory.resultURI(freshBase)
+ assert(!DocumentFactory.documentExists(freshUri))
+ }
+
+ it should "throw UnsupportedOperationException for documentExists on an
unsupported scheme" in {
+ intercept[UnsupportedOperationException] {
+ DocumentFactory.documentExists(new URI("file:///tmp/anything"))
+ }
+ }
+
+ it should "resolve CONSOLE_MESSAGES URIs through documentExists" in {
+ val consoleUri = VFSURIFactory.createConsoleMessagesURI(
+ WorkflowIdentity(0),
+ ExecutionIdentity(0),
+ OperatorIdentity(s"fresh-${UUID.randomUUID().toString.replace("-", "")}")
+ )
+ assert(!DocumentFactory.documentExists(consoleUri))
+ }
+
+ it should "resolve RUNTIME_STATISTICS URIs through documentExists" in {
+ val statsUri = VFSURIFactory.createRuntimeStatisticsURI(
+ WorkflowIdentity(0),
+ ExecutionIdentity(0)
+ )
+ assert(!DocumentFactory.documentExists(statsUri))
+ }
+
+ it should "throw IllegalArgumentException for resolveNamespace on an
unmapped resource type" in {
+ // `resolveNamespace` is private and its `case _ =>` is unreachable from
any
+ // well-formed VFS URI (VFSURIFactory.decodeURI validates resource types).
+ // Exercise the defensive branch by reflecting on the method and passing
+ // null — Scala pattern matches fall through to the wildcard for null
+ // scrutinees.
+ val method = DocumentFactory.getClass.getDeclaredMethod(
+ "resolveNamespace",
+ classOf[Enumeration#Value]
+ )
+ method.setAccessible(true)
+ val wrapped = intercept[InvocationTargetException] {
+ method.invoke(DocumentFactory, null)
+ }
+ assert(wrapped.getCause.isInstanceOf[IllegalArgumentException])
+ }
+
it should "round trip materialized state documents" in {
val stateUri = VFSURIFactory.stateURI(baseURI)
DocumentFactory.createDocument(stateUri, State.schema)