This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5738-8a90f1f667c44bc26c0faf9eee619392e3f57ddf in repository https://gitbox.apache.org/repos/asf/texera.git
commit 0efbc0f59cad0a660912fab63de04a4860d8b42c
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Jun 17 15:07:43 2026 -0700

test(workflow-operator): add unit test coverage for SET-family LogicalOp descriptors (#5738)

### What changes were proposed in this PR?

Pin behavior of three previously-uncovered `LogicalOp` descriptors in the SET / cleaning operator family. Each descriptor wires a physical-op class name + port shape + (where applicable) partitioning + schema-propagation contract through `getPhysicalOp`. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `UnionOpDescSpec` | `UnionOpDesc` | 6 |
| `DistinctOpDescSpec` | `DistinctOpDesc` | 7 |
| `DifferenceOpDescSpec` | `DifferenceOpDesc` | 8 |

All three spec files follow the `<srcClassName>Spec.scala` one-to-one convention. `IntersectOpDescSpec` already exists and gave us the spec-shape template.

**Behavior pinned — `UnionOpDesc`**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Union"`, group `SET_GROUP`, description mentions "Union" |
| Ports | one input, one non-blocking output |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.union.UnionOpExec")` |
| Partition requirement | empty (no hash-alignment forced; unlike Distinct / Difference / Intersect, Union preserves whatever the upstream produced) |
| Independent instances | each `new UnionOpDesc` gets its own `operatorIdentifier` (no static / shared id) |

**Behavior pinned — `DistinctOpDesc`**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Distinct"`, group `CLEANING_GROUP`, description mentions "duplicate" |
| Ports | one input, one **blocking** output |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.distinct.DistinctOpExec")`; `partitionRequirement` is `List(Option(HashPartition()))`; `derivePartition` always returns `HashPartition` regardless of input partition kind |
| Independent instances | each `new DistinctOpDesc` gets its own `operatorIdentifier` |

**Behavior pinned — `DifferenceOpDesc`**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | name `"Difference"`, group `SET_GROUP`, description mentions "difference"; two input ports with `displayName` `"left"` (PortIdentity 0) and `"right"` (PortIdentity 1); one **blocking** output |
| `getPhysicalOp` | wires `OpExecWithClassName("…operator.difference.DifferenceOpExec")`; `partitionRequirement` is `List(Option(HashPartition()), Option(HashPartition()))` (both inputs); `derivePartition` always returns `HashPartition` |
| Schema propagation | accepts a single shared input schema and produces that schema on every output port; throws `IllegalArgumentException` when the two inputs do not share one schema |
| Independent instances | each `new DifferenceOpDesc` gets its own `operatorIdentifier` |

### Any related issues, documentation, discussions?

Closes #5734.

### How was this PR tested?

Pure unit-test additions; verified locally with:

- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.union.UnionOpDescSpec org.apache.texera.amber.operator.distinct.DistinctOpDescSpec org.apache.texera.amber.operator.difference.DifferenceOpDescSpec"` — 21 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7 [1M context])
---
 .../operator/difference/DifferenceOpDescSpec.scala | 143 +++++++++++++++++++++
 .../operator/distinct/DistinctOpDescSpec.scala | 109 ++++++++++++++++
 .../amber/operator/union/UnionOpDescSpec.scala | 104 +++++++++++++++
 3 files changed, 356 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala
new file mode 100644
index 0000000000..f6d17e94eb
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.difference
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{
+  HashPartition,
+  PortIdentity,
+  SinglePartition,
+  UnknownPartition
+}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class DifferenceOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  private val schemaA: Schema =
+    Schema().add(new Attribute("col", AttributeType.STRING))
+  private val schemaB: Schema =
+    Schema().add(new Attribute("col", AttributeType.STRING))
+  private val schemaDifferent: Schema =
+    Schema().add(new Attribute("other", AttributeType.INTEGER))
+
+  // ---------------------------------------------------------------------------
+  // operatorInfo — descriptor metadata (name, group, description, ports)
+  // ---------------------------------------------------------------------------
+
+  "DifferenceOpDesc.operatorInfo" should "advertise the Set group + difference description" in {
+    val info = (new DifferenceOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Difference"
+    info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+    info.operatorDescription.toLowerCase should include("difference")
+  }
+
+  it should
+    "expose two input ports (left at PortIdentity 0, right at PortIdentity 1) and one blocking output" in {
+    val info = (new DifferenceOpDesc).operatorInfo
+    info.inputPorts should have length 2
+    info.inputPorts.map(_.id) shouldBe List(PortIdentity(), PortIdentity(1))
+    info.inputPorts.map(_.displayName) shouldBe List("left", "right")
+    info.outputPorts should have length 1
+    info.outputPorts.head.blocking shouldBe true
+  }
+
+  // ---------------------------------------------------------------------------
+  // getPhysicalOp — executor wiring + partitioning contract
+  // ---------------------------------------------------------------------------
+
+  "DifferenceOpDesc.getPhysicalOp" should
+    "wire the DifferenceOpExec class name into the OpExecInitInfo" in {
+    val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe "org.apache.texera.amber.operator.difference.DifferenceOpExec"
+      case other =>
+        fail(s"expected OpExecWithClassName, got $other")
+    }
+  }
+
+  it should "require HashPartition on BOTH input ports" in {
+    // One requirement per input port (left = 0, right = 1): set-difference
+    // needs both sides hash-aligned so matching keys meet on one worker.
+    val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.partitionRequirement shouldBe List(
+      Option(HashPartition()),
+      Option(HashPartition())
+    )
+  }
+
+  it should "derive HashPartition for the output regardless of input partition kinds" in {
+    val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.derivePartition(List(SinglePartition(), UnknownPartition())) shouldBe HashPartition()
+    physical.derivePartition(
+      List(HashPartition(List("a")), HashPartition(List("b")))
+    ) shouldBe HashPartition()
+  }
+
+  // ---------------------------------------------------------------------------
+  // Schema propagation (via PhysicalOp.propagateSchema)
+  // ---------------------------------------------------------------------------
+
+  "DifferenceOpDesc schema propagation" should
+    "produce a single output schema equal to the (shared) input schema" in {
+    // schemaA and schemaB are separate but equal Schema values: propagation
+    // must treat them as one shared schema and emit it on every output port.
+    val op = new DifferenceOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    val propagateFn = physical.propagateSchema
+    val inputs = Map(PortIdentity() -> schemaA, PortIdentity(1) -> schemaB)
+    val outputs = propagateFn.func(inputs)
+    outputs.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet
+    outputs.values.toSet shouldBe Set(schemaA)
+  }
+
+  it should
+    "throw IllegalArgumentException when the two inputs do not share one schema" in {
+    val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId)
+    val propagateFn = physical.propagateSchema
+    val mismatched =
+      Map(PortIdentity() -> schemaA, PortIdentity(1) -> schemaDifferent)
+    intercept[IllegalArgumentException] {
+      propagateFn.func(mismatched)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Independent instances
+  // ---------------------------------------------------------------------------
+
+  "DifferenceOpDesc" should
+    "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+    // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+    // its constructor body, so two `new DifferenceOpDesc` allocations
+    // must hold different identifiers. A regression to a static /
+    // shared id would surface here as the two ids being equal.
+    val a = new DifferenceOpDesc
+    val b = new DifferenceOpDesc
+    a.operatorIdentifier should not equal b.operatorIdentifier
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala
new file mode 100644
index 0000000000..2aba788acf
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.distinct
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{HashPartition, SinglePartition, UnknownPartition}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class DistinctOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // ---------------------------------------------------------------------------
+  // operatorInfo — descriptor metadata (name, group, description, ports)
+  // ---------------------------------------------------------------------------
+
+  "DistinctOpDesc.operatorInfo" should
+    "advertise the user-friendly name and Cleaning group" in {
+    val info = (new DistinctOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Distinct"
+    info.operatorGroupName shouldBe OperatorGroupConstants.CLEANING_GROUP
+    info.operatorDescription.toLowerCase should include("duplicate")
+  }
+
+  it should "expose one input port and one blocking output port" in {
+    val info = (new DistinctOpDesc).operatorInfo
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    info.outputPorts.head.blocking shouldBe true
+  }
+
+  // ---------------------------------------------------------------------------
+  // getPhysicalOp — wiring to DistinctOpExec + partitioning contract
+  // ---------------------------------------------------------------------------
+
+  "DistinctOpDesc.getPhysicalOp" should
+    "wire the DistinctOpExec class name into the OpExecInitInfo" in {
+    val op = new DistinctOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe "org.apache.texera.amber.operator.distinct.DistinctOpExec"
+      case other =>
+        fail(s"expected OpExecWithClassName, got $other")
+    }
+  }
+
+  it should "require HashPartition on the single input port" in {
+    val physical = (new DistinctOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.partitionRequirement shouldBe List(Option(HashPartition()))
+  }
+
+  it should "always derive HashPartition for the output regardless of input partitions" in {
+    // Each call feeds one input with a different partition kind (single,
+    // unknown, hash on another column); the derived output is always a bare
+    // HashPartition(), matching the hash partitionRequirement pinned above.
+    val physical = (new DistinctOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.derivePartition(List(SinglePartition())) shouldBe HashPartition()
+    physical.derivePartition(List(UnknownPartition())) shouldBe HashPartition()
+    physical.derivePartition(List(HashPartition(List("col-a")))) shouldBe HashPartition()
+  }
+
+  it should "preserve the one input / one blocking output port shape from operatorInfo" in {
+    val op = new DistinctOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.inputPorts should have size 1
+    physical.outputPorts should have size 1
+    // PhysicalOp.outputPorts is a Map[PortIdentity, (OutputPort, …, …)],
+    // so the blocking flag is on the first tuple element of the value.
+    val (outputPort, _, _) = physical.outputPorts.values.head
+    outputPort.blocking shouldBe true
+  }
+
+  // ---------------------------------------------------------------------------
+  // Independent instances
+  // ---------------------------------------------------------------------------
+
+  "DistinctOpDesc" should
+    "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+    // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+    // its constructor body, so two `new DistinctOpDesc` allocations
+    // must hold different identifiers. A regression to a static /
+    // shared id would surface here as the two ids being equal.
+    val a = new DistinctOpDesc
+    val b = new DistinctOpDesc
+    a.operatorIdentifier should not equal b.operatorIdentifier
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala
new file mode 100644
index 0000000000..a9c58bbcda
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.union
+
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class UnionOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  // ---------------------------------------------------------------------------
+  // operatorInfo — descriptor metadata
+  // ---------------------------------------------------------------------------
+
+  "UnionOpDesc.operatorInfo" should "advertise the user-friendly name and Set group" in {
+    val info = (new UnionOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Union"
+    info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP
+    info.operatorDescription should include("Union")
+  }
+
+  it should "expose exactly one input port and one (non-blocking) output port" in {
+    val info = (new UnionOpDesc).operatorInfo
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+    info.outputPorts.head.blocking shouldBe false
+  }
+
+  // ---------------------------------------------------------------------------
+  // getPhysicalOp — wiring to UnionOpExec
+  // ---------------------------------------------------------------------------
+
+  "UnionOpDesc.getPhysicalOp" should
+    "wire the UnionOpExec class name into the OpExecInitInfo" in {
+    val op = new UnionOpDesc
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithClassName(className, _) =>
+        className shouldBe "org.apache.texera.amber.operator.union.UnionOpExec"
+      case other =>
+        fail(s"expected OpExecWithClassName, got $other")
+    }
+  }
+
+  it should "expose the same input/output port shape as operatorInfo" in {
+    val op = new UnionOpDesc
+    val info = op.operatorInfo
+    val physical = op.getPhysicalOp(workflowId, executionId)
+    // PhysicalOp keys its ports by PortIdentity in a Map while operatorInfo
+    // lists them in a sequence, so compare the counts: a port dropped or
+    // added while building the PhysicalOp would make these differ.
+    physical.inputPorts.size shouldBe info.inputPorts.size
+    physical.outputPorts.size shouldBe info.outputPorts.size
+  }
+
+  it should "leave the partition requirement empty (no hash-alignment forced)" in {
+    // Unlike Difference / Intersect (SET group) and Distinct (CLEANING group),
+    // Union does NOT require its inputs to be hash-partitioned — the
+    // pass-through executor preserves whatever the upstream produced.
+    //
+    // Assert on the list itself (not just `.flatten`) so a regression
+    // that introduced a `None` entry (`List(None)` — same "no
+    // requirement" semantics but a different list shape) is caught here.
+    val physical = (new UnionOpDesc).getPhysicalOp(workflowId, executionId)
+    physical.partitionRequirement shouldBe empty
+  }
+
+  // ---------------------------------------------------------------------------
+  // Independent instances
+  // ---------------------------------------------------------------------------
+
+  "UnionOpDesc" should
+    "assign a fresh operatorIdentifier per instance (UUID-based id is not shared)" in {
+    // `LogicalOp` initializes `operatorId` from `UUID.randomUUID()` in
+    // its constructor body, so two `new UnionOpDesc` allocations must
+    // hold different identifiers. A regression to a static / shared id
+    // would surface here as the two ids being equal.
+    val a = new UnionOpDesc
+    val b = new UnionOpDesc
+    a.operatorIdentifier should not equal b.operatorIdentifier
+  }
+}
