This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch test-port-identity-serde-spec
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 5167c9c38fecf8f55c7075e74711d24e1c0782d7
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue May 12 22:24:01 2026 -0700

    feat(workflow-core): enforce non-negative portId + underscore-free op/layer in serde
    
    Address review feedback on #4954:
    
    - Yicong-Huang: "lets disallow negative port id" — `GlobalPortIdentitySerde.serializeAsString`
      and `.deserializeFromString` now reject `portId < 0` with `IllegalArgumentException`.
      Mirrored in the Python helper `serialize_global_port_identity` /
      `deserialize_global_port_identity` (raises ValueError).
    
    - Yicong-Huang: "can we enforce it (to fix it)?" — the documented VFS-compatibility
      contract ("serialized form does not include underscore '_'") was previously
      aspirational; inputs were interpolated verbatim. Now `serializeAsString`
      rejects `logicalOpId` or `layerName` containing `_` at the boundary, so a
      bad input fails fast instead of silently emitting a string that would
      interfere with VFS URI parsing downstream. Mirrored in the Python helper.
    
    Tests:
    - `PortIdentitySerdeSpec`: previous round-trip-negative and `pendingUntilFixed`
      underscore tests replaced with regular tests asserting `IllegalArgumentException`
      for negative portId (both serialize + deserialize) and for underscored
      `logicalOpId` / `layerName`.
    - `test_virtual_identity.py`: matching ValueError assertions for the Python
      serializer/deserializer.
    
    Adjacent test fixtures updated to honor the new contract:
    - `IcebergTableStatsSpec`, `IcebergDocumentSpec`, `test_iceberg_document.py`:
      op-id fixture `test_table_<uuid>` → `test-table-<uuid>` (dashes instead of
      underscores). `IcebergDocumentConsoleMessagesSpec` is unchanged — it uses
      `createConsoleMessagesURI`, which does not go through `serializeAsString`.
    
    Verification:
    ```
    sbt "WorkflowCore/Test/testOnly org.apache.texera.amber.util.serde.PortIdentitySerdeSpec"
    # 24 tests, all pass (1 unchanged pendingUntilFixed for PortIdentityKeyDeserializer)
    sbt "WorkflowCore/Test/testOnly org.apache.texera.amber.core.storage.VFSURIFactorySpec org.apache.texera.amber.storage.result.iceberg.IcebergDocumentSpec org.apache.texera.amber.storage.result.iceberg.IcebergTableStatsSpec org.apache.texera.amber.storage.result.iceberg.IcebergDocumentConsoleMessagesSpec"
    # 30 tests, all pass
    sbt "WorkflowExecutionService/Test/testOnly org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResourceSpec"
    # 2 tests, all pass
    pytest core/util/virtual_identity/
    # 17 tests, all pass
    sbt "WorkflowCore/scalafmtCheck; WorkflowCore/Test/scalafmtCheck"
    # clean
    ```
---
 .../core/storage/iceberg/test_iceberg_document.py  |  2 +-
 .../python/core/util/virtual_identity/__init__.py  | 17 ++++++
 .../util/virtual_identity/test_virtual_identity.py | 23 +++++++++
 .../amber/util/serde/GlobalPortIdentitySerde.scala | 19 +++++--
 .../result/iceberg/IcebergDocumentSpec.scala       |  2 +-
 .../result/iceberg/IcebergTableStatsSpec.scala     |  2 +-
 .../amber/util/serde/PortIdentitySerdeSpec.scala   | 60 ++++++++++++----------
 7 files changed, 91 insertions(+), 34 deletions(-)

diff --git 
a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py 
b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index 9b374f7d5c..72b60d6cb7 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -81,7 +81,7 @@ class TestIcebergDocument:
             ExecutionIdentity(id=0),
             GlobalPortIdentity(
                 op_id=PhysicalOpIdentity(
-                    
logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"),
+                    
logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"),
                     layer_name="main",
                 ),
                 port_id=PortIdentity(id=0),
diff --git a/amber/src/main/python/core/util/virtual_identity/__init__.py 
b/amber/src/main/python/core/util/virtual_identity/__init__.py
index 93a887d7c6..49da75fcd5 100644
--- a/amber/src/main/python/core/util/virtual_identity/__init__.py
+++ b/amber/src/main/python/core/util/virtual_identity/__init__.py
@@ -42,12 +42,27 @@ def serialize_global_port_identity(obj: GlobalPortIdentity) 
-> str:
     Expected format:
     ``(logicalOpId=<logicalOpId>,layerName=<layerName>,
     portId=<portId.id>,isInternal=<portId.internal>,isInput=<input>)``
+
+    Raises ValueError if `logicalOpId` or `layerName` contains an underscore
+    (VFS URI parsing relies on the absence of '_'), or if `portId` is negative.
     """
     logical_op_id = obj.op_id.logical_op_id.id
     layer_name = obj.op_id.layer_name
     port_id = obj.port_id.id
     is_internal = obj.port_id.internal
     is_input_port = obj.input
+    if "_" in logical_op_id:
+        raise ValueError(
+            f"logicalOpId must not contain '_' "
+            f"(VFS URI parsing relies on this): {logical_op_id}"
+        )
+    if "_" in layer_name:
+        raise ValueError(
+            f"layerName must not contain '_' "
+            f"(VFS URI parsing relies on this): {layer_name}"
+        )
+    if port_id < 0:
+        raise ValueError(f"portId must be non-negative: {port_id}")
     return (
         
f"(logicalOpId={logical_op_id},layerName={layer_name},portId={port_id},"
         
f"isInternal={str(is_internal).lower()},isInput={str(is_input_port).lower()})"
@@ -72,6 +87,8 @@ def deserialize_global_port_identity(encoded_str: str) -> 
GlobalPortIdentity:
         match.groups()
     )
     port_id = int(port_id_str)
+    if port_id < 0:
+        raise ValueError(f"portId must be non-negative: {port_id}")
     is_internal = is_internal_str.lower() == "true"
     is_input_port = is_input_str.lower() == "true"
     op_id = PhysicalOpIdentity(
diff --git 
a/amber/src/main/python/core/util/virtual_identity/test_virtual_identity.py 
b/amber/src/main/python/core/util/virtual_identity/test_virtual_identity.py
index 431dd84146..c2f6f63685 100644
--- a/amber/src/main/python/core/util/virtual_identity/test_virtual_identity.py
+++ b/amber/src/main/python/core/util/virtual_identity/test_virtual_identity.py
@@ -105,6 +105,21 @@ class TestSerializeGlobalPortIdentity:
         assert recovered.port_id.internal is True
         assert recovered.input is False
 
+    def test_rejects_underscore_in_logical_op_id(self):
+        # VFS-compatibility contract: serialized output must be
+        # underscore-free. Fail fast at the boundary on underscored input.
+        with pytest.raises(ValueError, match="logicalOpId must not contain"):
+            serialize_global_port_identity(_gpi(op_id="__DummyOperator"))
+
+    def test_rejects_underscore_in_layer_name(self):
+        with pytest.raises(ValueError, match="layerName must not contain"):
+            serialize_global_port_identity(_gpi(layer="main_source_0_op"))
+
+    def test_rejects_negative_port_id(self):
+        # Port ids are array indices and must be non-negative.
+        with pytest.raises(ValueError, match="portId must be non-negative"):
+            serialize_global_port_identity(_gpi(port=-1))
+
 
 class TestDeserializeGlobalPortIdentity:
     def test_parses_canonical_encoded_string(self):
@@ -137,6 +152,14 @@ class TestDeserializeGlobalPortIdentity:
                 "(logicalOpId=op,layerName=l,portId=0,isInternal=true)"
             )
 
+    def test_raises_value_error_on_negative_port_id(self):
+        # Symmetric with the serializer: tampered URIs with a negative
+        # portId must be rejected on the way back in.
+        with pytest.raises(ValueError, match="portId must be non-negative"):
+            deserialize_global_port_identity(
+                
"(logicalOpId=op,layerName=l,portId=-1,isInternal=false,isInput=true)"
+            )
+
 
 class TestGetFromActorIdForInputPortStorage:
     def test_prefixes_materialization_reader_to_uri_plus_actor_name(self):
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
index 48c087443c..c8fd8e1a36 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
@@ -23,17 +23,20 @@ import 
org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
 
 /**
   * Serialize and deserializes a GlobalPortIdentity object to a string using a 
custom, human-readable format
-  * to ensure it works with both URI and file path and does not incldue 
underscore "_" so that it does not
-  * interfere with our own VFS URI parsing.
+  * to ensure it works with both URI and file path and does not include 
underscore "_" so that it does not
+  * interfere with our own VFS URI parsing. Underscores in `logicalOpId` / 
`layerName` and negative `portId`
+  * values are rejected with `IllegalArgumentException`.
   */
 object GlobalPortIdentitySerde {
   implicit class SerdeOps(globalPortId: GlobalPortIdentity) {
 
     /**
       * Serializes a GlobalPortIdentity object into a string using our custom, 
human-readable format
-      * that works with both URI and file path and does not incldue underscore 
"_" so that it does not
+      * that works with both URI and file path and does not include underscore 
"_" so that it does not
       * interfere with our own VFS URI parsing.
       *
+      * @throws java.lang.IllegalArgumentException if `logicalOpId` or 
`layerName` contains an underscore,
+      *                                            or if `portId.id` is 
negative.
       * @return A serialized string representation of globalPortId
       */
     def serializeAsString: String = {
@@ -42,6 +45,15 @@ object GlobalPortIdentitySerde {
       val portId = globalPortId.portId.id
       val isInternal = globalPortId.portId.internal
       val isInput = globalPortId.input
+      require(
+        !logicalOpId.contains('_'),
+        s"logicalOpId must not contain '_' (VFS URI parsing relies on this): 
$logicalOpId"
+      )
+      require(
+        !layerName.contains('_'),
+        s"layerName must not contain '_' (VFS URI parsing relies on this): 
$layerName"
+      )
+      require(portId >= 0, s"portId must be non-negative: $portId")
       
s"(logicalOpId=$logicalOpId,layerName=$layerName,portId=$portId,isInternal=$isInternal,isInput=$isInput)"
     }
   }
@@ -58,6 +70,7 @@ object GlobalPortIdentitySerde {
     serializedGlobalPortId match {
       case pattern(logicalOpId, layerName, portIdStr, isInternalStr, 
isInputStr) =>
         val portIdInt = portIdStr.toInt
+        require(portIdInt >= 0, s"portId must be non-negative: $portIdInt")
         val isInternal = isInternalStr.toBoolean
         val isInput = isInputStr.toBoolean
         val physicalOpId = PhysicalOpIdentity(
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 8fdf039f3e..eb259fed58 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -85,7 +85,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] 
with BeforeAndAfter
       GlobalPortIdentity(
         PhysicalOpIdentity(
           logicalOpId =
-            
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+            
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
           layerName = "main"
         ),
         PortIdentity()
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
index 175ebc2c01..b7cf776eb8 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
@@ -56,7 +56,7 @@ class IcebergTableStatsSpec extends AnyFlatSpec with 
BeforeAndAfterAll with Suit
     GlobalPortIdentity(
       PhysicalOpIdentity(
         logicalOpId =
-          
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+          
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
         layerName = "main"
       ),
       PortIdentity()
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
index 3abc796242..89a815c39e 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
@@ -103,12 +103,22 @@ class PortIdentitySerdeSpec extends AnyFlatSpec {
     assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) 
== p)
   }
 
-  it should "round-trip a negative port id" in {
-    // PortIdentity.id is a plain Int; negatives are technically permitted
-    // by the type. Pin the round-trip so a future tightening of the
-    // numeric regex (e.g. to `\\d+`) breaks this on purpose.
-    val p = globalPort(portIdValue = -1)
-    assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) 
== p)
+  it should "throw IllegalArgumentException when serializing a negative port 
id" in {
+    // Port ids are array indices and must be non-negative; the serializer
+    // rejects negatives so corrupt data can't reach VFS URIs.
+    intercept[IllegalArgumentException] {
+      globalPort(portIdValue = -1).serializeAsString
+    }
+  }
+
+  it should "throw IllegalArgumentException when deserializing a negative port 
id" in {
+    // Symmetric: a hand-crafted string with a negative portId must be
+    // rejected by the deserializer too (so tampered URIs don't slip
+    // through).
+    val malformed = 
"(logicalOpId=op-A,layerName=main,portId=-1,isInternal=false,isInput=true)"
+    intercept[IllegalArgumentException] {
+      GlobalPortIdentitySerde.deserializeFromString(malformed)
+    }
   }
 
   it should "throw IllegalArgumentException when the input has the wrong field 
order" in {
@@ -185,28 +195,22 @@ class PortIdentitySerdeSpec extends AnyFlatSpec {
     assert(!formatChars.contains("_"), s"format characters must be 
underscore-free: $formatChars")
   }
 
-  it should "eventually produce an underscore-free output even for inputs that 
contain underscores (pendingUntilFixed)" in pendingUntilFixed {
-    // Documented contract on `GlobalPortIdentitySerde`: "does not include
-    // underscore '_' so that it does not interfere with our own VFS URI
-    // parsing." The implementation does NOT enforce this — inputs are
-    // interpolated verbatim. Both fields can carry underscores in real
-    // production data:
-    // - `logicalOpId`: e.g. `__DummyOperator` from `VirtualIdentityUtils`
-    // - `layerName`: e.g. `${layerName}_source_${portId}_...` constructed
-    //   by `SpecialPhysicalOpFactory`
-    // Cover BOTH so a partial fix that escapes only one of them flips
-    // pendingUntilFixed into a deliberate failure with the second
-    // assertion still red.
-    val withUnderscoreOpId = globalPort(logical = 
"__DummyOperator").serializeAsString
-    assert(
-      !withUnderscoreOpId.contains("_"),
-      s"serialized form must be underscore-free for op id: $withUnderscoreOpId"
-    )
-    val withUnderscoreLayer = globalPort(layer = 
"main_source_0_op").serializeAsString
-    assert(
-      !withUnderscoreLayer.contains("_"),
-      s"serialized form must be underscore-free for layer name: 
$withUnderscoreLayer"
-    )
+  it should "throw IllegalArgumentException when logicalOpId contains an 
underscore" in {
+    // Enforces the documented VFS-compatibility contract: the serialized
+    // form must be underscore-free. The serializer rejects underscored
+    // inputs at the boundary instead of silently emitting a string that
+    // would interfere with VFS URI parsing downstream.
+    intercept[IllegalArgumentException] {
+      globalPort(logical = "__DummyOperator").serializeAsString
+    }
+  }
+
+  it should "throw IllegalArgumentException when layerName contains an 
underscore" in {
+    // Both fields enforce the same invariant; cover them independently so
+    // a partial fix that only validates one surfaces as a test failure.
+    intercept[IllegalArgumentException] {
+      globalPort(layer = "main_source_0_op").serializeAsString
+    }
   }
 
   // 
---------------------------------------------------------------------------

Reply via email to