aglinxinyuan commented on code in PR #5738: URL: https://github.com/apache/texera/pull/5738#discussion_r3424516682
########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/union/UnionOpDescSpec.scala: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.union + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class UnionOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + // --------------------------------------------------------------------------- + // operatorInfo — descriptor metadata + // --------------------------------------------------------------------------- + + "UnionOpDesc.operatorInfo" should "advertise the user-friendly name and Set group" in { + val info = (new UnionOpDesc).operatorInfo + info.userFriendlyName shouldBe "Union" + info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP + info.operatorDescription should include("Union") + } + 
+ it should "expose exactly one input port and one (non-blocking) output port" in { + val info = (new UnionOpDesc).operatorInfo + info.inputPorts should have length 1 + info.outputPorts should have length 1 + info.outputPorts.head.blocking shouldBe false + } + + // --------------------------------------------------------------------------- + // getPhysicalOp — wiring to UnionOpExec + // --------------------------------------------------------------------------- + + "UnionOpDesc.getPhysicalOp" should + "wire the UnionOpExec class name into the OpExecInitInfo" in { + val op = new UnionOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, _) => + className shouldBe "org.apache.texera.amber.operator.union.UnionOpExec" + case other => + fail(s"expected OpExecWithClassName, got $other") + } + } + + it should "expose the same input/output port shape as operatorInfo" in { + val op = new UnionOpDesc + val info = op.operatorInfo + val physical = op.getPhysicalOp(workflowId, executionId) + physical.inputPorts should have size info.inputPorts.size.toLong + physical.outputPorts should have size info.outputPorts.size.toLong + } + + it should "leave the partition requirement empty (no hash-alignment forced)" in { + // Unlike Distinct / Difference / Intersect in the same SET group, + // Union does NOT require its inputs to be hash-partitioned — the + // pass-through executor preserves whatever the upstream produced. + // + // Assert on the list itself (not just `.flatten`) so a regression + // that introduced a `None` entry (`List(None)` — same "no + // requirement" semantics but a different list shape) is caught here. 
+ val physical = (new UnionOpDesc).getPhysicalOp(workflowId, executionId) + physical.partitionRequirement shouldBe empty + } + + // --------------------------------------------------------------------------- + // Independent instances + // --------------------------------------------------------------------------- + + "UnionOpDesc" should + "produce distinct instances (no static state shared across instantiations)" in { + val a = new UnionOpDesc + val b = new UnionOpDesc + a should not be theSameInstanceAs(b) + } Review Comment: Fixed in ff0d281603 — replaced the trivial `theSameInstanceAs` check with `a.operatorIdentifier should not equal b.operatorIdentifier`. `LogicalOp` seeds `operatorId` from `UUID.randomUUID()` in its constructor, so the two ids are distinct per allocation; a static / shared id would now fail this assertion. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/distinct/DistinctOpDescSpec.scala: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.distinct + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{HashPartition, SinglePartition, UnknownPartition} +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class DistinctOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + // --------------------------------------------------------------------------- + // operatorInfo — descriptor metadata + // --------------------------------------------------------------------------- + + "DistinctOpDesc.operatorInfo" should + "advertise the user-friendly name and Cleaning group" in { + val info = (new DistinctOpDesc).operatorInfo + info.userFriendlyName shouldBe "Distinct" + info.operatorGroupName shouldBe OperatorGroupConstants.CLEANING_GROUP + info.operatorDescription.toLowerCase should include("duplicate") + } + + it should "expose one input port and one blocking output port" in { + val info = (new DistinctOpDesc).operatorInfo + info.inputPorts should have length 1 + info.outputPorts should have length 1 + info.outputPorts.head.blocking shouldBe true + } + + // --------------------------------------------------------------------------- + // getPhysicalOp — wiring to DistinctOpExec + partitioning contract + // --------------------------------------------------------------------------- + + "DistinctOpDesc.getPhysicalOp" should + "wire the DistinctOpExec class name into the OpExecInitInfo" in { + val op = new DistinctOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, _) => + className shouldBe 
"org.apache.texera.amber.operator.distinct.DistinctOpExec" + case other => + fail(s"expected OpExecWithClassName, got $other") + } + } + + it should "require HashPartition on the single input port" in { + val physical = (new DistinctOpDesc).getPhysicalOp(workflowId, executionId) + physical.partitionRequirement shouldBe List(Option(HashPartition())) + } + + it should "always derive HashPartition for the output regardless of input partitions" in { + // Distinct's dedup semantics depend on hash-alignment, so the + // derived output partition stays hash even when upstream inputs + // report differing partition kinds. + val physical = (new DistinctOpDesc).getPhysicalOp(workflowId, executionId) + physical.derivePartition(List(SinglePartition())) shouldBe HashPartition() + physical.derivePartition(List(UnknownPartition())) shouldBe HashPartition() + physical.derivePartition(List(HashPartition(List("col-a")))) shouldBe HashPartition() + } + + it should "preserve the one input / one blocking output port shape from operatorInfo" in { + val op = new DistinctOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + physical.inputPorts should have size 1 + physical.outputPorts should have size 1 + // PhysicalOp.outputPorts is a Map[PortIdentity, (OutputPort, …, …)], + // so the blocking flag is on the first tuple element of the value. + val (outputPort, _, _) = physical.outputPorts.values.head + outputPort.blocking shouldBe true + } + + // --------------------------------------------------------------------------- + // Independent instances + // --------------------------------------------------------------------------- + + "DistinctOpDesc" should + "produce distinct instances (no static state shared across instantiations)" in { + val a = new DistinctOpDesc + val b = new DistinctOpDesc + a should not be theSameInstanceAs(b) + } Review Comment: Fixed in ff0d281603 — same fix for DistinctOpDescSpec: `a.operatorIdentifier should not equal b.operatorIdentifier`. 
This pins a meaningful per-instance property (the UUID-seeded id) instead of relying on the trivial `theSameInstanceAs` check. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpDescSpec.scala: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.texera.amber.operator.difference + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{ + HashPartition, + PortIdentity, + SinglePartition, + UnknownPartition +} +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class DifferenceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + private val schemaA: Schema = + Schema().add(new Attribute("col", AttributeType.STRING)) + private val schemaB: Schema = + Schema().add(new Attribute("col", AttributeType.STRING)) + private val schemaDifferent: Schema = + Schema().add(new Attribute("other", AttributeType.INTEGER)) + + // --------------------------------------------------------------------------- + // operatorInfo + // --------------------------------------------------------------------------- + + "DifferenceOpDesc.operatorInfo" should "advertise the Set group + difference description" in { + val info = (new DifferenceOpDesc).operatorInfo + info.userFriendlyName shouldBe "Difference" + info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP + info.operatorDescription.toLowerCase should include("difference") + } + + it should + "expose two input ports (left at PortIdentity 0, right at PortIdentity 1) and one blocking output" in { + val info = (new DifferenceOpDesc).operatorInfo + info.inputPorts should have length 2 + info.inputPorts.map(_.id) shouldBe List(PortIdentity(), PortIdentity(1)) + info.inputPorts.map(_.displayName) shouldBe List("left", "right") + info.outputPorts should have length 1 + info.outputPorts.head.blocking shouldBe true + } + + // 
--------------------------------------------------------------------------- + // getPhysicalOp — wiring + partitioning + schema propagation + // --------------------------------------------------------------------------- + + "DifferenceOpDesc.getPhysicalOp" should + "wire the DifferenceOpExec class name into the OpExecInitInfo" in { + val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, _) => + className shouldBe "org.apache.texera.amber.operator.difference.DifferenceOpExec" + case other => + fail(s"expected OpExecWithClassName, got $other") + } + } + + it should "require HashPartition on BOTH input ports" in { + // Set-difference semantics require both inputs to be hash-aligned so + // matching keys can be compared on the same worker. + val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId) + physical.partitionRequirement shouldBe List( + Option(HashPartition()), + Option(HashPartition()) + ) + } + + it should "derive HashPartition for the output regardless of input partition kinds" in { + val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId) + physical.derivePartition(List(SinglePartition(), UnknownPartition())) shouldBe HashPartition() + physical.derivePartition( + List(HashPartition(List("a")), HashPartition(List("b"))) + ) shouldBe HashPartition() + } + + // --------------------------------------------------------------------------- + // Schema propagation + // --------------------------------------------------------------------------- + + "DifferenceOpDesc schema propagation" should + "produce a single output schema equal to the (shared) input schema" in { + // When both inputs report the same schema, propagation succeeds and + // every output port receives that schema. 
+ val op = new DifferenceOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + val propagateFn = physical.propagateSchema + val inputs = Map(PortIdentity() -> schemaA, PortIdentity(1) -> schemaB) + val outputs = propagateFn.func(inputs) + outputs.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + outputs.values.toSet shouldBe Set(schemaA) + } + + it should + "throw IllegalArgumentException when the two inputs do not share one schema" in { + val physical = (new DifferenceOpDesc).getPhysicalOp(workflowId, executionId) + val propagateFn = physical.propagateSchema + val mismatched = + Map(PortIdentity() -> schemaA, PortIdentity(1) -> schemaDifferent) + intercept[IllegalArgumentException] { + propagateFn.func(mismatched) + } + } + + // --------------------------------------------------------------------------- + // Independent instances + // --------------------------------------------------------------------------- + + "DifferenceOpDesc" should + "produce distinct instances (no static state shared across instantiations)" in { + val a = new DifferenceOpDesc + val b = new DifferenceOpDesc + a should not be theSameInstanceAs(b) + } Review Comment: Fixed in ff0d281603 — same fix for DifferenceOpDescSpec: `a.operatorIdentifier should not equal b.operatorIdentifier`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
