This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch xinyuan-document-factory-exists in repository https://gitbox.apache.org/repos/asf/texera.git
commit 7aeb5ee7fac2cfd85f62ca3802d4a5ea25ec96ce Author: Xinyuan Lin <[email protected]> AuthorDate: Fri May 15 17:43:41 2026 -0700 feat(storage): add DocumentFactory.documentExists for absence checks Enables "create only if absent" flows to test existence without catching exceptions from openDocument. Splits off from #4206. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- .../amber/core/storage/DocumentFactory.scala | 34 ++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 00f6c70ba7..8356493ed2 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -103,6 +103,40 @@ object DocumentFactory { } } + /** + * Check whether a document exists at the given URI without opening it. + * + * Returns true iff the underlying storage already has an entry for this + * URI (e.g., an iceberg table at the resolved namespace + storage key). + * Useful for "create only if absent" flows that would otherwise have to + * call `openDocument` inside a try/catch to test existence. 
+   *
+   * Note: the check still performs a catalog lookup through
+   * `IcebergUtil.loadTableMetadata`; it just reports the outcome as a Boolean.
+   *
+   * @param uri the document URI; only URIs whose scheme is `VFS_FILE_URI_SCHEME` are handled
+   * @return true if `IcebergUtil.loadTableMetadata` finds a table under the namespace
+   *         configured for the URI's resource type and the storage key produced by
+   *         `sanitizeURIPath`, false otherwise
+   * @throws IllegalArgumentException if the URI's resource type has no configured namespace
+   * @throws UnsupportedOperationException if the URI uses any other scheme
+   */
+  def documentExists(uri: URI): Boolean =
+    uri.getScheme match {
+      case VFS_FILE_URI_SCHEME =>
+        // decodeURI yields four components; only the resource type is needed here.
+        val resourceType = decodeURI(uri)._4
+        val tableName = sanitizeURIPath(uri)
+
+        // Map the resource type to its configured Iceberg namespace.
+        val tableNamespace = resourceType match {
+          case RESULT             => StorageConfig.icebergTableResultNamespace
+          case CONSOLE_MESSAGES   => StorageConfig.icebergTableConsoleMessagesNamespace
+          case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case STATE              => StorageConfig.icebergTableStateNamespace
+          case _ =>
+            throw new IllegalArgumentException(s"Resource type $resourceType is not supported")
+        }
+
+        val catalog = IcebergCatalogInstance.getInstance()
+        IcebergUtil.loadTableMetadata(catalog, tableNamespace, tableName).nonEmpty
+
+      case unsupportedScheme =>
+        throw new UnsupportedOperationException(
+          s"Unsupported URI scheme: $unsupportedScheme for checking the document"
+        )
+    }
+
   /**
    * Open a document specified by the uri.
    * If the document is storing structural data, the schema will also be returned
