aglinxinyuan commented on code in PR #5825: URL: https://github.com/apache/texera/pull/5825#discussion_r3447703142
########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/symmetricDifference/SymmetricDifferenceOpDescSpec.scala: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.symmetricDifference + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{HashPartition, PortIdentity} +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SymmetricDifferenceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + private val schemaA = Schema().add(new Attribute("a", AttributeType.STRING)) + private val schemaDifferent = Schema().add(new Attribute("b", AttributeType.INTEGER)) + + "SymmetricDifferenceOpDesc.operatorInfo" should + "advertise the name, Set group, two inputs, and a single blocking output" in { + val info = (new SymmetricDifferenceOpDesc).operatorInfo + info.userFriendlyName shouldBe "SymmetricDifference" + info.operatorGroupName shouldBe OperatorGroupConstants.SET_GROUP + info.inputPorts.map(_.id) shouldBe List(PortIdentity(0), PortIdentity(1)) + info.outputPorts should have length 1 + info.outputPorts.head.blocking shouldBe true + } + + "SymmetricDifferenceOpDesc.getPhysicalOp" should + "wire SymmetricDifferenceOpExec, carry port identities, and require HashPartition on both inputs" in { + val op = new SymmetricDifferenceOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithClassName(className, _) => + className shouldBe "org.apache.texera.amber.operator.symmetricDifference.SymmetricDifferenceOpExec" + case other => fail(s"expected OpExecWithClassName, got $other") + } + physical.inputPorts.keySet shouldBe op.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + physical.partitionRequirement shouldBe List(Option(HashPartition()), Option(HashPartition())) + } + + "SymmetricDifferenceOpDesc schema propagation" should + "pass the shared input schema through to the output port" in { + val op = new SymmetricDifferenceOpDesc + val physical = op.getPhysicalOp(workflowId, executionId) + val inputs = Map(PortIdentity(0) -> schemaA, PortIdentity(1) -> schemaA) + val outputs = physical.propagateSchema.func(inputs) + outputs.keySet shouldBe op.operatorInfo.outputPorts.map(_.id).toSet + outputs.values.toSet shouldBe Set(schemaA) + } + + it should "reject inputs whose schemas differ" in { + val physical = (new SymmetricDifferenceOpDesc).getPhysicalOp(workflowId, executionId) + val mismatched = Map(PortIdentity(0) -> schemaA, PortIdentity(1) -> schemaDifferent) + intercept[IllegalArgumentException] { + physical.propagateSchema.func(mismatched) + } + } +} Review Comment: Good call — added in fe83ca7. `SymmetricDifferenceOpDescSpec` now does a serialize → deserialize-via-`classOf[LogicalOp]` round-trip, so the `SymmetricDifference` discriminator is pinned on par with the other two specs. Since the operator has no config fields, the round-trip targets the discriminator/type-resolution itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
