This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new e27b98aa08 feat(storage): add DocumentFactory.documentExists (#5085)
e27b98aa08 is described below

commit e27b98aa0850013697403b8c66a529e5eb67ad66
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri May 15 21:35:08 2026 -0700

    feat(storage): add DocumentFactory.documentExists (#5085)
    
    ### What changes were proposed in this PR?
    
    Adds a `documentExists`-style helper to `DocumentFactory` in both the
    Scala and Python code paths, so callers can check whether an
    iceberg-backed document already exists at a `vfs://` URI without
    catching exceptions from `openDocument` / `open_document`.
    
    - Scala: new `DocumentFactory.documentExists(uri: URI): Boolean`.
    Resolves the `VFSResourceType` to its iceberg namespace, then probes the
    catalog via
    `IcebergCatalogInstance.getInstance().tableExists(TableIdentifier.of(namespace, storageKey))`.
    Throws `UnsupportedOperationException` for non-`vfs` URI schemes and
    `IllegalArgumentException` for unsupported resource types.
    - Python: new `DocumentFactory.document_exists(uri: str) -> bool`. Same
    shape: probes via `catalog.table_exists(f"{namespace}.{storage_key}")`;
    raises `NotImplementedError` / `ValueError` symmetrically.
    - Refactor: extracted a private `resolveNamespace` (Scala) and
    `_resolve_namespace` (Python) so `createDocument`, `openDocument`, and
    the new helper share one resource-type → namespace mapping in each
    language.
    - Why `Catalog.tableExists` rather than `loadTableMetadata`:
    `loadTableMetadata` catches every exception and returns `None`, so a
    transient catalog error would have surfaced as a false-negative "doesn't
    exist" answer. `Catalog.tableExists` only returns `false` on actual
    not-found, and lets unexpected errors propagate.
    - The change in `open_document` from a hard-coded `"vfs"` literal to
    `VFSURIFactory.VFS_FILE_URI_SCHEME` aligns the three methods on the same
    scheme constant.
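    A minimal Python sketch of the shape described above. It uses several
    hypothetical stand-ins: an in-memory `FakeCatalog` in place of
    `IcebergCatalogInstance`, and a `ResourceType` enum and namespace strings
    in place of `VFSResourceType` / `StorageConfig`. It also assumes a URI
    layout where the resource type is the last path segment; the real
    `VFSURIFactory.decode_uri` is not reproduced. The sketch shows one shared
    namespace mapping and a precise existence probe. It also shows how a
    catch-all helper can turn a transient catalog error into a false
    "doesn't exist".

```python
from enum import Enum
from urllib.parse import urlparse

VFS_SCHEME = "vfs"  # stand-in for VFSURIFactory.VFS_FILE_URI_SCHEME


class ResourceType(Enum):
    # Hypothetical stand-in for VFSResourceType.
    RESULT = "result"
    STATE = "state"
    CONSOLE_MESSAGES = "consoleMessages"


# Hypothetical namespace names; the real values come from StorageConfig.
NAMESPACES = {
    ResourceType.RESULT: "operator-port-result",
    ResourceType.STATE: "operator-state",
}


def resolve_namespace(resource_type: ResourceType) -> str:
    # One resource-type -> namespace mapping shared by every caller.
    if resource_type not in NAMESPACES:
        raise ValueError(f"Resource type {resource_type} is not supported")
    return NAMESPACES[resource_type]


def sanitize_path(path: str) -> str:
    # Mirrors the Scala sanitizeURIPath: drop the leading "/", flatten the rest.
    return path.removeprefix("/").replace("/", "_")


def decode_resource_type(path: str) -> ResourceType:
    # Hypothetical decoding: treat the last path segment as the resource type.
    return ResourceType(path.rsplit("/", 1)[-1])


class FakeCatalog:
    # Hypothetical in-memory catalog; fail=True simulates a transient outage.
    def __init__(self, tables=(), fail=False):
        self.tables = set(tables)
        self.fail = fail

    def table_exists(self, identifier: str) -> bool:
        if self.fail:
            raise ConnectionError("catalog temporarily unavailable")
        return identifier in self.tables


def document_exists(uri: str, catalog: FakeCatalog) -> bool:
    parsed = urlparse(uri)
    if parsed.scheme != VFS_SCHEME:
        raise NotImplementedError(
            f"Unsupported URI scheme: {parsed.scheme} for checking document existence"
        )
    namespace = resolve_namespace(decode_resource_type(parsed.path))
    # False only means "not found"; any other catalog error propagates.
    return catalog.table_exists(f"{namespace}.{sanitize_path(parsed.path)}")


def exists_via_catch_all(uri: str, catalog: FakeCatalog) -> bool:
    # The pattern the PR avoids: swallowing every exception turns a transient
    # catalog failure into a confident "does not exist".
    try:
        return document_exists(uri, catalog)
    except Exception:
        return False
```

    Consider `uri = "vfs:///wid/0/eid/1/result"`. The probed identifier is
    `operator-port-result.wid_0_eid_1_result`. With `FakeCatalog(fail=True)`,
    `document_exists` raises `ConnectionError`, while `exists_via_catch_all`
    quietly returns `False`.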
    
    ### Any related issues, documentation, discussions?
    
    Closes: #5089
    
    ### How was this PR tested?
    
    - `sbt "WorkflowCore/Test/compile"` — clean.
    - `sbt "WorkflowCore/testOnly *IcebergDocumentSpec"` — 14/14 pass,
    including new cases asserting that `documentExists` returns true after
    `createDocument`, returns false on a fresh URI, and throws
    `UnsupportedOperationException` for an unsupported scheme.
    - `sbt "WorkflowCore/testOnly *IcebergUtilSpec"` — 13/13 pass (refactor
    did not touch `IcebergUtil`).
    - `pytest amber/src/test/python/core/storage/test_document_factory.py` —
    11/11 pass, including four new cases covering `document_exists`
    returning true/false based on `catalog.table_exists`, raising
    `ValueError` on an unsupported resource type, and raising
    `NotImplementedError` on an unsupported scheme.
    - `ruff check` clean on `document_factory.py` and
    `test_document_factory.py`.
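    The mocked Python cases listed above pin down the true/false answers and
    the error types. The rationale for `Catalog.tableExists` also says that
    unexpected catalog errors propagate. A similar `unittest.mock` test could
    check that as well. Below is a minimal sketch. It assumes a hypothetical
    reduced `document_exists(catalog, namespace, storage_key)` that skips URI
    parsing, so only the catalog call is exercised.

```python
from unittest.mock import MagicMock


def document_exists(catalog, namespace: str, storage_key: str) -> bool:
    # Hypothetical reduced stand-in for DocumentFactory.document_exists:
    # URI parsing and namespace resolution are assumed to have happened already.
    return catalog.table_exists(f"{namespace}.{storage_key}")


def test_probes_namespace_dot_key():
    catalog = MagicMock()
    catalog.table_exists.return_value = False
    assert document_exists(catalog, "ns", "key") is False
    # call_args.args[0] is the first positional argument of the most recent call.
    assert catalog.table_exists.call_args.args[0] == "ns.key"


def test_transient_catalog_error_propagates():
    catalog = MagicMock()
    catalog.table_exists.side_effect = ConnectionError("catalog unavailable")
    try:
        document_exists(catalog, "ns", "key")
    except ConnectionError:
        return  # the error reached the caller instead of becoming False
    raise AssertionError("expected the catalog error to propagate")
```

    Under pytest, both functions are collected automatically. Outside
    pytest, you can call them directly.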
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Co-authored with Claude Opus 4.7 in compliance with ASF policy.
    
    ---------
    
    Signed-off-by: Xinyuan Lin <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    Co-authored-by: Meng Wang <[email protected]>
---
 .../main/python/core/storage/document_factory.py   | 60 ++++++++++++++-------
 .../python/core/storage/test_document_factory.py   | 38 +++++++++++++
 .../amber/core/storage/DocumentFactory.scala       | 58 +++++++++++++-------
 .../result/iceberg/IcebergDocumentSpec.scala       | 62 +++++++++++++++++++++-
 4 files changed, 181 insertions(+), 37 deletions(-)

diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py
index bd690ceb59..5078ca7dd5 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -43,6 +43,20 @@ class DocumentFactory:
 
     ICEBERG = "iceberg"
 
+    @staticmethod
+    def _resolve_namespace(resource_type: VFSResourceType) -> str:
+        # Only RESULT and STATE are mapped here: the Python runtime writes those
+        # tables. CONSOLE_MESSAGES and RUNTIME_STATISTICS are produced exclusively
+        # from the Scala side (see DocumentFactory.scala), so they are intentionally
+        # absent from this mapping and fall through to the unsupported branch.
+        match resource_type:
+            case VFSResourceType.RESULT:
+                return StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+            case VFSResourceType.STATE:
+                return StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+            case _:
+                raise ValueError(f"Resource type {resource_type} is not supported")
+
     @staticmethod
     def sanitize_uri_path(uri):
         """
@@ -60,15 +74,7 @@ class DocumentFactory:
         parsed_uri = urlparse(uri)
         if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
-
-            match resource_type:
-                case VFSResourceType.RESULT:
-                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
-                case VFSResourceType.STATE:
-                    namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
-                case _:
-                    raise ValueError(f"Resource type {resource_type} is not supported")
-
+            namespace = DocumentFactory._resolve_namespace(resource_type)
             storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
             # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
             # field name encoding
@@ -96,19 +102,37 @@ class DocumentFactory:
             )
 
     @staticmethod
-    def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
+    def document_exists(uri: str) -> bool:
+        """
+        Check whether a document exists at the given URI without opening it.
+
+        Returns True iff the underlying storage already has an entry for this
+        URI (e.g., an iceberg table at the resolved namespace + storage key).
+
+        Raises:
+            NotImplementedError: if the URI scheme is not ``vfs``.
+            ValueError: if the resolved resource type has no iceberg namespace
+                mapping (see ``_resolve_namespace``).
+        """
         parsed_uri = urlparse(uri)
-        if parsed_uri.scheme == "vfs":
+        if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
             _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
+            namespace = DocumentFactory._resolve_namespace(resource_type)
+            storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+            return IcebergCatalogInstance.get_instance().table_exists(
+                f"{namespace}.{storage_key}"
+            )
 
-            match resource_type:
-                case VFSResourceType.RESULT:
-                    namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
-                case VFSResourceType.STATE:
-                    namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
-                case _:
-                    raise ValueError(f"Resource type {resource_type} is not supported")
+        raise NotImplementedError(
+            f"Unsupported URI scheme: {parsed_uri.scheme} for checking document existence"
+        )
 
+    @staticmethod
+    def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
+        parsed_uri = urlparse(uri)
+        if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
+            _, _, _, resource_type = VFSURIFactory.decode_uri(uri)
+            namespace = DocumentFactory._resolve_namespace(resource_type)
             storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
 
             table = load_table_metadata(
diff --git a/amber/src/test/python/core/storage/test_document_factory.py b/amber/src/test/python/core/storage/test_document_factory.py
index 859c004024..359b0c0aed 100644
--- a/amber/src/test/python/core/storage/test_document_factory.py
+++ b/amber/src/test/python/core/storage/test_document_factory.py
@@ -132,3 +132,41 @@ class TestOpenDocumentNamespaceRouting:
 
         with pytest.raises(ValueError, match="No storage is found"):
             DocumentFactory.open_document(VFS_URI)
+
+
+@patch("core.storage.document_factory.IcebergCatalogInstance")
+@patch("core.storage.document_factory.VFSURIFactory")
+class TestDocumentExists:
+    def test_returns_true_when_table_exists(self, mock_vfs, mock_icb):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.RESULT)
+        catalog = MagicMock()
+        catalog.table_exists.return_value = True
+        mock_icb.get_instance.return_value = catalog
+
+        assert DocumentFactory.document_exists(VFS_URI) is True
+        identifier = catalog.table_exists.call_args.args[0]
+        assert identifier.startswith(f"{StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE}.")
+
+    def test_returns_false_when_table_missing(self, mock_vfs, mock_icb):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(VFSResourceType.RESULT)
+        catalog = MagicMock()
+        catalog.table_exists.return_value = False
+        mock_icb.get_instance.return_value = catalog
+
+        assert DocumentFactory.document_exists(VFS_URI) is False
+
+    def test_unsupported_resource_type_raises_value_error(self, mock_vfs, _icb):
+        mock_vfs.VFS_FILE_URI_SCHEME = "vfs"
+        mock_vfs.decode_uri.side_effect = _decode_returning(
+            VFSResourceType.CONSOLE_MESSAGES
+        )
+
+        with pytest.raises(ValueError, match="not supported"):
+            DocumentFactory.document_exists(VFS_URI)
+
+
+def test_document_exists_rejects_non_vfs_scheme():
+    with pytest.raises(NotImplementedError, match="Unsupported URI scheme"):
+        DocumentFactory.document_exists("file:///tmp/x")
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 00f6c70ba7..cc67ab84ce 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -27,6 +27,7 @@ import org.apache.texera.amber.core.storage.model._
 import org.apache.texera.amber.core.storage.result.iceberg.IcebergDocument
 import org.apache.texera.amber.core.tuple.{Schema, Tuple}
 import org.apache.texera.amber.util.IcebergUtil
+import org.apache.iceberg.catalog.TableIdentifier
 import org.apache.iceberg.data.Record
 import org.apache.iceberg.{Schema => IcebergSchema}
 
@@ -39,6 +40,16 @@ object DocumentFactory {
   private def sanitizeURIPath(uri: URI): String =
     uri.getPath.stripPrefix("/").replace("/", "_")
 
+  private def resolveNamespace(resourceType: VFSResourceType.Value): String =
+    resourceType match {
+      case RESULT             => StorageConfig.icebergTableResultNamespace
+      case CONSOLE_MESSAGES   => StorageConfig.icebergTableConsoleMessagesNamespace
+      case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace
+      case STATE              => StorageConfig.icebergTableStateNamespace
+      case _ =>
+        throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
+    }
+
   /**
     * Open a document specified by the uri for read purposes only.
     * @param fileUri the uri of the document
@@ -67,15 +78,7 @@ object DocumentFactory {
       case VFS_FILE_URI_SCHEME =>
         val (_, _, _, resourceType) = decodeURI(uri)
         val storageKey = sanitizeURIPath(uri)
-
-        val namespace = resourceType match {
-          case RESULT             => StorageConfig.icebergTableResultNamespace
-          case CONSOLE_MESSAGES   => StorageConfig.icebergTableConsoleMessagesNamespace
-          case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace
-          case STATE              => StorageConfig.icebergTableStateNamespace
-          case _ =>
-            throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
-        }
+        val namespace = resolveNamespace(resourceType)
 
         val icebergSchema = IcebergUtil.toIcebergSchema(schema)
         IcebergUtil.createTable(
@@ -103,6 +106,33 @@ object DocumentFactory {
     }
   }
 
+  /**
+    * Check whether a document exists at the given URI without opening it.
+    *
+    * Returns true iff the underlying storage already has an entry for this
+    * URI (e.g., an iceberg table at the resolved namespace + storage key).
+    *
+    * @throws UnsupportedOperationException if the URI scheme is not `vfs`.
+    * @throws IllegalArgumentException if the resolved resource type has no
+    *                                  iceberg namespace mapping.
+    */
+  def documentExists(uri: URI): Boolean = {
+    uri.getScheme match {
+      case VFS_FILE_URI_SCHEME =>
+        val (_, _, _, resourceType) = decodeURI(uri)
+        val storageKey = sanitizeURIPath(uri)
+        val namespace = resolveNamespace(resourceType)
+        IcebergCatalogInstance
+          .getInstance()
+          .tableExists(TableIdentifier.of(namespace, storageKey))
+
+      case unsupportedScheme =>
+        throw new UnsupportedOperationException(
+          s"Unsupported URI scheme: $unsupportedScheme for checking document existence"
+        )
+    }
+  }
+
   /**
     * Open a document specified by the uri.
     * If the document is storing structural data, the schema will also be returned
@@ -115,15 +145,7 @@ object DocumentFactory {
       case VFS_FILE_URI_SCHEME =>
         val (_, _, _, resourceType) = decodeURI(uri)
         val storageKey = sanitizeURIPath(uri)
-
-        val namespace = resourceType match {
-          case RESULT             => StorageConfig.icebergTableResultNamespace
-          case CONSOLE_MESSAGES   => StorageConfig.icebergTableConsoleMessagesNamespace
-          case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace
-          case STATE              => StorageConfig.icebergTableStateNamespace
-          case _ =>
-            throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
-        }
+        val namespace = resolveNamespace(resourceType)
 
         val table = IcebergUtil
           .loadTableMetadata(
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index b92562eeb7..6184ce8dcd 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -38,7 +38,7 @@ import org.apache.iceberg.data.Record
 import org.apache.iceberg.{Schema => IcebergSchema}
 import org.scalatest.BeforeAndAfterAll
 
-import java.lang.reflect.{InvocationHandler, Method, Proxy}
+import java.lang.reflect.{InvocationHandler, InvocationTargetException, Method, Proxy}
 import java.net.URI
 import java.sql.Timestamp
 import java.util.UUID
@@ -144,6 +144,66 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
     }
   }
 
+  it should "report documentExists=true for a URI that was created via createDocument" in {
+    assert(DocumentFactory.documentExists(uri))
+  }
+
+  it should "report documentExists=false for a URI that was never created" in {
+    val freshBase = VFSURIFactory.createPortBaseURI(
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      GlobalPortIdentity(
+        PhysicalOpIdentity(
+          logicalOpId = OperatorIdentity(s"fresh-${UUID.randomUUID().toString.replace("-", "")}"),
+          layerName = "main"
+        ),
+        PortIdentity()
+      )
+    )
+    val freshUri = VFSURIFactory.resultURI(freshBase)
+    assert(!DocumentFactory.documentExists(freshUri))
+  }
+
+  it should "throw UnsupportedOperationException for documentExists on an unsupported scheme" in {
+    intercept[UnsupportedOperationException] {
+      DocumentFactory.documentExists(new URI("file:///tmp/anything"))
+    }
+  }
+
+  it should "resolve CONSOLE_MESSAGES URIs through documentExists" in {
+    val consoleUri = VFSURIFactory.createConsoleMessagesURI(
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      OperatorIdentity(s"fresh-${UUID.randomUUID().toString.replace("-", "")}")
+    )
+    assert(!DocumentFactory.documentExists(consoleUri))
+  }
+
+  it should "resolve RUNTIME_STATISTICS URIs through documentExists" in {
+    val statsUri = VFSURIFactory.createRuntimeStatisticsURI(
+      WorkflowIdentity(0),
+      ExecutionIdentity(0)
+    )
+    assert(!DocumentFactory.documentExists(statsUri))
+  }
+
+  it should "throw IllegalArgumentException for resolveNamespace on an unmapped resource type" in {
+    // `resolveNamespace` is private and its `case _ =>` is unreachable from any
+    // well-formed VFS URI (VFSURIFactory.decodeURI validates resource types).
+    // Exercise the defensive branch by reflecting on the method and passing
+    // null — Scala pattern matches fall through to the wildcard for null
+    // scrutinees.
+    val method = DocumentFactory.getClass.getDeclaredMethod(
+      "resolveNamespace",
+      classOf[Enumeration#Value]
+    )
+    method.setAccessible(true)
+    val wrapped = intercept[InvocationTargetException] {
+      method.invoke(DocumentFactory, null)
+    }
+    assert(wrapped.getCause.isInstanceOf[IllegalArgumentException])
+  }
+
   it should "round trip materialized state documents" in {
     val stateUri = VFSURIFactory.stateURI(baseURI)
     DocumentFactory.createDocument(stateUri, State.schema)
