aglinxinyuan commented on code in PR #5085:
URL: https://github.com/apache/texera/pull/5085#discussion_r3251965214


##########
amber/src/main/python/core/storage/document_factory.py:
##########
@@ -43,6 +43,16 @@ class DocumentFactory:
 
     ICEBERG = "iceberg"
 
+    @staticmethod
+    def _resolve_namespace(resource_type: VFSResourceType) -> str:
+        match resource_type:
+            case VFSResourceType.RESULT:
+                return StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE
+            case VFSResourceType.STATE:
+                return StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE
+            case _:
+                raise ValueError(f"Resource type {resource_type} is not supported")

Review Comment:
   Added a comment in `_resolve_namespace` noting that the RESULT/STATE-only
mapping is intentional (CONSOLE_MESSAGES and RUNTIME_STATISTICS are produced
exclusively from Scala). Fixed in `1b9b7c1eb4`.



##########
amber/src/main/python/core/storage/document_factory.py:
##########
@@ -96,19 +98,32 @@ def create_document(uri: str, schema: Schema) -> VirtualDocument:
             )
 
     @staticmethod
-    def open_document(uri: str) -> typing.Tuple[VirtualDocument, Optional[Schema]]:
+    def document_exists(uri: str) -> bool:
+        """
+        Check whether a document exists at the given URI without opening it.
+
+        Returns True iff the underlying storage already has an entry for this
+        URI (e.g., an iceberg table at the resolved namespace + storage key).
+        """

Review Comment:
   Documented failure modes on both sides: Scala `documentExists` now has 
`@throws UnsupportedOperationException` and `@throws IllegalArgumentException`, 
and Python `document_exists` has a `Raises:` block listing 
`NotImplementedError` and `ValueError`. Fixed in `1b9b7c1eb4`.
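
   A rough sketch of what a `Raises:` block of this kind could look like on
the Python side. Everything below is illustrative: the `vfs` scheme check,
the URI layout, the in-memory `_EXISTING_DOCUMENTS` set, and the conditions
attached to each exception are assumptions for the example and are not taken
from the commit.

```python
from urllib.parse import urlparse

# Hypothetical in-memory stand-in for the storage backend.
_EXISTING_DOCUMENTS = {"vfs:///demo/result"}

# Resource types assumed to have a namespace mapping on the Python side.
_SUPPORTED_RESOURCE_TYPES = {"result", "state"}


def document_exists(uri: str) -> bool:
    """
    Check whether a document exists at the given URI without opening it.

    Raises:
        NotImplementedError: if the URI scheme is not supported
            (assumed condition for this sketch).
        ValueError: if the URI's resource type has no namespace mapping.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "vfs":
        raise NotImplementedError(f"Unsupported URI scheme: {parsed.scheme}")

    # Assumed layout for this sketch: the last path segment names the
    # resource type.
    resource_type = parsed.path.rsplit("/", 1)[-1]
    if resource_type not in _SUPPORTED_RESOURCE_TYPES:
        raise ValueError(f"Resource type {resource_type} is not supported")

    return uri in _EXISTING_DOCUMENTS
```

   Callers can then treat a `False` return as "not created yet" and reserve
exception handling for URIs that are malformed or unsupported.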



##########
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala:
##########
@@ -144,6 +144,30 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
     }
   }
 
+  it should "report documentExists=true for a created URI and false for a fresh one" in {
+    assert(DocumentFactory.documentExists(uri))
+
+    val freshBase = VFSURIFactory.createPortBaseURI(
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      GlobalPortIdentity(
+        PhysicalOpIdentity(
+          logicalOpId = OperatorIdentity(s"fresh-${UUID.randomUUID().toString.replace("-", "")}"),
+          layerName = "main"
+        ),
+        PortIdentity()
+      )
+    )
+    val freshUri = VFSURIFactory.resultURI(freshBase)
+    assert(!DocumentFactory.documentExists(freshUri))
+  }

Review Comment:
   Split the combined assert into two `it should` blocks ("true for a URI that 
was created via createDocument" and "false for a URI that was never created"), 
so a regression names the failing scenario. Fixed in `1b9b7c1eb4`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
