aglinxinyuan commented on code in PR #5085:
URL: https://github.com/apache/texera/pull/5085#discussion_r3251754645
##########
amber/src/main/python/core/storage/document_factory.py:
##########
@@ -96,19 +98,32 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument:
)
@staticmethod
- def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
+ def document_exists(uri: str) -> bool:
+ """
+ Check whether a document exists at the given URI without opening it.
+
+ Returns True iff the underlying storage already has an entry for this
+ URI (e.g., an iceberg table at the resolved namespace + storage key).
+ """
parsed_uri = urlparse(uri)
- if parsed_uri.scheme == "vfs":
+ if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME:
_, _, _, resource_type = VFSURIFactory.decode_uri(uri)
+ namespace = DocumentFactory._resolve_namespace(resource_type)
+ storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
+ return IcebergCatalogInstance.get_instance().table_exists(
+ f"{namespace}.{storage_key}"
+ )
- match resource_type:
- case VFSResourceType.RESULT:
- namespace = StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
- case VFSResourceType.STATE:
- namespace = StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
- case _:
- raise ValueError(f"Resource type {resource_type} is not supported")
+ raise NotImplementedError(
+ f"Unsupported URI scheme: {parsed_uri.scheme} for checking document existence"
+ )
+ @staticmethod
+ def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
+ parsed_uri = urlparse(uri)
+ if parsed_uri.scheme == "vfs":
Review Comment:
Replaced the hard-coded `"vfs"` literal with
`VFSURIFactory.VFS_FILE_URI_SCHEME` in `open_document`. Fixed in `641a8df83`.
##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala:
##########
@@ -103,6 +106,29 @@ object DocumentFactory {
}
}
+ /**
+ * Check whether a document exists at the given URI without opening it.
+ *
+ * Returns true iff the underlying storage already has an entry for this
+ * URI (e.g., an iceberg table at the resolved namespace + storage key).
+ */
+ def documentExists(uri: URI): Boolean = {
+ uri.getScheme match {
+ case VFS_FILE_URI_SCHEME =>
+ val (_, _, _, resourceType) = decodeURI(uri)
+ val storageKey = sanitizeURIPath(uri)
+ val namespace = resolveNamespace(resourceType)
+ IcebergCatalogInstance
+ .getInstance()
+ .tableExists(TableIdentifier.of(namespace, storageKey))
+
Review Comment:
Updated the PR description: the summary and test plan now reference
`Catalog.tableExists` (Scala) / `catalog.table_exists` (Python) as the actual
existence-check mechanism, and call out the shared `resolveNamespace` /
`_resolve_namespace` helper as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]