This is an automated email from the ASF dual-hosted git repository. aglinxinyuan pushed a commit to branch test-port-identity-serde-spec in repository https://gitbox.apache.org/repos/asf/texera.git
commit 5167c9c38fecf8f55c7075e74711d24e1c0782d7 Author: Xinyuan Lin <[email protected]> AuthorDate: Tue May 12 22:24:01 2026 -0700 feat(workflow-core): enforce non-negative portId + underscore-free op/layer in serde Address review feedback on #4954: - Yicong-Huang: "lets disallow negative port id" — `GlobalPortIdentitySerde.serializeAsString` and `.deserializeFromString` now reject `portId < 0` with `IllegalArgumentException`. Mirrored in the Python helper `serialize_global_port_identity` / `deserialize_global_port_identity` (raises ValueError). - Yicong-Huang: "can we enforce it (to fix it)?" — the documented VFS-compatibility contract ("serialized form does not include underscore '_'") was previously aspirational; inputs were interpolated verbatim. Now `serializeAsString` rejects `logicalOpId` or `layerName` containing `_` at the boundary, so a bad input fails fast instead of silently emitting a string that would interfere with VFS URI parsing downstream. Mirrored in the Python helper. Tests: - `PortIdentitySerdeSpec`: previous round-trip-negative and `pendingUntilFixed` underscore tests replaced with regular tests asserting `IllegalArgumentException` for negative portId (both serialize + deserialize) and for underscored `logicalOpId` / `layerName`. - `test_virtual_identity.py`: matching ValueError assertions for the Python serializer/deserializer. Adjacent test fixtures updated to honor the new contract: - `IcebergTableStatsSpec`, `IcebergDocumentSpec`, `test_iceberg_document.py`: op-id fixture `test_table_<uuid>` → `test-table-<uuid>` (dashes instead of underscores). `IcebergDocumentConsoleMessagesSpec` is unchanged — it uses `createConsoleMessagesURI`, which does not go through `serializeAsString`. Verification: ``` sbt "WorkflowCore/Test/testOnly org.apache.texera.amber.util.serde.PortIdentitySerdeSpec" # 24 tests, all pass (1 unchanged pendingUntilFixed for PortIdentityKeyDeserializer) sbt "WorkflowCore/Test/testOnly org.apache.texera.amber.core.storage.VFSURIFactorySpec org.apache.texera.amber.storage.result.iceberg.IcebergDocumentSpec org.apache.texera.amber.storage.result.iceberg.IcebergTableStatsSpec org.apache.texera.amber.storage.result.iceberg.IcebergDocumentConsoleMessagesSpec" # 30 tests, all pass sbt "WorkflowExecutionService/Test/testOnly org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResourceSpec" # 2 tests, all pass pytest core/util/virtual_identity/ # 17 tests, all pass sbt "WorkflowCore/scalafmtCheck; WorkflowCore/Test/scalafmtCheck" # clean ``` --- .../core/storage/iceberg/test_iceberg_document.py | 2 +- .../python/core/util/virtual_identity/__init__.py | 17 ++++++ .../util/virtual_identity/test_virtual_identity.py | 23 +++++++++ .../amber/util/serde/GlobalPortIdentitySerde.scala | 19 +++++-- .../result/iceberg/IcebergDocumentSpec.scala | 2 +- .../result/iceberg/IcebergTableStatsSpec.scala | 2 +- .../amber/util/serde/PortIdentitySerdeSpec.scala | 60 ++++++++++++---------- 7 files changed, 91 insertions(+), 34 deletions(-) diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py index 9b374f7d5c..72b60d6cb7 100644 --- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py +++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py @@ -81,7 +81,7 @@ class TestIcebergDocument: ExecutionIdentity(id=0), GlobalPortIdentity( op_id=PhysicalOpIdentity( - logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"), + logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"), layer_name="main", ), port_id=PortIdentity(id=0), diff --git a/amber/src/main/python/core/util/virtual_identity/__init__.py b/amber/src/main/python/core/util/virtual_identity/__init__.py index 93a887d7c6..49da75fcd5 100644 --- a/amber/src/main/python/core/util/virtual_identity/__init__.py +++ b/amber/src/main/python/core/util/virtual_identity/__init__.py @@ -42,12 +42,27 @@ def serialize_global_port_identity(obj: GlobalPortIdentity) -> str: Expected format: ``(logicalOpId=<logicalOpId>,layerName=<layerName>, portId=<portId.id>,isInternal=<portId.internal>,isInput=<input>)`` + + Raises ValueError if `logicalOpId` or `layerName` contains an underscore + (VFS URI parsing relies on the absence of '_'), or if `portId` is negative. """ logical_op_id = obj.op_id.logical_op_id.id layer_name = obj.op_id.layer_name port_id = obj.port_id.id is_internal = obj.port_id.internal is_input_port = obj.input + if "_" in logical_op_id: + raise ValueError( + f"logicalOpId must not contain '_' " + f"(VFS URI parsing relies on this): {logical_op_id}" + ) + if "_" in layer_name: + raise ValueError( + f"layerName must not contain '_' " + f"(VFS URI parsing relies on this): {layer_name}" + ) + if port_id < 0: + raise ValueError(f"portId must be non-negative: {port_id}") return ( f"(logicalOpId={logical_op_id},layerName={layer_name},portId={port_id}," f"isInternal={str(is_internal).lower()},isInput={str(is_input_port).lower()})" @@ -72,6 +87,8 @@ def deserialize_global_port_identity(encoded_str: str) -> GlobalPortIdentity: match.groups() ) port_id = int(port_id_str) + if port_id < 0: + raise ValueError(f"portId must be non-negative: {port_id}") is_internal = is_internal_str.lower() == "true" is_input_port = is_input_str.lower() == "true" op_id = PhysicalOpIdentity( diff --git a/amber/src/main/python/core/util/virtual_identity/test_virtual_identity.py b/amber/src/main/python/core/util/virtual_identity/test_virtual_identity.py index 431dd84146..c2f6f63685 100644 --- a/amber/src/main/python/core/util/virtual_identity/test_virtual_identity.py +++ b/amber/src/main/python/core/util/virtual_identity/test_virtual_identity.py @@ -105,6 +105,21 @@ class TestSerializeGlobalPortIdentity: assert recovered.port_id.internal is True assert recovered.input is False + def test_rejects_underscore_in_logical_op_id(self): + # VFS-compatibility contract: serialized output must be + # underscore-free. Fail fast at the boundary on underscored input. + with pytest.raises(ValueError, match="logicalOpId must not contain"): + serialize_global_port_identity(_gpi(op_id="__DummyOperator")) + + def test_rejects_underscore_in_layer_name(self): + with pytest.raises(ValueError, match="layerName must not contain"): + serialize_global_port_identity(_gpi(layer="main_source_0_op")) + + def test_rejects_negative_port_id(self): + # Port ids are array indices and must be non-negative. + with pytest.raises(ValueError, match="portId must be non-negative"): + serialize_global_port_identity(_gpi(port=-1)) + class TestDeserializeGlobalPortIdentity: def test_parses_canonical_encoded_string(self): @@ -137,6 +152,14 @@ class TestDeserializeGlobalPortIdentity: "(logicalOpId=op,layerName=l,portId=0,isInternal=true)" ) + def test_raises_value_error_on_negative_port_id(self): + # Symmetric with the serializer: tampered URIs with a negative + # portId must be rejected on the way back in. + with pytest.raises(ValueError, match="portId must be non-negative"): + deserialize_global_port_identity( + "(logicalOpId=op,layerName=l,portId=-1,isInternal=false,isInput=true)" + ) + class TestGetFromActorIdForInputPortStorage: def test_prefixes_materialization_reader_to_uri_plus_actor_name(self): diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala index 48c087443c..c8fd8e1a36 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala @@ -23,17 +23,20 @@ import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity} /** * Serialize and deserializes a GlobalPortIdentity object to a string using a custom, human-readable format - * to ensure it works with both URI and file path and does not incldue underscore "_" so that it does not - * interfere with our own VFS URI parsing. + * to ensure it works with both URI and file path and does not include underscore "_" so that it does not + * interfere with our own VFS URI parsing. Underscores in `logicalOpId` / `layerName` and negative `portId` + * values are rejected with `IllegalArgumentException`. */ object GlobalPortIdentitySerde { implicit class SerdeOps(globalPortId: GlobalPortIdentity) { /** * Serializes a GlobalPortIdentity object into a string using our custom, human-readable format - * that works with both URI and file path and does not incldue underscore "_" so that it does not + * that works with both URI and file path and does not include underscore "_" so that it does not * interfere with our own VFS URI parsing. * + * @throws java.lang.IllegalArgumentException if `logicalOpId` or `layerName` contains an underscore, + * or if `portId.id` is negative. * @return A serialized string representation of globalPortId */ def serializeAsString: String = { @@ -42,6 +45,15 @@ object GlobalPortIdentitySerde { val portId = globalPortId.portId.id val isInternal = globalPortId.portId.internal val isInput = globalPortId.input + require( + !logicalOpId.contains('_'), + s"logicalOpId must not contain '_' (VFS URI parsing relies on this): $logicalOpId" + ) + require( + !layerName.contains('_'), + s"layerName must not contain '_' (VFS URI parsing relies on this): $layerName" + ) + require(portId >= 0, s"portId must be non-negative: $portId") s"(logicalOpId=$logicalOpId,layerName=$layerName,portId=$portId,isInternal=$isInternal,isInput=$isInput)" } } @@ -58,6 +70,7 @@ object GlobalPortIdentitySerde { serializedGlobalPortId match { case pattern(logicalOpId, layerName, portIdStr, isInternalStr, isInputStr) => val portIdInt = portIdStr.toInt + require(portIdInt >= 0, s"portId must be non-negative: $portIdInt") val isInternal = isInternalStr.toBoolean val isInput = isInputStr.toBoolean val physicalOpId = PhysicalOpIdentity( diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 8fdf039f3e..eb259fed58 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -85,7 +85,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter GlobalPortIdentity( PhysicalOpIdentity( logicalOpId = - OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"), + OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"), layerName = "main" ), PortIdentity() diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala index 175ebc2c01..b7cf776eb8 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala @@ -56,7 +56,7 @@ class IcebergTableStatsSpec extends AnyFlatSpec with BeforeAndAfterAll with Suit GlobalPortIdentity( PhysicalOpIdentity( logicalOpId = - OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"), + OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"), layerName = "main" ), PortIdentity() diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala index 3abc796242..89a815c39e 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala @@ -103,12 +103,22 @@ class PortIdentitySerdeSpec extends AnyFlatSpec { assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) == p) } - it should "round-trip a negative port id" in { - // PortIdentity.id is a plain Int; negatives are technically permitted - // by the type. Pin the round-trip so a future tightening of the - // numeric regex (e.g. to `\\d+`) breaks this on purpose. - val p = globalPort(portIdValue = -1) - assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) == p) + it should "throw IllegalArgumentException when serializing a negative port id" in { + // Port ids are array indices and must be non-negative; the serializer + // rejects negatives so corrupt data can't reach VFS URIs. + intercept[IllegalArgumentException] { + globalPort(portIdValue = -1).serializeAsString + } + } + + it should "throw IllegalArgumentException when deserializing a negative port id" in { + // Symmetric: a hand-crafted string with a negative portId must be + // rejected by the deserializer too (so tampered URIs don't slip + // through). + val malformed = "(logicalOpId=op-A,layerName=main,portId=-1,isInternal=false,isInput=true)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(malformed) + } } it should "throw IllegalArgumentException when the input has the wrong field order" in { @@ -185,28 +195,22 @@ class PortIdentitySerdeSpec extends AnyFlatSpec { assert(!formatChars.contains("_"), s"format characters must be underscore-free: $formatChars") } - it should "eventually produce an underscore-free output even for inputs that contain underscores (pendingUntilFixed)" in pendingUntilFixed { - // Documented contract on `GlobalPortIdentitySerde`: "does not include - // underscore '_' so that it does not interfere with our own VFS URI - // parsing." The implementation does NOT enforce this — inputs are - // interpolated verbatim. Both fields can carry underscores in real - // production data: - // - `logicalOpId`: e.g. `__DummyOperator` from `VirtualIdentityUtils` - // - `layerName`: e.g. `${layerName}_source_${portId}_...` constructed - // by `SpecialPhysicalOpFactory` - // Cover BOTH so a partial fix that escapes only one of them flips - // pendingUntilFixed into a deliberate failure with the second - // assertion still red. - val withUnderscoreOpId = globalPort(logical = "__DummyOperator").serializeAsString - assert( - !withUnderscoreOpId.contains("_"), - s"serialized form must be underscore-free for op id: $withUnderscoreOpId" - ) - val withUnderscoreLayer = globalPort(layer = "main_source_0_op").serializeAsString - assert( - !withUnderscoreLayer.contains("_"), - s"serialized form must be underscore-free for layer name: $withUnderscoreLayer" - ) + it should "throw IllegalArgumentException when logicalOpId contains an underscore" in { + // Enforces the documented VFS-compatibility contract: the serialized + // form must be underscore-free. The serializer rejects underscored + // inputs at the boundary instead of silently emitting a string that + // would interfere with VFS URI parsing downstream. + intercept[IllegalArgumentException] { + globalPort(logical = "__DummyOperator").serializeAsString + } + } + + it should "throw IllegalArgumentException when layerName contains an underscore" in { + // Both fields enforce the same invariant; cover them independently so + // a partial fix that only validates one surfaces as a test failure. + intercept[IllegalArgumentException] { + globalPort(layer = "main_source_0_op").serializeAsString + } } // ---------------------------------------------------------------------------
