aglinxinyuan commented on code in PR #4954: URL: https://github.com/apache/texera/pull/4954#discussion_r3231695763
########## common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.util.serde + +import org.apache.texera.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} +import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity} +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps +import org.scalatest.flatspec.AnyFlatSpec + +class PortIdentitySerdeSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // GlobalPortIdentitySerde + // --------------------------------------------------------------------------- + + private def globalPort( + logical: String = "op-A", + layer: String = "main", + portIdValue: Int = 0, + internal: Boolean = false, + input: Boolean = true + ): GlobalPortIdentity = + GlobalPortIdentity( + opId = PhysicalOpIdentity(OperatorIdentity(logical), layer), + portId = PortIdentity(id = portIdValue, internal = internal), + input = input + ) + + "GlobalPortIdentitySerde" should 
"round-trip a default GlobalPortIdentity through serializeAsString → deserializeFromString" in { + val original = globalPort() + val restored = GlobalPortIdentitySerde.deserializeFromString(original.serializeAsString) + assert(restored == original) + } + + it should "preserve all five fields independently across the round-trip" in { + // Vary each field individually so a regression that swapped two fields + // (e.g., isInput / isInternal) would surface here, not as a general + // round-trip failure. + val cases = Seq( + globalPort(logical = "op-A"), + globalPort(logical = "op-Z"), + globalPort(layer = "main"), + globalPort(layer = "extra-layer"), + globalPort(portIdValue = 0), + globalPort(portIdValue = 7), + globalPort(internal = false), + globalPort(internal = true), + globalPort(input = true), + globalPort(input = false) + ) + cases.foreach { p => + val s = p.serializeAsString + val restored = GlobalPortIdentitySerde.deserializeFromString(s) + assert(restored == p, s"round-trip mismatch for $p (serialized: $s)") + } + } + + it should "produce the documented format for default and non-default values" in { + // Pin the exact format. If this changes, callers reading existing + // VFS URIs from disk will break — locking it down forces a deliberate + // migration story. + assert( + globalPort().serializeAsString == + "(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true)" + ) + assert( + globalPort( + logical = "op-Z", + layer = "extra-layer", + portIdValue = 7, + internal = true, + input = false + ).serializeAsString == + "(logicalOpId=op-Z,layerName=extra-layer,portId=7,isInternal=true,isInput=false)" + ) + } + + it should "round-trip identifiers containing dashes and dots (regex non-comma matcher)" in { + // The deserialization regex uses `[^,]+` for the field body, so any + // non-comma character is fair game. 
Cover the realistic counter- + // examples (dashes, dots) since logical op ids and layer names use + // both; if the regex were ever tightened to alphanumerics only, this + // would fail on purpose. + val p = globalPort(logical = "my.op-with-dashes.v2", layer = "main-1") + assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) == p) + } + + it should "round-trip a negative port id" in { Review Comment: Done in f5bd56db87. Tightened both directions: `GlobalPortIdentitySerde.serializeAsString` now `require(portId >= 0)` and `deserializeFromString` re-asserts the same after `.toInt`, so a hand-crafted URI with a negative portId can't slip through either. Mirrored in the Python helper (raises ValueError). Tests in `PortIdentitySerdeSpec` now assert IAE on both paths; the previous round-trip-negative test is replaced. ########## common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.util.serde + +import org.apache.texera.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} +import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity} +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps +import org.scalatest.flatspec.AnyFlatSpec + +class PortIdentitySerdeSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // GlobalPortIdentitySerde + // --------------------------------------------------------------------------- + + private def globalPort( + logical: String = "op-A", + layer: String = "main", + portIdValue: Int = 0, + internal: Boolean = false, + input: Boolean = true + ): GlobalPortIdentity = + GlobalPortIdentity( + opId = PhysicalOpIdentity(OperatorIdentity(logical), layer), + portId = PortIdentity(id = portIdValue, internal = internal), + input = input + ) + + "GlobalPortIdentitySerde" should "round-trip a default GlobalPortIdentity through serializeAsString → deserializeFromString" in { + val original = globalPort() + val restored = GlobalPortIdentitySerde.deserializeFromString(original.serializeAsString) + assert(restored == original) + } + + it should "preserve all five fields independently across the round-trip" in { + // Vary each field individually so a regression that swapped two fields + // (e.g., isInput / isInternal) would surface here, not as a general + // round-trip failure. 
+ val cases = Seq( + globalPort(logical = "op-A"), + globalPort(logical = "op-Z"), + globalPort(layer = "main"), + globalPort(layer = "extra-layer"), + globalPort(portIdValue = 0), + globalPort(portIdValue = 7), + globalPort(internal = false), + globalPort(internal = true), + globalPort(input = true), + globalPort(input = false) + ) + cases.foreach { p => + val s = p.serializeAsString + val restored = GlobalPortIdentitySerde.deserializeFromString(s) + assert(restored == p, s"round-trip mismatch for $p (serialized: $s)") + } + } + + it should "produce the documented format for default and non-default values" in { + // Pin the exact format. If this changes, callers reading existing + // VFS URIs from disk will break — locking it down forces a deliberate + // migration story. + assert( + globalPort().serializeAsString == + "(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true)" + ) + assert( + globalPort( + logical = "op-Z", + layer = "extra-layer", + portIdValue = 7, + internal = true, + input = false + ).serializeAsString == + "(logicalOpId=op-Z,layerName=extra-layer,portId=7,isInternal=true,isInput=false)" + ) + } + + it should "round-trip identifiers containing dashes and dots (regex non-comma matcher)" in { + // The deserialization regex uses `[^,]+` for the field body, so any + // non-comma character is fair game. Cover the realistic counter- + // examples (dashes, dots) since logical op ids and layer names use + // both; if the regex were ever tightened to alphanumerics only, this + // would fail on purpose. + val p = globalPort(logical = "my.op-with-dashes.v2", layer = "main-1") + assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) == p) + } + + it should "round-trip a negative port id" in { + // PortIdentity.id is a plain Int; negatives are technically permitted + // by the type. Pin the round-trip so a future tightening of the + // numeric regex (e.g. to `\\d+`) breaks this on purpose. 
+ val p = globalPort(portIdValue = -1) + assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) == p) + } + + it should "throw IllegalArgumentException when the input has the wrong field order" in { + // The regex pins the documented field order; a swapped order should + // not silently parse with confused values. + val swapped = "(layerName=main,logicalOpId=op-A,portId=0,isInternal=false,isInput=true)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(swapped) + } + } + + it should "throw IllegalArgumentException when the input has trailing content past the closing paren" in { + val withTrailing = + "(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true) extra" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(withTrailing) + } + } + + it should "throw IllegalArgumentException when a field body is empty" in { + // `[^,]+` requires at least one character, so an empty layerName + // (`,layerName=,`) must fail to match. + val emptyLayer = "(logicalOpId=op-A,layerName=,portId=0,isInternal=false,isInput=true)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(emptyLayer) + } + } + + it should "throw IllegalArgumentException on a completely malformed string" in { + val ex = intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString("not even close") + } + assert(ex.getMessage.contains("not even close")) + } + + it should "throw IllegalArgumentException when a required field is missing" in { + // Drop isInput. + val malformed = "(logicalOpId=op-A,layerName=main,portId=0,isInternal=false)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(malformed) + } + } + + it should "throw NumberFormatException when portId is non-numeric" in { + // The regex matches (`[^,]+`) but `.toInt` fails. 
NumberFormatException + // extends IllegalArgumentException; assert the more specific type so a + // regression that swallowed/rewrapped it is visible. + val malformed = "(logicalOpId=op-A,layerName=main,portId=NaN,isInternal=false,isInput=true)" + intercept[NumberFormatException] { + GlobalPortIdentitySerde.deserializeFromString(malformed) + } + } + + it should "throw IllegalArgumentException when a boolean field is non-boolean" in { + // `String.toBoolean` is strict: only \"true\" / \"false\" (case-insensitive) + // pass; anything else throws IllegalArgumentException. + val malformed = "(logicalOpId=op-A,layerName=main,portId=0,isInternal=maybe,isInput=true)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(malformed) + } + } + + it should "use no underscore in its own format characters (separators / keys)" in { + // Pin the format-character invariant: the wrapping `(...)`, the field + // separators `,`, the key=value separators, and the field NAMES + // themselves contain no underscore. Verify by building the format with + // empty-string-replacement values for every input field, so anything + // left in the output is purely from `serializeAsString`'s own format. + // (For the layerName field the empty-input variant is rejected by the + // deserializer regex; here we only check the SERIALIZED output, not the + // round-trip.) + val s = globalPort(logical = "x", layer = "x").serializeAsString + val formatChars = s.replace("x", "").replace("0", "").replace("false", "").replace("true", "") + assert(!formatChars.contains("_"), s"format characters must be underscore-free: $formatChars") + } + + it should "eventually produce an underscore-free output even for inputs that contain underscores (pendingUntilFixed)" in pendingUntilFixed { + // Documented contract on `GlobalPortIdentitySerde`: "does not include + // underscore '_' so that it does not interfere with our own VFS URI + // parsing." 
The implementation does NOT enforce this — inputs are + // interpolated verbatim. Both fields can carry underscores in real + // production data: Review Comment: Done in f5bd56db87. `GlobalPortIdentitySerde.serializeAsString` now uses `require` to check that `logicalOpId` and `layerName` contain no `_`, so the documented VFS-compatibility invariant is enforced at the serializer boundary. Mirrored in the Python helper. Test changes: the previous `pendingUntilFixed` test is replaced with two regular tests asserting `IllegalArgumentException` for underscored `logicalOpId` and `layerName` independently. Adjacent test fixtures that used `test_table_<uuid>` op-ids (`IcebergTableStatsSpec`, `IcebergDocumentSpec`, `test_iceberg_document.py`) are updated to use dashes instead of underscores. `IcebergDocumentConsoleMessagesSpec` is left alone — `createConsoleMessagesURI` does not go through `serializeAsString`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
