This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f56946c86 test(workflow-core): add unit test coverage for 
port-identity serde helpers (#4954)
4f56946c86 is described below

commit 4f56946c861efbe30af7ac96b6076dc76f5ac856
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue May 12 22:44:21 2026 -0700

    test(workflow-core): add unit test coverage for port-identity serde helpers 
(#4954)
    
    ### What changes were proposed in this PR?
    
    Adds `PortIdentitySerdeSpec` covering three Jackson / VFS-URI serde
    helpers in `org.apache.texera.amber.util.serde`, and tightens the
    production contract on `GlobalPortIdentitySerde` after review.
    
    Production change (added per review):
    
    | Surface | Change |
    | --- | --- |
    | `GlobalPortIdentitySerde.serializeAsString` | Rejects `portId < 0`,
    `logicalOpId` containing `_`, `layerName` containing `_` with
    `IllegalArgumentException`. |
    | `GlobalPortIdentitySerde.deserializeFromString` | Symmetrically
    rejects `portId < 0`. |
    | Python `serialize_global_port_identity` /
    `deserialize_global_port_identity` | Mirrored: raises `ValueError` on
    the same invariants. |
    | Iceberg test fixtures (`IcebergTableStatsSpec`, `IcebergDocumentSpec`,
    `test_iceberg_document.py`) | Op-id `test_table_<uuid>` →
    `test-table-<uuid>` to honor the new contract.
    `IcebergDocumentConsoleMessagesSpec` is unchanged —
    `createConsoleMessagesURI` does not go through `serializeAsString`. |
    
    Test coverage:
    
    | Helper | Pinned by this spec |
    | --- | --- |
    | `GlobalPortIdentitySerde.serializeAsString` / `deserializeFromString`
    | Default + per-field round-trip; exact format pin (default +
    non-default values); special-character pass-through (dashes / dots);
    negative `portId` rejected (serialize + deserialize); underscored
    `logicalOpId` rejected; underscored `layerName` rejected;
    format-character no-underscore invariant; seven negative paths —
    completely malformed, missing field, wrong field order, trailing
    content, empty body, non-numeric `portId` → `NumberFormatException`,
    non-boolean flag → `IllegalArgumentException` |
    | `PortIdentityKeySerializer.portIdToString` | Exact `id_internal`
    format. |
    | `PortIdentityKeySerializer` + `PortIdentityKeyDeserializer` (Jackson
    wiring) | The production `JSONUtils.objectMapper`, round-tripping
    `Map[PortIdentity, String]` and an empty map. |
    | `PortIdentityKeyDeserializer.deserializeKey` | Four negative paths
    (non-integer id, non-boolean flag, missing-separator with non-numeric
    body, missing-separator with numeric-only body); current lenient
    extra-segments behavior pinned in a characterization test, with a
    `pendingUntilFixed` for the stricter contract. |
    
    ### Any related issues, documentation, discussions?
    
    Closes #4953
    
    ### How was this PR tested?
    
    ```
    sbt "WorkflowCore/Test/testOnly 
org.apache.texera.amber.util.serde.PortIdentitySerdeSpec"
    # 24 tests, all pass (1 unchanged pendingUntilFixed for 
PortIdentityKeyDeserializer)
    
    sbt "WorkflowCore/Test/testOnly 
org.apache.texera.amber.core.storage.VFSURIFactorySpec 
org.apache.texera.amber.storage.result.iceberg.IcebergDocumentSpec 
org.apache.texera.amber.storage.result.iceberg.IcebergTableStatsSpec 
org.apache.texera.amber.storage.result.iceberg.IcebergDocumentConsoleMessagesSpec"
    # 30 tests, all pass
    
    sbt "WorkflowExecutionService/Test/testOnly 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResourceSpec"
    # 2 tests, all pass
    
    pytest core/util/virtual_identity/
    # 17 tests, all pass
    
    sbt "WorkflowCore/scalafmtCheck; WorkflowCore/Test/scalafmtCheck"
    # clean
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
---
 .../src/main/python/core/util/virtual_identity.py  |  17 ++
 .../core/storage/iceberg/test_iceberg_document.py  |   2 +-
 .../test/python/core/util/test_virtual_identity.py |  23 ++
 .../amber/util/serde/GlobalPortIdentitySerde.scala |  19 +-
 .../result/iceberg/IcebergDocumentSpec.scala       |   2 +-
 .../result/iceberg/IcebergTableStatsSpec.scala     |   2 +-
 .../amber/util/serde/PortIdentitySerdeSpec.scala   | 320 +++++++++++++++++++++
 7 files changed, 379 insertions(+), 6 deletions(-)

diff --git a/amber/src/main/python/core/util/virtual_identity.py 
b/amber/src/main/python/core/util/virtual_identity.py
index 93a887d7c6..49da75fcd5 100644
--- a/amber/src/main/python/core/util/virtual_identity.py
+++ b/amber/src/main/python/core/util/virtual_identity.py
@@ -42,12 +42,27 @@ def serialize_global_port_identity(obj: GlobalPortIdentity) 
-> str:
     Expected format:
     ``(logicalOpId=<logicalOpId>,layerName=<layerName>,
     portId=<portId.id>,isInternal=<portId.internal>,isInput=<input>)``
+
+    Raises ValueError if `logicalOpId` or `layerName` contains an underscore
+    (VFS URI parsing relies on the absence of '_'), or if `portId` is negative.
     """
     logical_op_id = obj.op_id.logical_op_id.id
     layer_name = obj.op_id.layer_name
     port_id = obj.port_id.id
     is_internal = obj.port_id.internal
     is_input_port = obj.input
+    if "_" in logical_op_id:
+        raise ValueError(
+            f"logicalOpId must not contain '_' "
+            f"(VFS URI parsing relies on this): {logical_op_id}"
+        )
+    if "_" in layer_name:
+        raise ValueError(
+            f"layerName must not contain '_' "
+            f"(VFS URI parsing relies on this): {layer_name}"
+        )
+    if port_id < 0:
+        raise ValueError(f"portId must be non-negative: {port_id}")
     return (
         
f"(logicalOpId={logical_op_id},layerName={layer_name},portId={port_id},"
         
f"isInternal={str(is_internal).lower()},isInput={str(is_input_port).lower()})"
@@ -72,6 +87,8 @@ def deserialize_global_port_identity(encoded_str: str) -> 
GlobalPortIdentity:
         match.groups()
     )
     port_id = int(port_id_str)
+    if port_id < 0:
+        raise ValueError(f"portId must be non-negative: {port_id}")
     is_internal = is_internal_str.lower() == "true"
     is_input_port = is_input_str.lower() == "true"
     op_id = PhysicalOpIdentity(
diff --git 
a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py 
b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index a218c64a2d..327b907306 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -86,7 +86,7 @@ class TestIcebergDocument:
             ExecutionIdentity(id=0),
             GlobalPortIdentity(
                 op_id=PhysicalOpIdentity(
-                    
logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"),
+                    
logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"),
                     layer_name="main",
                 ),
                 port_id=PortIdentity(id=0),
diff --git a/amber/src/test/python/core/util/test_virtual_identity.py 
b/amber/src/test/python/core/util/test_virtual_identity.py
index 431dd84146..c2f6f63685 100644
--- a/amber/src/test/python/core/util/test_virtual_identity.py
+++ b/amber/src/test/python/core/util/test_virtual_identity.py
@@ -105,6 +105,21 @@ class TestSerializeGlobalPortIdentity:
         assert recovered.port_id.internal is True
         assert recovered.input is False
 
+    def test_rejects_underscore_in_logical_op_id(self):
+        # VFS-compatibility contract: serialized output must be
+        # underscore-free. Fail fast at the boundary on underscored input.
+        with pytest.raises(ValueError, match="logicalOpId must not contain"):
+            serialize_global_port_identity(_gpi(op_id="__DummyOperator"))
+
+    def test_rejects_underscore_in_layer_name(self):
+        with pytest.raises(ValueError, match="layerName must not contain"):
+            serialize_global_port_identity(_gpi(layer="main_source_0_op"))
+
+    def test_rejects_negative_port_id(self):
+        # Port ids are array indices and must be non-negative.
+        with pytest.raises(ValueError, match="portId must be non-negative"):
+            serialize_global_port_identity(_gpi(port=-1))
+
 
 class TestDeserializeGlobalPortIdentity:
     def test_parses_canonical_encoded_string(self):
@@ -137,6 +152,14 @@ class TestDeserializeGlobalPortIdentity:
                 "(logicalOpId=op,layerName=l,portId=0,isInternal=true)"
             )
 
+    def test_raises_value_error_on_negative_port_id(self):
+        # Symmetric with the serializer: tampered URIs with a negative
+        # portId must be rejected on the way back in.
+        with pytest.raises(ValueError, match="portId must be non-negative"):
+            deserialize_global_port_identity(
+                
"(logicalOpId=op,layerName=l,portId=-1,isInternal=false,isInput=true)"
+            )
+
 
 class TestGetFromActorIdForInputPortStorage:
     def test_prefixes_materialization_reader_to_uri_plus_actor_name(self):
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
index 48c087443c..c8fd8e1a36 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
@@ -23,17 +23,20 @@ import 
org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
 
 /**
   * Serialize and deserializes a GlobalPortIdentity object to a string using a 
custom, human-readable format
-  * to ensure it works with both URI and file path and does not incldue 
underscore "_" so that it does not
-  * interfere with our own VFS URI parsing.
+  * to ensure it works with both URI and file path and does not include 
underscore "_" so that it does not
+  * interfere with our own VFS URI parsing. Underscores in `logicalOpId` / 
`layerName` and negative `portId`
+  * values are rejected with `IllegalArgumentException`.
   */
 object GlobalPortIdentitySerde {
   implicit class SerdeOps(globalPortId: GlobalPortIdentity) {
 
     /**
       * Serializes a GlobalPortIdentity object into a string using our custom, 
human-readable format
-      * that works with both URI and file path and does not incldue underscore 
"_" so that it does not
+      * that works with both URI and file path and does not include underscore 
"_" so that it does not
       * interfere with our own VFS URI parsing.
       *
+      * @throws java.lang.IllegalArgumentException if `logicalOpId` or 
`layerName` contains an underscore,
+      *                                            or if `portId.id` is 
negative.
       * @return A serialized string representation of globalPortId
       */
     def serializeAsString: String = {
@@ -42,6 +45,15 @@ object GlobalPortIdentitySerde {
       val portId = globalPortId.portId.id
       val isInternal = globalPortId.portId.internal
       val isInput = globalPortId.input
+      require(
+        !logicalOpId.contains('_'),
+        s"logicalOpId must not contain '_' (VFS URI parsing relies on this): 
$logicalOpId"
+      )
+      require(
+        !layerName.contains('_'),
+        s"layerName must not contain '_' (VFS URI parsing relies on this): 
$layerName"
+      )
+      require(portId >= 0, s"portId must be non-negative: $portId")
       
s"(logicalOpId=$logicalOpId,layerName=$layerName,portId=$portId,isInternal=$isInternal,isInput=$isInput)"
     }
   }
@@ -58,6 +70,7 @@ object GlobalPortIdentitySerde {
     serializedGlobalPortId match {
       case pattern(logicalOpId, layerName, portIdStr, isInternalStr, 
isInputStr) =>
         val portIdInt = portIdStr.toInt
+        require(portIdInt >= 0, s"portId must be non-negative: $portIdInt")
         val isInternal = isInternalStr.toBoolean
         val isInput = isInputStr.toBoolean
         val physicalOpId = PhysicalOpIdentity(
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 8fdf039f3e..eb259fed58 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -85,7 +85,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] 
with BeforeAndAfter
       GlobalPortIdentity(
         PhysicalOpIdentity(
           logicalOpId =
-            
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+            
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
           layerName = "main"
         ),
         PortIdentity()
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
index 175ebc2c01..b7cf776eb8 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
@@ -56,7 +56,7 @@ class IcebergTableStatsSpec extends AnyFlatSpec with 
BeforeAndAfterAll with Suit
     GlobalPortIdentity(
       PhysicalOpIdentity(
         logicalOpId =
-          
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+          
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
         layerName = "main"
       ),
       PortIdentity()
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
new file mode 100644
index 0000000000..89a815c39e
--- /dev/null
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.util.serde
+
+import org.apache.texera.amber.core.virtualidentity.{OperatorIdentity, 
PhysicalOpIdentity}
+import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
+import org.scalatest.flatspec.AnyFlatSpec
+
+class PortIdentitySerdeSpec extends AnyFlatSpec {
+
+  // 
---------------------------------------------------------------------------
+  // GlobalPortIdentitySerde
+  // 
---------------------------------------------------------------------------
+
+  private def globalPort(
+      logical: String = "op-A",
+      layer: String = "main",
+      portIdValue: Int = 0,
+      internal: Boolean = false,
+      input: Boolean = true
+  ): GlobalPortIdentity =
+    GlobalPortIdentity(
+      opId = PhysicalOpIdentity(OperatorIdentity(logical), layer),
+      portId = PortIdentity(id = portIdValue, internal = internal),
+      input = input
+    )
+
+  "GlobalPortIdentitySerde" should "round-trip a default GlobalPortIdentity 
through serializeAsString → deserializeFromString" in {
+    val original = globalPort()
+    val restored = 
GlobalPortIdentitySerde.deserializeFromString(original.serializeAsString)
+    assert(restored == original)
+  }
+
+  it should "preserve all five fields independently across the round-trip" in {
+    // Vary each field individually so a regression that swapped two fields
+    // (e.g., isInput / isInternal) would surface here, not as a general
+    // round-trip failure.
+    val cases = Seq(
+      globalPort(logical = "op-A"),
+      globalPort(logical = "op-Z"),
+      globalPort(layer = "main"),
+      globalPort(layer = "extra-layer"),
+      globalPort(portIdValue = 0),
+      globalPort(portIdValue = 7),
+      globalPort(internal = false),
+      globalPort(internal = true),
+      globalPort(input = true),
+      globalPort(input = false)
+    )
+    cases.foreach { p =>
+      val s = p.serializeAsString
+      val restored = GlobalPortIdentitySerde.deserializeFromString(s)
+      assert(restored == p, s"round-trip mismatch for $p (serialized: $s)")
+    }
+  }
+
+  it should "produce the documented format for default and non-default values" 
in {
+    // Pin the exact format. If this changes, callers reading existing
+    // VFS URIs from disk will break — locking it down forces a deliberate
+    // migration story.
+    assert(
+      globalPort().serializeAsString ==
+        
"(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true)"
+    )
+    assert(
+      globalPort(
+        logical = "op-Z",
+        layer = "extra-layer",
+        portIdValue = 7,
+        internal = true,
+        input = false
+      ).serializeAsString ==
+        
"(logicalOpId=op-Z,layerName=extra-layer,portId=7,isInternal=true,isInput=false)"
+    )
+  }
+
+  it should "round-trip identifiers containing dashes and dots (regex 
non-comma matcher)" in {
+    // The deserialization regex uses `[^,]+` for the field body, so any
+    // non-comma character is fair game. Cover the realistic counter-
+    // examples (dashes, dots) since logical op ids and layer names use
+    // both; if the regex were ever tightened to alphanumerics only, this
+    // would fail on purpose.
+    val p = globalPort(logical = "my.op-with-dashes.v2", layer = "main-1")
+    assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) 
== p)
+  }
+
+  it should "throw IllegalArgumentException when serializing a negative port 
id" in {
+    // Port ids are array indices and must be non-negative; the serializer
+    // rejects negatives so corrupt data can't reach VFS URIs.
+    intercept[IllegalArgumentException] {
+      globalPort(portIdValue = -1).serializeAsString
+    }
+  }
+
+  it should "throw IllegalArgumentException when deserializing a negative port 
id" in {
+    // Symmetric: a hand-crafted string with a negative portId must be
+    // rejected by the deserializer too (so tampered URIs don't slip
+    // through).
+    val malformed = 
"(logicalOpId=op-A,layerName=main,portId=-1,isInternal=false,isInput=true)"
+    intercept[IllegalArgumentException] {
+      GlobalPortIdentitySerde.deserializeFromString(malformed)
+    }
+  }
+
+  it should "throw IllegalArgumentException when the input has the wrong field 
order" in {
+    // The regex pins the documented field order; a swapped order should
+    // not silently parse with confused values.
+    val swapped = 
"(layerName=main,logicalOpId=op-A,portId=0,isInternal=false,isInput=true)"
+    intercept[IllegalArgumentException] {
+      GlobalPortIdentitySerde.deserializeFromString(swapped)
+    }
+  }
+
+  it should "throw IllegalArgumentException when the input has trailing 
content past the closing paren" in {
+    val withTrailing =
+      
"(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true) extra"
+    intercept[IllegalArgumentException] {
+      GlobalPortIdentitySerde.deserializeFromString(withTrailing)
+    }
+  }
+
+  it should "throw IllegalArgumentException when a field body is empty" in {
+    // `[^,]+` requires at least one character, so an empty layerName
+    // (`,layerName=,`) must fail to match.
+    val emptyLayer = 
"(logicalOpId=op-A,layerName=,portId=0,isInternal=false,isInput=true)"
+    intercept[IllegalArgumentException] {
+      GlobalPortIdentitySerde.deserializeFromString(emptyLayer)
+    }
+  }
+
+  it should "throw IllegalArgumentException on a completely malformed string" 
in {
+    val ex = intercept[IllegalArgumentException] {
+      GlobalPortIdentitySerde.deserializeFromString("not even close")
+    }
+    assert(ex.getMessage.contains("not even close"))
+  }
+
+  it should "throw IllegalArgumentException when a required field is missing" 
in {
+    // Drop isInput.
+    val malformed = 
"(logicalOpId=op-A,layerName=main,portId=0,isInternal=false)"
+    intercept[IllegalArgumentException] {
+      GlobalPortIdentitySerde.deserializeFromString(malformed)
+    }
+  }
+
+  it should "throw NumberFormatException when portId is non-numeric" in {
+    // The regex matches (`[^,]+`) but `.toInt` fails. NumberFormatException
+    // extends IllegalArgumentException; assert the more specific type so a
+    // regression that swallowed/rewrapped it is visible.
+    val malformed = 
"(logicalOpId=op-A,layerName=main,portId=NaN,isInternal=false,isInput=true)"
+    intercept[NumberFormatException] {
+      GlobalPortIdentitySerde.deserializeFromString(malformed)
+    }
+  }
+
+  it should "throw IllegalArgumentException when a boolean field is 
non-boolean" in {
+    // `String.toBoolean` is strict: only \"true\" / \"false\" 
(case-insensitive)
+    // pass; anything else throws IllegalArgumentException.
+    val malformed = 
"(logicalOpId=op-A,layerName=main,portId=0,isInternal=maybe,isInput=true)"
+    intercept[IllegalArgumentException] {
+      GlobalPortIdentitySerde.deserializeFromString(malformed)
+    }
+  }
+
+  it should "use no underscore in its own format characters (separators / 
keys)" in {
+    // Pin the format-character invariant: the wrapping `(...)`, the field
+    // separators `,`, the key=value separators, and the field NAMES
+    // themselves contain no underscore. Verify by building the format with
+    // empty-string-replacement values for every input field, so anything
+    // left in the output is purely from `serializeAsString`'s own format.
+    // (For the layerName field the empty-input variant is rejected by the
+    // deserializer regex; here we only check the SERIALIZED output, not the
+    // round-trip.)
+    val s = globalPort(logical = "x", layer = "x").serializeAsString
+    val formatChars = s.replace("x", "").replace("0", "").replace("false", 
"").replace("true", "")
+    assert(!formatChars.contains("_"), s"format characters must be 
underscore-free: $formatChars")
+  }
+
+  it should "throw IllegalArgumentException when logicalOpId contains an 
underscore" in {
+    // Enforces the documented VFS-compatibility contract: the serialized
+    // form must be underscore-free. The serializer rejects underscored
+    // inputs at the boundary instead of silently emitting a string that
+    // would interfere with VFS URI parsing downstream.
+    intercept[IllegalArgumentException] {
+      globalPort(logical = "__DummyOperator").serializeAsString
+    }
+  }
+
+  it should "throw IllegalArgumentException when layerName contains an 
underscore" in {
+    // Both fields enforce the same invariant; cover them independently so
+    // a partial fix that only validates one surfaces as a test failure.
+    intercept[IllegalArgumentException] {
+      globalPort(layer = "main_source_0_op").serializeAsString
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // PortIdentityKeySerializer.portIdToString (companion, not the Jackson 
class)
+  // 
---------------------------------------------------------------------------
+
+  "PortIdentityKeySerializer.portIdToString" should "format a PortIdentity as 
`id_internal`" in {
+    assert(PortIdentityKeySerializer.portIdToString(PortIdentity(0, internal = 
false)) == "0_false")
+    assert(PortIdentityKeySerializer.portIdToString(PortIdentity(7, internal = 
true)) == "7_true")
+  }
+
+  // 
---------------------------------------------------------------------------
+  // PortIdentityKeySerializer + PortIdentityKeyDeserializer (Jackson wiring)
+  // 
---------------------------------------------------------------------------
+  //
+  // These tests use the production `JSONUtils.objectMapper` directly so a
+  // regression in the singleton wiring (e.g. the module that registers the
+  // PortIdentity key (de)serializer being removed or reordered) surfaces
+  // here, not just on a freshly-constructed mapper.
+
+  "PortIdentity Jackson key (de)serialization" should "round-trip a 
Map[PortIdentity, String] via JSONUtils.objectMapper" in {
+    val original = Map(
+      PortIdentity(0, internal = false) -> "a",
+      PortIdentity(1, internal = true) -> "b"
+    )
+    val json = objectMapper.writeValueAsString(original)
+    // Verify the JSON keys match the documented `id_internal` format.
+    assert(json.contains("\"0_false\""))
+    assert(json.contains("\"1_true\""))
+    val tref = objectMapper.getTypeFactory
+      .constructMapType(classOf[java.util.HashMap[_, _]], 
classOf[PortIdentity], classOf[String])
+    val restored: java.util.Map[PortIdentity, String] = 
objectMapper.readValue(json, tref)
+    import scala.jdk.CollectionConverters._
+    assert(restored.asScala.toMap == original)
+  }
+
+  it should "round-trip an empty Map[PortIdentity, V] without invoking the 
(de)serializer" in {
+    val original = Map.empty[PortIdentity, String]
+    val json = objectMapper.writeValueAsString(original)
+    val tref = objectMapper.getTypeFactory
+      .constructMapType(classOf[java.util.HashMap[_, _]], 
classOf[PortIdentity], classOf[String])
+    val restored: java.util.Map[PortIdentity, String] = 
objectMapper.readValue(json, tref)
+    assert(restored.isEmpty)
+  }
+
+  "PortIdentityKeyDeserializer.deserializeKey" should "throw 
NumberFormatException for a non-integer id" in {
+    val d = new PortIdentityKeyDeserializer
+    intercept[NumberFormatException] {
+      d.deserializeKey("notAnInt_false", null)
+    }
+  }
+
+  it should "throw IllegalArgumentException for a non-boolean internal flag" 
in {
+    val d = new PortIdentityKeyDeserializer
+    intercept[IllegalArgumentException] {
+      d.deserializeKey("0_notABool", null)
+    }
+  }
+
+  it should "throw NumberFormatException when the underscore separator is 
missing and the whole string is non-numeric" in {
+    // `key.split("_")` on a separator-less non-numeric string yields a
+    // single-element array, and `parts(0).toInt` fires first → NFE.
+    val d = new PortIdentityKeyDeserializer
+    intercept[NumberFormatException] {
+      d.deserializeKey("missingSeparator", null)
+    }
+  }
+
+  it should "throw ArrayIndexOutOfBoundsException when only the id is provided 
(no `_internal` suffix)" in {
+    // Different separator-missing path: `\"5\".split(\"_\")` yields
+    // [\"5\"], parts(0).toInt = 5 succeeds, then parts(1) reads past the
+    // end. Pin this failure mode explicitly so a future safer parser
+    // breaks the spec on purpose (and the safer error type is chosen
+    // consciously).
+    val d = new PortIdentityKeyDeserializer
+    intercept[ArrayIndexOutOfBoundsException] {
+      d.deserializeKey("5", null)
+    }
+  }
+
+  it should "silently accept extra trailing underscore-separated segments 
(lenient parser, current behavior)" in {
+    // Pin the current lenient behavior: `parts(0).toInt` and
+    // `parts(1).toBoolean` ignore everything past `parts(1)`, so a key
+    // like `"1_true_garbage"` deserializes to `PortIdentity(1, true)`
+    // without complaint. The strict-rejection variant lives in a
+    // pendingUntilFixed test below; characterizing today's lenient
+    // path here means a future-tightening fix would need to update
+    // both tests deliberately.
+    val d = new PortIdentityKeyDeserializer
+    val pid = d.deserializeKey("1_true_garbage", null)
+    assert(pid == PortIdentity(1, internal = true))
+  }
+
+  it should "eventually reject keys with extra trailing segments 
(pendingUntilFixed)" in pendingUntilFixed {
+    // Documented contract: a `PortIdentityKeySerializer` output is exactly
+    // `id_internal` — two underscore-separated segments. Anything else is
+    // corrupt JSON and should be rejected, not silently truncated. The
+    // current implementation is lenient (see characterization test
+    // above); this pendingUntilFixed flips to passing once the parser
+    // is hardened, then `pendingUntilFixed` inverts that into a
+    // deliberate failure forcing the marker to be removed.
+    val d = new PortIdentityKeyDeserializer
+    intercept[IllegalArgumentException] {
+      d.deserializeKey("1_true_garbage", null)
+    }
+  }
+}

Reply via email to