This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5877-e7eba322ca3cd7164187eae782620b4ebcd7f7a3 in repository https://gitbox.apache.org/repos/asf/texera.git
commit e270f830be7d11a59a8357eeecec2251d642c872 Author: Xinyuan Lin <[email protected]> AuthorDate: Tue Jun 23 10:50:43 2026 -0700 test(workflow-operator): add unit test coverage for Java and R UDF operator descriptors (#5877) ### What changes were proposed in this PR? Pin behavior of three previously-untested Java/R UDF descriptors in `common/workflow-operator/udf/`. No production-code changes. | Spec | Source class | Tests | | --- | --- | --- | | `JavaUDFOpDescSpec` | `JavaUDFOpDesc` | 6 | | `RUDFOpDescSpec` | `RUDFOpDesc` | 6 | | `RUDFSourceOpDescSpec` | `RUDFSourceOpDesc` | 6 | **Behavior pinned** | Surface | Contract | | --- | --- | | `operatorInfo` | exact name + group (`Java`/`R`); `JavaUDF`/`RUDF` dynamic 1-in/1-out; `RUDFSource` zero inputs / one output | | `getPhysicalOp` | wires `OpExecWithCode` (Java tagged `"java"`); port identities carried; rejects `workers <= 0` | | Schema propagation | `JavaUDF`/`RUDF`: `retainInputColumns=false` → only output columns, `true` → input + output columns; `RUDFSource`: `sourceSchema()` from the `columns` field | | Round-trip | config fields preserved through the polymorphic base (incl. R `useTupleAPI`) | ### Any related issues, documentation, discussions? Closes #5874. ### How was this PR tested? - `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.udf.java.JavaUDFOpDescSpec org.apache.texera.amber.operator.udf.r.RUDFOpDescSpec org.apache.texera.amber.operator.udf.r.RUDFSourceOpDescSpec"` — 18 tests, all green - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/Test/scalafix --check"` — clean - CI to confirm ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8 [1M context]) --- .../operator/udf/java/JavaUDFOpDescSpec.scala | 109 ++++++++++++++++++++ .../amber/operator/udf/r/RUDFOpDescSpec.scala | 112 +++++++++++++++++++++ .../operator/udf/r/RUDFSourceOpDescSpec.scala | 87 ++++++++++++++++ 3 files changed, 308 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/java/JavaUDFOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/java/JavaUDFOpDescSpec.scala new file mode 100644 index 0000000000..6ea1fe8bc4 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/java/JavaUDFOpDescSpec.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.java + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class JavaUDFOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "JavaUDFOpDesc.operatorInfo" should + "advertise the name, Java group, and a default 1-in/1-out shape" in { + val info = (new JavaUDFOpDesc).operatorInfo + info.userFriendlyName shouldBe "Java UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.JAVA_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + } + + "JavaUDFOpDesc" should "default code to empty, workers to 1, retainInputColumns to false" in { + val d = new JavaUDFOpDesc + d.code shouldBe "" + d.workers shouldBe 1 + d.retainInputColumns shouldBe false + } + + "JavaUDFOpDesc.getPhysicalOp" should + "wire OpExecWithCode(code, \"java\") and carry port identities" in { + val d = new JavaUDFOpDesc + d.code = "return t;" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "java" + code shouldBe "return t;" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new JavaUDFOpDesc + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "JavaUDFOpDesc schema propagation" should + "emit only the output columns when input columns are not retained (default)" in { + val d = new JavaUDFOpDesc + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema().add( + new Attribute("res", AttributeType.INTEGER) + ) + ) + } + + it should "retain input columns plus the output columns when retainInputColumns is true" in { + val d = new JavaUDFOpDesc + d.retainInputColumns = true + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema() + .add(new Attribute("in", AttributeType.STRING)) + .add(new Attribute("res", AttributeType.INTEGER)) + ) + } + + "JavaUDFOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new JavaUDFOpDesc + d.code = "x" + d.workers = 4 + d.retainInputColumns = true + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[JavaUDFOpDesc] + val j = restored.asInstanceOf[JavaUDFOpDesc] + j.code shouldBe "x" + j.workers shouldBe 4 + j.retainInputColumns shouldBe true + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFOpDescSpec.scala new file mode 100644 index 0000000000..4963af8d6d --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFOpDescSpec.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.r + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class RUDFOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "RUDFOpDesc.operatorInfo" should + "advertise the name, R group, and a default 1-in/1-out shape" in { + val info = (new RUDFOpDesc).operatorInfo + info.userFriendlyName shouldBe "R UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.R_GROUP + info.inputPorts should have length 1 + info.outputPorts should have length 1 + } + + "RUDFOpDesc" should "default code/workers/useTupleAPI/retainInputColumns" in { + val d = new RUDFOpDesc + d.code shouldBe "" + d.workers shouldBe 1 + d.useTupleAPI shouldBe false + d.retainInputColumns shouldBe false + } + + "RUDFOpDesc.getPhysicalOp" should + "wire OpExecWithCode(code, \"r-table\") and carry port identities" in { + val d = new RUDFOpDesc + d.code = "function(t) t" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "r-table" + code shouldBe "function(t) t" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new RUDFOpDesc + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "RUDFOpDesc schema propagation" should + "emit only the output columns when input columns are not retained (default)" in { + val d = new RUDFOpDesc + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema().add( + new Attribute("res", AttributeType.INTEGER) + ) + ) + } + + it should "retain input columns plus the output columns when retainInputColumns is true" in { + val d = new RUDFOpDesc + d.retainInputColumns = true + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema() + .add(new Attribute("in", AttributeType.STRING)) + .add(new Attribute("res", AttributeType.INTEGER)) + ) + } + + "RUDFOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new RUDFOpDesc + d.code = "f" + d.workers = 2 + d.useTupleAPI = true + d.retainInputColumns = true + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[RUDFOpDesc] + val r = restored.asInstanceOf[RUDFOpDesc] + r.code shouldBe "f" + r.workers shouldBe 2 + r.useTupleAPI shouldBe true + r.retainInputColumns shouldBe true + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFSourceOpDescSpec.scala new file mode 100644 index 0000000000..2cfca40e73 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFSourceOpDescSpec.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.r + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class RUDFSourceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "RUDFSourceOpDesc.operatorInfo" should + "advertise the 1-out R UDF source (no inputs, one output)" in { + val info = (new RUDFSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "1-out R UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.R_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + } + + "RUDFSourceOpDesc.sourceSchema" should "be empty by default and reflect the configured columns" in { + (new RUDFSourceOpDesc).sourceSchema().getAttributes shouldBe empty + val d = new RUDFSourceOpDesc + d.columns = List(new Attribute("a", AttributeType.STRING)) + d.sourceSchema() shouldBe Schema().add(new Attribute("a", AttributeType.STRING)) + } + + "RUDFSourceOpDesc.getPhysicalOp" should + "wire OpExecWithCode(code, \"r-table\") as a source op with one output port" in { + val d = new RUDFSourceOpDesc + d.code = "data.frame(a=1)" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, language) => + language shouldBe "r-table" + code shouldBe "data.frame(a=1)" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new RUDFSourceOpDesc + d.code = "x" + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "RUDFSourceOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new RUDFSourceOpDesc + d.code = "f" + d.workers = 2 + d.useTupleAPI = true + d.columns = List(new Attribute("a", AttributeType.STRING)) + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[RUDFSourceOpDesc] + val r = restored.asInstanceOf[RUDFSourceOpDesc] + r.code shouldBe "f" + r.workers shouldBe 2 + r.useTupleAPI shouldBe true + r.columns shouldBe List(new Attribute("a", AttributeType.STRING)) + } +}
