This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 4f56946c86 test(workflow-core): add unit test coverage for
port-identity serde helpers (#4954)
4f56946c86 is described below
commit 4f56946c861efbe30af7ac96b6076dc76f5ac856
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue May 12 22:44:21 2026 -0700
test(workflow-core): add unit test coverage for port-identity serde helpers
(#4954)
### What changes were proposed in this PR?
Adds `PortIdentitySerdeSpec`, covering three Jackson / VFS-URI serde
helpers in `org.apache.texera.amber.util.serde`. Following review, it also
tightens the production contract of `GlobalPortIdentitySerde` (mirrored in
the Python `virtual_identity` helpers).
Production changes (added per review):
| Surface | Change |
| --- | --- |
| `GlobalPortIdentitySerde.serializeAsString` | Rejects a negative `portId`,
or a `logicalOpId` / `layerName` containing `_`, with
`IllegalArgumentException`. |
| `GlobalPortIdentitySerde.deserializeFromString` | Symmetrically
rejects a negative `portId` with `IllegalArgumentException`. |
| Python `serialize_global_port_identity` /
`deserialize_global_port_identity` | Mirrored: raise `ValueError` for
the same invalid inputs as their Scala counterparts. |
| Iceberg test fixtures (`IcebergTableStatsSpec`, `IcebergDocumentSpec`,
`test_iceberg_document.py`) | Op-id `test_table_<uuid>` →
`test-table-<uuid>` to honor the new contract.
`IcebergDocumentConsoleMessagesSpec` is unchanged —
`createConsoleMessagesURI` does not go through `serializeAsString`. |
Test coverage:
| Helper | Pinned by this spec |
| --- | --- |
| `GlobalPortIdentitySerde.serializeAsString` / `deserializeFromString`
| Default + per-field round-trip; exact format pin (default +
non-default values); special-character pass-through (dashes / dots);
negative `portId` rejected (serialize + deserialize); underscored
`logicalOpId` rejected; underscored `layerName` rejected;
format-character no-underscore invariant; seven negative paths —
completely malformed, missing field, wrong field order, trailing
content, empty body, non-numeric `portId` → `NumberFormatException`,
non-boolean flag → `IllegalArgumentException` |
| `PortIdentityKeySerializer.portIdToString` | Exact `id_internal`
format. |
| `PortIdentityKeySerializer` + `PortIdentityKeyDeserializer` (Jackson
wiring) | Round-trips a `Map[PortIdentity, String]` and an empty map
through the production `JSONUtils.objectMapper`. |
| `PortIdentityKeyDeserializer.deserializeKey` | Four negative paths
(non-integer id, non-boolean flag, missing-separator with non-numeric
body, missing-separator with numeric-only body); current lenient
extra-segments behavior pinned in a characterization test, with a
`pendingUntilFixed` for the stricter contract. |
### Any related issues, documentation, discussions?
Closes #4953
### How was this PR tested?
```
sbt "WorkflowCore/Test/testOnly
org.apache.texera.amber.util.serde.PortIdentitySerdeSpec"
# 24 tests pass, plus 1 pending (pendingUntilFixed) for
PortIdentityKeyDeserializer
sbt "WorkflowCore/Test/testOnly
org.apache.texera.amber.core.storage.VFSURIFactorySpec
org.apache.texera.amber.storage.result.iceberg.IcebergDocumentSpec
org.apache.texera.amber.storage.result.iceberg.IcebergTableStatsSpec
org.apache.texera.amber.storage.result.iceberg.IcebergDocumentConsoleMessagesSpec"
# 30 tests, all pass
sbt "WorkflowExecutionService/Test/testOnly
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResourceSpec"
# 2 tests, all pass
pytest core/util/virtual_identity/
# 17 tests, all pass
sbt "WorkflowCore/scalafmtCheck; WorkflowCore/Test/scalafmtCheck"
# clean
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---
.../src/main/python/core/util/virtual_identity.py | 17 ++
.../core/storage/iceberg/test_iceberg_document.py | 2 +-
.../test/python/core/util/test_virtual_identity.py | 23 ++
.../amber/util/serde/GlobalPortIdentitySerde.scala | 19 +-
.../result/iceberg/IcebergDocumentSpec.scala | 2 +-
.../result/iceberg/IcebergTableStatsSpec.scala | 2 +-
.../amber/util/serde/PortIdentitySerdeSpec.scala | 320 +++++++++++++++++++++
7 files changed, 379 insertions(+), 6 deletions(-)
diff --git a/amber/src/main/python/core/util/virtual_identity.py
b/amber/src/main/python/core/util/virtual_identity.py
index 93a887d7c6..49da75fcd5 100644
--- a/amber/src/main/python/core/util/virtual_identity.py
+++ b/amber/src/main/python/core/util/virtual_identity.py
@@ -42,12 +42,27 @@ def serialize_global_port_identity(obj: GlobalPortIdentity)
-> str:
Expected format:
``(logicalOpId=<logicalOpId>,layerName=<layerName>,
portId=<portId.id>,isInternal=<portId.internal>,isInput=<input>)``
+
+ Raises ValueError if `logicalOpId` or `layerName` contains an underscore
+ (VFS URI parsing relies on the absence of '_'), or if `portId` is negative.
"""
logical_op_id = obj.op_id.logical_op_id.id
layer_name = obj.op_id.layer_name
port_id = obj.port_id.id
is_internal = obj.port_id.internal
is_input_port = obj.input
+ if "_" in logical_op_id:
+ raise ValueError(
+ f"logicalOpId must not contain '_' "
+ f"(VFS URI parsing relies on this): {logical_op_id}"
+ )
+ if "_" in layer_name:
+ raise ValueError(
+ f"layerName must not contain '_' "
+ f"(VFS URI parsing relies on this): {layer_name}"
+ )
+ if port_id < 0:
+ raise ValueError(f"portId must be non-negative: {port_id}")
return (
f"(logicalOpId={logical_op_id},layerName={layer_name},portId={port_id},"
f"isInternal={str(is_internal).lower()},isInput={str(is_input_port).lower()})"
@@ -72,6 +87,8 @@ def deserialize_global_port_identity(encoded_str: str) ->
GlobalPortIdentity:
match.groups()
)
port_id = int(port_id_str)
+ if port_id < 0:
+ raise ValueError(f"portId must be non-negative: {port_id}")
is_internal = is_internal_str.lower() == "true"
is_input_port = is_input_str.lower() == "true"
op_id = PhysicalOpIdentity(
diff --git
a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index a218c64a2d..327b907306 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -86,7 +86,7 @@ class TestIcebergDocument:
ExecutionIdentity(id=0),
GlobalPortIdentity(
op_id=PhysicalOpIdentity(
-
logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"),
+
logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"),
layer_name="main",
),
port_id=PortIdentity(id=0),
diff --git a/amber/src/test/python/core/util/test_virtual_identity.py
b/amber/src/test/python/core/util/test_virtual_identity.py
index 431dd84146..c2f6f63685 100644
--- a/amber/src/test/python/core/util/test_virtual_identity.py
+++ b/amber/src/test/python/core/util/test_virtual_identity.py
@@ -105,6 +105,21 @@ class TestSerializeGlobalPortIdentity:
assert recovered.port_id.internal is True
assert recovered.input is False
+ def test_rejects_underscore_in_logical_op_id(self):
+ # VFS-compatibility contract: serialized output must be
+ # underscore-free. Fail fast at the boundary on underscored input.
+ with pytest.raises(ValueError, match="logicalOpId must not contain"):
+ serialize_global_port_identity(_gpi(op_id="__DummyOperator"))
+
+ def test_rejects_underscore_in_layer_name(self):
+ with pytest.raises(ValueError, match="layerName must not contain"):
+ serialize_global_port_identity(_gpi(layer="main_source_0_op"))
+
+ def test_rejects_negative_port_id(self):
+ # Port ids are array indices and must be non-negative.
+ with pytest.raises(ValueError, match="portId must be non-negative"):
+ serialize_global_port_identity(_gpi(port=-1))
+
class TestDeserializeGlobalPortIdentity:
def test_parses_canonical_encoded_string(self):
@@ -137,6 +152,14 @@ class TestDeserializeGlobalPortIdentity:
"(logicalOpId=op,layerName=l,portId=0,isInternal=true)"
)
+ def test_raises_value_error_on_negative_port_id(self):
+ # Symmetric with the serializer: tampered URIs with a negative
+ # portId must be rejected on the way back in.
+ with pytest.raises(ValueError, match="portId must be non-negative"):
+ deserialize_global_port_identity(
+
"(logicalOpId=op,layerName=l,portId=-1,isInternal=false,isInput=true)"
+ )
+
class TestGetFromActorIdForInputPortStorage:
def test_prefixes_materialization_reader_to_uri_plus_actor_name(self):
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
index 48c087443c..c8fd8e1a36 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala
@@ -23,17 +23,20 @@ import
org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
/**
* Serialize and deserializes a GlobalPortIdentity object to a string using a
custom, human-readable format
- * to ensure it works with both URI and file path and does not incldue
underscore "_" so that it does not
- * interfere with our own VFS URI parsing.
+ * to ensure it works with both URI and file path and does not include
underscore "_" so that it does not
+ * interfere with our own VFS URI parsing. Underscores in `logicalOpId` /
`layerName` and negative `portId`
+ * values are rejected with `IllegalArgumentException`.
*/
object GlobalPortIdentitySerde {
implicit class SerdeOps(globalPortId: GlobalPortIdentity) {
/**
* Serializes a GlobalPortIdentity object into a string using our custom,
human-readable format
- * that works with both URI and file path and does not incldue underscore
"_" so that it does not
+ * that works with both URI and file path and does not include underscore
"_" so that it does not
* interfere with our own VFS URI parsing.
*
+ * @throws java.lang.IllegalArgumentException if `logicalOpId` or
`layerName` contains an underscore,
+ * or if `portId.id` is
negative.
* @return A serialized string representation of globalPortId
*/
def serializeAsString: String = {
@@ -42,6 +45,15 @@ object GlobalPortIdentitySerde {
val portId = globalPortId.portId.id
val isInternal = globalPortId.portId.internal
val isInput = globalPortId.input
+ require(
+ !logicalOpId.contains('_'),
+ s"logicalOpId must not contain '_' (VFS URI parsing relies on this):
$logicalOpId"
+ )
+ require(
+ !layerName.contains('_'),
+ s"layerName must not contain '_' (VFS URI parsing relies on this):
$layerName"
+ )
+ require(portId >= 0, s"portId must be non-negative: $portId")
s"(logicalOpId=$logicalOpId,layerName=$layerName,portId=$portId,isInternal=$isInternal,isInput=$isInput)"
}
}
@@ -58,6 +70,7 @@ object GlobalPortIdentitySerde {
serializedGlobalPortId match {
case pattern(logicalOpId, layerName, portIdStr, isInternalStr,
isInputStr) =>
val portIdInt = portIdStr.toInt
+ require(portIdInt >= 0, s"portId must be non-negative: $portIdInt")
val isInternal = isInternalStr.toBoolean
val isInput = isInputStr.toBoolean
val physicalOpId = PhysicalOpIdentity(
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 8fdf039f3e..eb259fed58 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -85,7 +85,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple]
with BeforeAndAfter
GlobalPortIdentity(
PhysicalOpIdentity(
logicalOpId =
-
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
layerName = "main"
),
PortIdentity()
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
index 175ebc2c01..b7cf776eb8 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
@@ -56,7 +56,7 @@ class IcebergTableStatsSpec extends AnyFlatSpec with
BeforeAndAfterAll with Suit
GlobalPortIdentity(
PhysicalOpIdentity(
logicalOpId =
-
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
layerName = "main"
),
PortIdentity()
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
new file mode 100644
index 0000000000..89a815c39e
--- /dev/null
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.util.serde
+
+import org.apache.texera.amber.core.virtualidentity.{OperatorIdentity,
PhysicalOpIdentity}
+import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
+import org.scalatest.flatspec.AnyFlatSpec
+
+class PortIdentitySerdeSpec extends AnyFlatSpec {
+
+ //
---------------------------------------------------------------------------
+ // GlobalPortIdentitySerde
+ //
---------------------------------------------------------------------------
+
+ private def globalPort(
+ logical: String = "op-A",
+ layer: String = "main",
+ portIdValue: Int = 0,
+ internal: Boolean = false,
+ input: Boolean = true
+ ): GlobalPortIdentity =
+ GlobalPortIdentity(
+ opId = PhysicalOpIdentity(OperatorIdentity(logical), layer),
+ portId = PortIdentity(id = portIdValue, internal = internal),
+ input = input
+ )
+
+ "GlobalPortIdentitySerde" should "round-trip a default GlobalPortIdentity
through serializeAsString → deserializeFromString" in {
+ val original = globalPort()
+ val restored =
GlobalPortIdentitySerde.deserializeFromString(original.serializeAsString)
+ assert(restored == original)
+ }
+
+ it should "preserve all five fields independently across the round-trip" in {
+ // Vary each field individually so a regression that swapped two fields
+ // (e.g., isInput / isInternal) would surface here, not as a general
+ // round-trip failure.
+ val cases = Seq(
+ globalPort(logical = "op-A"),
+ globalPort(logical = "op-Z"),
+ globalPort(layer = "main"),
+ globalPort(layer = "extra-layer"),
+ globalPort(portIdValue = 0),
+ globalPort(portIdValue = 7),
+ globalPort(internal = false),
+ globalPort(internal = true),
+ globalPort(input = true),
+ globalPort(input = false)
+ )
+ cases.foreach { p =>
+ val s = p.serializeAsString
+ val restored = GlobalPortIdentitySerde.deserializeFromString(s)
+ assert(restored == p, s"round-trip mismatch for $p (serialized: $s)")
+ }
+ }
+
+ it should "produce the documented format for default and non-default values"
in {
+ // Pin the exact format. If this changes, callers reading existing
+ // VFS URIs from disk will break — locking it down forces a deliberate
+ // migration story.
+ assert(
+ globalPort().serializeAsString ==
+
"(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true)"
+ )
+ assert(
+ globalPort(
+ logical = "op-Z",
+ layer = "extra-layer",
+ portIdValue = 7,
+ internal = true,
+ input = false
+ ).serializeAsString ==
+
"(logicalOpId=op-Z,layerName=extra-layer,portId=7,isInternal=true,isInput=false)"
+ )
+ }
+
+ it should "round-trip identifiers containing dashes and dots (regex
non-comma matcher)" in {
+ // The deserialization regex uses `[^,]+` for the field body, so any
+ // non-comma character is fair game. Cover the realistic counter-
+ // examples (dashes, dots) since logical op ids and layer names use
+ // both; if the regex were ever tightened to alphanumerics only, this
+ // would fail on purpose.
+ val p = globalPort(logical = "my.op-with-dashes.v2", layer = "main-1")
+ assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString)
== p)
+ }
+
+ it should "throw IllegalArgumentException when serializing a negative port
id" in {
+ // Port ids are array indices and must be non-negative; the serializer
+ // rejects negatives so corrupt data can't reach VFS URIs.
+ intercept[IllegalArgumentException] {
+ globalPort(portIdValue = -1).serializeAsString
+ }
+ }
+
+ it should "throw IllegalArgumentException when deserializing a negative port
id" in {
+ // Symmetric: a hand-crafted string with a negative portId must be
+ // rejected by the deserializer too (so tampered URIs don't slip
+ // through).
+ val malformed =
"(logicalOpId=op-A,layerName=main,portId=-1,isInternal=false,isInput=true)"
+ intercept[IllegalArgumentException] {
+ GlobalPortIdentitySerde.deserializeFromString(malformed)
+ }
+ }
+
+ it should "throw IllegalArgumentException when the input has the wrong field
order" in {
+ // The regex pins the documented field order; a swapped order should
+ // not silently parse with confused values.
+ val swapped =
"(layerName=main,logicalOpId=op-A,portId=0,isInternal=false,isInput=true)"
+ intercept[IllegalArgumentException] {
+ GlobalPortIdentitySerde.deserializeFromString(swapped)
+ }
+ }
+
+ it should "throw IllegalArgumentException when the input has trailing
content past the closing paren" in {
+ val withTrailing =
+
"(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true) extra"
+ intercept[IllegalArgumentException] {
+ GlobalPortIdentitySerde.deserializeFromString(withTrailing)
+ }
+ }
+
+ it should "throw IllegalArgumentException when a field body is empty" in {
+ // `[^,]+` requires at least one character, so an empty layerName
+ // (`,layerName=,`) must fail to match.
+ val emptyLayer =
"(logicalOpId=op-A,layerName=,portId=0,isInternal=false,isInput=true)"
+ intercept[IllegalArgumentException] {
+ GlobalPortIdentitySerde.deserializeFromString(emptyLayer)
+ }
+ }
+
+ it should "throw IllegalArgumentException on a completely malformed string"
in {
+ val ex = intercept[IllegalArgumentException] {
+ GlobalPortIdentitySerde.deserializeFromString("not even close")
+ }
+ assert(ex.getMessage.contains("not even close"))
+ }
+
+ it should "throw IllegalArgumentException when a required field is missing"
in {
+ // Drop isInput.
+ val malformed =
"(logicalOpId=op-A,layerName=main,portId=0,isInternal=false)"
+ intercept[IllegalArgumentException] {
+ GlobalPortIdentitySerde.deserializeFromString(malformed)
+ }
+ }
+
+ it should "throw NumberFormatException when portId is non-numeric" in {
+ // The regex matches (`[^,]+`) but `.toInt` fails. NumberFormatException
+ // extends IllegalArgumentException; assert the more specific type so a
+ // regression that swallowed/rewrapped it is visible.
+ val malformed =
"(logicalOpId=op-A,layerName=main,portId=NaN,isInternal=false,isInput=true)"
+ intercept[NumberFormatException] {
+ GlobalPortIdentitySerde.deserializeFromString(malformed)
+ }
+ }
+
+ it should "throw IllegalArgumentException when a boolean field is
non-boolean" in {
+ // `String.toBoolean` is strict: only \"true\" / \"false\"
(case-insensitive)
+ // pass; anything else throws IllegalArgumentException.
+ val malformed =
"(logicalOpId=op-A,layerName=main,portId=0,isInternal=maybe,isInput=true)"
+ intercept[IllegalArgumentException] {
+ GlobalPortIdentitySerde.deserializeFromString(malformed)
+ }
+ }
+
+ it should "use no underscore in its own format characters (separators /
keys)" in {
+ // Pin the format-character invariant: the wrapping `(...)`, the field
+ // separators `,`, the key=value separators, and the field NAMES
+ // themselves contain no underscore. Verify by building the format with
+ // empty-string-replacement values for every input field, so anything
+ // left in the output is purely from `serializeAsString`'s own format.
+ // (For the layerName field the empty-input variant is rejected by the
+ // deserializer regex; here we only check the SERIALIZED output, not the
+ // round-trip.)
+ val s = globalPort(logical = "x", layer = "x").serializeAsString
+ val formatChars = s.replace("x", "").replace("0", "").replace("false",
"").replace("true", "")
+ assert(!formatChars.contains("_"), s"format characters must be
underscore-free: $formatChars")
+ }
+
+ it should "throw IllegalArgumentException when logicalOpId contains an
underscore" in {
+ // Enforces the documented VFS-compatibility contract: the serialized
+ // form must be underscore-free. The serializer rejects underscored
+ // inputs at the boundary instead of silently emitting a string that
+ // would interfere with VFS URI parsing downstream.
+ intercept[IllegalArgumentException] {
+ globalPort(logical = "__DummyOperator").serializeAsString
+ }
+ }
+
+ it should "throw IllegalArgumentException when layerName contains an
underscore" in {
+ // Both fields enforce the same invariant; cover them independently so
+ // a partial fix that only validates one surfaces as a test failure.
+ intercept[IllegalArgumentException] {
+ globalPort(layer = "main_source_0_op").serializeAsString
+ }
+ }
+
+ //
---------------------------------------------------------------------------
+ // PortIdentityKeySerializer.portIdToString (companion, not the Jackson
class)
+ //
---------------------------------------------------------------------------
+
+ "PortIdentityKeySerializer.portIdToString" should "format a PortIdentity as
`id_internal`" in {
+ assert(PortIdentityKeySerializer.portIdToString(PortIdentity(0, internal =
false)) == "0_false")
+ assert(PortIdentityKeySerializer.portIdToString(PortIdentity(7, internal =
true)) == "7_true")
+ }
+
+ //
---------------------------------------------------------------------------
+ // PortIdentityKeySerializer + PortIdentityKeyDeserializer (Jackson wiring)
+ //
---------------------------------------------------------------------------
+ //
+ // These tests use the production `JSONUtils.objectMapper` directly so a
+ // regression in the singleton wiring (e.g. the module that registers the
+ // PortIdentity key (de)serializer being removed or reordered) surfaces
+ // here, not just on a freshly-constructed mapper.
+
+ "PortIdentity Jackson key (de)serialization" should "round-trip a
Map[PortIdentity, String] via JSONUtils.objectMapper" in {
+ val original = Map(
+ PortIdentity(0, internal = false) -> "a",
+ PortIdentity(1, internal = true) -> "b"
+ )
+ val json = objectMapper.writeValueAsString(original)
+ // Verify the JSON keys match the documented `id_internal` format.
+ assert(json.contains("\"0_false\""))
+ assert(json.contains("\"1_true\""))
+ val tref = objectMapper.getTypeFactory
+ .constructMapType(classOf[java.util.HashMap[_, _]],
classOf[PortIdentity], classOf[String])
+ val restored: java.util.Map[PortIdentity, String] =
objectMapper.readValue(json, tref)
+ import scala.jdk.CollectionConverters._
+ assert(restored.asScala.toMap == original)
+ }
+
+ it should "round-trip an empty Map[PortIdentity, V] without invoking the
(de)serializer" in {
+ val original = Map.empty[PortIdentity, String]
+ val json = objectMapper.writeValueAsString(original)
+ val tref = objectMapper.getTypeFactory
+ .constructMapType(classOf[java.util.HashMap[_, _]],
classOf[PortIdentity], classOf[String])
+ val restored: java.util.Map[PortIdentity, String] =
objectMapper.readValue(json, tref)
+ assert(restored.isEmpty)
+ }
+
+ "PortIdentityKeyDeserializer.deserializeKey" should "throw
NumberFormatException for a non-integer id" in {
+ val d = new PortIdentityKeyDeserializer
+ intercept[NumberFormatException] {
+ d.deserializeKey("notAnInt_false", null)
+ }
+ }
+
+ it should "throw IllegalArgumentException for a non-boolean internal flag"
in {
+ val d = new PortIdentityKeyDeserializer
+ intercept[IllegalArgumentException] {
+ d.deserializeKey("0_notABool", null)
+ }
+ }
+
+ it should "throw NumberFormatException when the underscore separator is
missing and the whole string is non-numeric" in {
+ // `key.split("_")` on a separator-less non-numeric string yields a
+ // single-element array, and `parts(0).toInt` fires first → NFE.
+ val d = new PortIdentityKeyDeserializer
+ intercept[NumberFormatException] {
+ d.deserializeKey("missingSeparator", null)
+ }
+ }
+
+ it should "throw ArrayIndexOutOfBoundsException when only the id is provided
(no `_internal` suffix)" in {
+ // Different separator-missing path: `\"5\".split(\"_\")` yields
+ // [\"5\"], parts(0).toInt = 5 succeeds, then parts(1) reads past the
+ // end. Pin this failure mode explicitly so a future safer parser
+ // breaks the spec on purpose (and the safer error type is chosen
+ // consciously).
+ val d = new PortIdentityKeyDeserializer
+ intercept[ArrayIndexOutOfBoundsException] {
+ d.deserializeKey("5", null)
+ }
+ }
+
+ it should "silently accept extra trailing underscore-separated segments
(lenient parser, current behavior)" in {
+ // Pin the current lenient behavior: `parts(0).toInt` and
+ // `parts(1).toBoolean` ignore everything past `parts(1)`, so a key
+ // like `"1_true_garbage"` deserializes to `PortIdentity(1, true)`
+ // without complaint. The strict-rejection variant lives in a
+ // pendingUntilFixed test below; characterizing today's lenient
+ // path here means a future-tightening fix would need to update
+ // both tests deliberately.
+ val d = new PortIdentityKeyDeserializer
+ val pid = d.deserializeKey("1_true_garbage", null)
+ assert(pid == PortIdentity(1, internal = true))
+ }
+
+ it should "eventually reject keys with extra trailing segments
(pendingUntilFixed)" in pendingUntilFixed {
+ // Documented contract: a `PortIdentityKeySerializer` output is exactly
+ // `id_internal` — two underscore-separated segments. Anything else is
+ // corrupt JSON and should be rejected, not silently truncated. The
+ // current implementation is lenient (see characterization test
+ // above); this pendingUntilFixed flips to passing once the parser
+ // is hardened, then `pendingUntilFixed` inverts that into a
+ // deliberate failure forcing the marker to be removed.
+ val d = new PortIdentityKeyDeserializer
+ intercept[IllegalArgumentException] {
+ d.deserializeKey("1_true_garbage", null)
+ }
+ }
+}