This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 4862b924d8 feat(amber): reject self-loops and empty/null op-ids in LogicalLink (with test coverage) (#4956)
4862b924d8 is described below
commit 4862b924d8f23eb6e4699e2f10aac4f37536bc26
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue May 12 22:29:12 2026 -0700
feat(amber): reject self-loops and empty/null op-ids in LogicalLink (with test coverage) (#4956)
### What changes were proposed in this PR?
Originally a test-only PR pinning `LogicalLink` behavior. Per review
feedback from @Yicong-Huang, this PR now also tightens validation in
`amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala`:
| Surface | Behavior |
| --- | --- |
| Primary `LogicalLink` constructor | Rejects `null` `OperatorIdentity`, `OperatorIdentity(null)`, `OperatorIdentity("")`, and self-loops (`fromOpId == toOpId`). Throws `IllegalArgumentException`. |
| Secondary `@JsonCreator` `(String, …, String, …)` overload | Rejects `null` strings, empty strings, and self-loops, all via the primary constructor's `require` checks. |
| Jackson deserialization with missing `fromOpId` / `toOpId` | Wraps the `IllegalArgumentException` in `ValueInstantiationException`; no longer silently produces `OperatorIdentity(null)`. |
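The validation pattern in the table above can be sketched with plain Scala. This is a minimal illustration, not the Texera code: `OpId`, `PortId`, `Link`, and `LinkDemo.rejected` are hypothetical stand-ins for `OperatorIdentity`, `PortIdentity`, and `LogicalLink`, and the sketch assumes the string overload simply delegates to the primary constructor.

```scala
// Hypothetical stand-ins; these are NOT the Texera classes.
final case class OpId(id: String)
final case class PortId(id: Int, internal: Boolean = false)

final case class Link(fromOpId: OpId, fromPortId: PortId, toOpId: OpId, toPortId: PortId) {
  // Predef.require throws IllegalArgumentException when the condition is false.
  require(
    fromOpId != null && fromOpId.id != null && fromOpId.id.nonEmpty,
    "fromOpId must be non-null and non-empty"
  )
  require(
    toOpId != null && toOpId.id != null && toOpId.id.nonEmpty,
    "toOpId must be non-null and non-empty"
  )
  // Checked last, so fromOpId.id is known to be safe to read here.
  require(fromOpId != toOpId, s"self-loop not allowed: ${fromOpId.id}")

  // String overload: wraps the raw ids and delegates to the primary
  // constructor, so the three checks above also run on this path.
  def this(fromOpId: String, fromPortId: PortId, toOpId: String, toPortId: PortId) =
    this(OpId(fromOpId), fromPortId, OpId(toOpId), toPortId)
}

object LinkDemo {
  // True when building the link throws IllegalArgumentException.
  def rejected(build: => Link): Boolean =
    scala.util.Try(build).failed.toOption.exists(_.isInstanceOf[IllegalArgumentException])
}
```

Because the checks live in the class body, every construction path that ends in the primary constructor is validated, which is why the string overload needs no checks of its own in this sketch.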
### Test surface pinned in `LogicalLinkSpec`
| Surface | Pinned |
| --- | --- |
| Primary constructor | Four fields exposed as constructed. |
| Secondary `@JsonCreator` constructor | Wraps raw `String` op-ids in `OperatorIdentity`; equal to a primary-constructor link with the same content. |
| Case-class `equals` / `hashCode` | Structural equality across the four fields, plus distinguishing field-by-field (`fromOpId` mismatch, `toPortId.internal` mismatch). |
| Identifier acceptance | Dashes, dots, digits accepted without normalization. |
| **Self-loop rejection** | `fromOpId == toOpId` is now rejected from both construction paths (was previously characterized as "structurally allowed"). |
| **Empty / null op-id rejection** | Empty string and null are now rejected from both paths (primary and `@JsonCreator`). |
| **Missing-field rejection on Jackson read** | A JSON object with missing `fromOpId` / `toOpId` now throws `ValueInstantiationException` whose cause is `IllegalArgumentException` (was previously characterized as "silently defaults to `OperatorIdentity(null)`"). |
| Jackson deserialization (`JSONUtils.objectMapper`) | `treeToValue` from a JSON object with raw-string op-ids dispatches to the `@JsonCreator` string overload and produces the expected fields. |
| `@JsonProperty` JSON field names | `fromOpId` / `toOpId` pinned by `@JsonProperty`; `fromPortId` / `toPortId` derive from Scala parameter names (no annotation). Both pinned so a Scala-side rename can't silently break saved workflows. |
| `writeValueAsString` ↔ `readValue` asymmetry | Now linked to follow-up issue #5042 (out of scope for this PR; a real fix requires either an additional `@JsonCreator` object overload or a custom `@JsonDeserialize`). When that fix lands, the existing characterization test flips to assert a passing round-trip alongside the fix. |
### Any related issues, documentation, discussions?
Closes #4955.
Spawns #5042 for the harder follow-up (asymmetric serde).
### How was this PR tested?
```
sbt "WorkflowExecutionService/Test/testOnly org.apache.texera.workflow.LogicalLinkSpec"
# → 18 tests, all pass
sbt "WorkflowExecutionService/Test/testOnly org.apache.texera.workflow.WorkflowCompilerSpec"
# → 6 tests, all pass (regression check for callers constructing LogicalLink directly)
sbt "WorkflowExecutionService/Test/testOnly org.apache.texera.amber.engine.architecture.scheduling.CostBasedScheduleGeneratorSpec"
# → 7 tests, all pass (regression check; this spec constructs many LogicalLinks)
sbt "WorkflowExecutionService/Test/scalafmtCheck" "WorkflowExecutionService/scalafmtCheck"
# → clean
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---
.../org/apache/texera/workflow/LogicalLink.scala | 13 +
.../apache/texera/workflow/LogicalLinkSpec.scala | 291 +++++++++++++++++++++
2 files changed, 304 insertions(+)
diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala b/amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala
index 5bbc9164b7..e6553e3cdf 100644
--- a/amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala
+++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala
@@ -29,6 +29,19 @@ case class LogicalLink(
@JsonProperty("toOpId") toOpId: OperatorIdentity,
toPortId: PortIdentity
) {
+ require(
+ fromOpId != null && fromOpId.id != null && fromOpId.id.nonEmpty,
+ "LogicalLink fromOpId must be non-null and non-empty"
+ )
+ require(
+ toOpId != null && toOpId.id != null && toOpId.id.nonEmpty,
+ "LogicalLink toOpId must be non-null and non-empty"
+ )
+ require(
+ fromOpId != toOpId,
+ s"LogicalLink self-loop not allowed: fromOpId == toOpId == ${fromOpId.id}"
+ )
+
@JsonCreator
def this(
@JsonProperty("fromOpId") fromOpId: String,
diff --git a/amber/src/test/scala/org/apache/texera/workflow/LogicalLinkSpec.scala b/amber/src/test/scala/org/apache/texera/workflow/LogicalLinkSpec.scala
new file mode 100644
index 0000000000..bd56aa7d5f
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/workflow/LogicalLinkSpec.scala
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.workflow
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.exc.{MismatchedInputException, ValueInstantiationException}
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class LogicalLinkSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Primary constructor + case-class semantics
+ // ---------------------------------------------------------------------------
+
+ "LogicalLink primary constructor" should "expose the four fields it was constructed with" in {
+ val link = LogicalLink(
+ fromOpId = OperatorIdentity("op-A"),
+ fromPortId = PortIdentity(0),
+ toOpId = OperatorIdentity("op-B"),
+ toPortId = PortIdentity(1, internal = true)
+ )
+ assert(link.fromOpId == OperatorIdentity("op-A"))
+ assert(link.fromPortId == PortIdentity(0))
+ assert(link.toOpId == OperatorIdentity("op-B"))
+ assert(link.toPortId == PortIdentity(1, internal = true))
+ }
+
+ "LogicalLink case-class equality" should "use structural equality across all four fields" in {
+ val a =
+ LogicalLink(OperatorIdentity("x"), PortIdentity(0), OperatorIdentity("y"), PortIdentity(1))
+ val b =
+ LogicalLink(OperatorIdentity("x"), PortIdentity(0), OperatorIdentity("y"), PortIdentity(1))
+ assert(a == b)
+ assert(a.hashCode == b.hashCode)
+ }
+
+ it should "distinguish links that differ only in fromOpId" in {
+ val a =
+ LogicalLink(OperatorIdentity("x"), PortIdentity(0), OperatorIdentity("y"), PortIdentity(1))
+ val b =
+ LogicalLink(OperatorIdentity("z"), PortIdentity(0), OperatorIdentity("y"), PortIdentity(1))
+ assert(a != b)
+ }
+
+ it should "distinguish links that differ only in toPortId.internal" in {
+ val a = LogicalLink(
+ OperatorIdentity("x"),
+ PortIdentity(0),
+ OperatorIdentity("y"),
+ PortIdentity(1, internal = false)
+ )
+ val b = LogicalLink(
+ OperatorIdentity("x"),
+ PortIdentity(0),
+ OperatorIdentity("y"),
+ PortIdentity(1, internal = true)
+ )
+ assert(a != b)
+ }
+
+ it should "reject a self-loop link (fromOpId == toOpId) regardless of port" in {
+ // The constructor rejects fromOpId == toOpId — a workflow edge whose
+ // source and sink are the same operator can never be schedulable, so
+ // we fail fast here rather than letting it travel through the planner.
+ val ex = intercept[IllegalArgumentException] {
+ LogicalLink(
+ OperatorIdentity("op-A"),
+ PortIdentity(0),
+ OperatorIdentity("op-A"),
+ PortIdentity(1)
+ )
+ }
+ assert(ex.getMessage.contains("self-loop"))
+ }
+
+ it should "reject a null fromOpId / toOpId in the primary constructor" in {
+ intercept[IllegalArgumentException] {
+ LogicalLink(null, PortIdentity(0), OperatorIdentity("op-B"), PortIdentity(1))
+ }
+ intercept[IllegalArgumentException] {
+ LogicalLink(OperatorIdentity("op-A"), PortIdentity(0), null, PortIdentity(1))
+ }
+ }
+
+ it should "reject an OperatorIdentity wrapping a null id in the primary constructor" in {
+ intercept[IllegalArgumentException] {
+ LogicalLink(
+ OperatorIdentity(null),
+ PortIdentity(0),
+ OperatorIdentity("op-B"),
+ PortIdentity(1)
+ )
+ }
+ intercept[IllegalArgumentException] {
+ LogicalLink(
+ OperatorIdentity("op-A"),
+ PortIdentity(0),
+ OperatorIdentity(null),
+ PortIdentity(1)
+ )
+ }
+ }
+
+ it should "reject an OperatorIdentity wrapping an empty id in the primary constructor" in {
+ intercept[IllegalArgumentException] {
+ LogicalLink(OperatorIdentity(""), PortIdentity(0), OperatorIdentity("op-B"), PortIdentity(1))
+ }
+ intercept[IllegalArgumentException] {
+ LogicalLink(OperatorIdentity("op-A"), PortIdentity(0), OperatorIdentity(""), PortIdentity(1))
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Secondary @JsonCreator constructor (string opId variant)
+ // ---------------------------------------------------------------------------
+
+ "LogicalLink secondary @JsonCreator constructor" should "wrap raw String op ids in OperatorIdentity" in {
+ val link = new LogicalLink(
+ fromOpId = "op-A",
+ fromPortId = PortIdentity(0),
+ toOpId = "op-B",
+ toPortId = PortIdentity(1)
+ )
+ assert(link.fromOpId == OperatorIdentity("op-A"))
+ assert(link.toOpId == OperatorIdentity("op-B"))
+ // Equal to a link built via the primary constructor.
+ assert(
+ link == LogicalLink(
+ OperatorIdentity("op-A"),
+ PortIdentity(0),
+ OperatorIdentity("op-B"),
+ PortIdentity(1)
+ )
+ )
+ }
+
+ it should "accept identifiers containing dashes / dots / digits (no normalization)" in {
+ val link = new LogicalLink("my.op-1", PortIdentity(0), "my.op-2", PortIdentity(1))
+ assert(link.fromOpId == OperatorIdentity("my.op-1"))
+ assert(link.toOpId == OperatorIdentity("my.op-2"))
+ }
+
+ it should "reject the empty string as an op id via the @JsonCreator constructor" in {
+ intercept[IllegalArgumentException] {
+ new LogicalLink("", PortIdentity(0), "op-B", PortIdentity(1))
+ }
+ intercept[IllegalArgumentException] {
+ new LogicalLink("op-A", PortIdentity(0), "", PortIdentity(1))
+ }
+ }
+
+ it should "reject a null string op id via the @JsonCreator constructor" in {
+ intercept[IllegalArgumentException] {
+ new LogicalLink(null: String, PortIdentity(0), "op-B", PortIdentity(1))
+ }
+ intercept[IllegalArgumentException] {
+ new LogicalLink("op-A", PortIdentity(0), null: String, PortIdentity(1))
+ }
+ }
+
+ it should "reject a self-loop via the @JsonCreator constructor (same string op id)" in {
+ val ex = intercept[IllegalArgumentException] {
+ new LogicalLink("op-A", PortIdentity(0), "op-A", PortIdentity(1))
+ }
+ assert(ex.getMessage.contains("self-loop"))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Jackson round-trip (production objectMapper)
+ // ---------------------------------------------------------------------------
+ //
+ // These tests use the same `JSONUtils.objectMapper` that production uses
+ // to read user-saved workflow JSON, so a regression in the Jackson
+ // wiring (annotations, default-Scala-module config) surfaces here.
+
+ "LogicalLink Jackson deserialization" should
+ "deserialize fromOpId / toOpId from raw String values via the secondary @JsonCreator constructor" in {
+ // Build the JSON by hand to mimic a user-saved workflow file where
+ // `fromOpId` and `toOpId` are written as plain strings (the only shape
+ // production actually receives, since the frontend emits them as
+ // strings). Jackson dispatches to the @JsonCreator string-overload
+ // constructor.
+ val node = objectMapper.createObjectNode()
+ node.put("fromOpId", "op-A")
+ node.set("fromPortId", objectMapper.valueToTree[JsonNode](PortIdentity(0)))
+ node.put("toOpId", "op-B")
+ node.set("toPortId", objectMapper.valueToTree[JsonNode](PortIdentity(1)))
+ val link = objectMapper.treeToValue(node, classOf[LogicalLink])
+ assert(link.fromOpId == OperatorIdentity("op-A"))
+ assert(link.toOpId == OperatorIdentity("op-B"))
+ assert(link.fromPortId == PortIdentity(0))
+ assert(link.toPortId == PortIdentity(1))
+ }
+
+ it should "emit `fromOpId` / `toOpId` JSON keys pinned by @JsonProperty annotations" in {
+ // Only `fromOpId` / `toOpId` carry `@JsonProperty` in `LogicalLink`;
+ // a Scala-side rename of either parameter would still keep the
+ // JSON key stable, which is the saved-workflow contract these
+ // annotations pin.
+ val link = LogicalLink(
+ OperatorIdentity("op-A"),
+ PortIdentity(0),
+ OperatorIdentity("op-B"),
+ PortIdentity(1)
+ )
+ val tree = objectMapper.valueToTree[JsonNode](link)
+ assert(tree.has("fromOpId"))
+ assert(tree.has("toOpId"))
+ }
+
+ it should "emit `fromPortId` / `toPortId` JSON keys derived from Scala parameter names (no @JsonProperty)" in {
+ // Pin: the port-id JSON keys come from Scala parameter names since
+ // there is no `@JsonProperty` annotation on those fields. A
+ // parameter rename WOULD silently break saved-workflow compatibility
+ // for these keys — pin so a future rename without an accompanying
+ // `@JsonProperty` annotation breaks this on purpose.
+ val link = LogicalLink(
+ OperatorIdentity("op-A"),
+ PortIdentity(0),
+ OperatorIdentity("op-B"),
+ PortIdentity(1)
+ )
+ val tree = objectMapper.valueToTree[JsonNode](link)
+ assert(tree.has("fromPortId"))
+ assert(tree.has("toPortId"))
+ }
+
+ it should "NOT round-trip through writeValueAsString (the @JsonCreator string overload is incompatible with the object-shape OperatorIdentity that writeValueAsString emits)" in {
+ // Characterization of a real asymmetry tracked by
+ // https://github.com/apache/texera/issues/5042. Production reads
+ // user-saved workflow JSON where `fromOpId`/`toOpId` are plain
+ // strings, but `objectMapper.writeValueAsString` writes
+ // OperatorIdentity as `{"id":"op-A"}` (the case-class object form).
+ // Re-reading the emitted JSON fails because Jackson dispatches on the
+ // @JsonCreator string overload, which can't accept an object for
+ // fromOpId. When the issue is fixed (additional @JsonCreator object
+ // overload or a custom @JsonDeserialize), this test must flip to a
+ // passing round-trip assertion alongside the fix.
+ val original = LogicalLink(
+ OperatorIdentity("op-A"),
+ PortIdentity(0),
+ OperatorIdentity("op-B"),
+ PortIdentity(1)
+ )
+ val json = objectMapper.writeValueAsString(original)
+ // Parse the emitted JSON and confirm the structural shape — fromOpId
+ // is an object with an `id` field of "op-A". Avoids depending on
+ // exact key ordering or escaping.
+ val tree = objectMapper.readTree(json)
+ assert(tree.path("fromOpId").isObject, s"expected fromOpId to be an object: $json")
+ assert(tree.path("fromOpId").path("id").asText() == "op-A")
+ // Re-reading the just-emitted JSON fails because the @JsonCreator
+ // String overload can't accept the object-shape fromOpId.
+ intercept[MismatchedInputException] {
+ objectMapper.readValue(json, classOf[LogicalLink])
+ }
+ }
+
+ it should "reject missing string op-id fields when deserializing via Jackson" in {
+ // When `fromOpId` / `toOpId` are omitted, Jackson invokes the
+ // @JsonCreator with `null` for the missing String args. The primary
+ // constructor's `require` on non-null/non-empty ids then throws, and
+ // Jackson wraps it in `ValueInstantiationException` with the original
+ // `IllegalArgumentException` as the cause.
+ val empty = objectMapper.createObjectNode()
+ val ex = intercept[ValueInstantiationException] {
+ objectMapper.treeToValue(empty, classOf[LogicalLink])
+ }
+ assert(ex.getCause.isInstanceOf[IllegalArgumentException])
+ }
+}