This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new e270f830be test(workflow-operator): add unit test coverage for Java
and R UDF operator descriptors (#5877)
e270f830be is described below
commit e270f830be7d11a59a8357eeecec2251d642c872
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jun 23 10:50:43 2026 -0700
test(workflow-operator): add unit test coverage for Java and R UDF operator
descriptors (#5877)
### What changes were proposed in this PR?
Pin behavior of three previously-untested Java/R UDF descriptors in
`common/workflow-operator/udf/`. No production-code changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `JavaUDFOpDescSpec` | `JavaUDFOpDesc` | 6 |
| `RUDFOpDescSpec` | `RUDFOpDesc` | 6 |
| `RUDFSourceOpDescSpec` | `RUDFSourceOpDesc` | 6 |
**Behavior pinned**
| Surface | Contract |
| --- | --- |
| `operatorInfo` | exact name + group (`Java`/`R`); `JavaUDF`/`RUDF`
dynamic 1-in/1-out; `RUDFSource` zero inputs / one output |
| `getPhysicalOp` | wires `OpExecWithCode` (Java tagged `"java"`); port
identities carried; rejects `workers <= 0` |
| Schema propagation | `JavaUDF`/`RUDF`: `retainInputColumns=false` →
only output columns, `true` → input + output columns; `RUDFSource`:
`sourceSchema()` from the `columns` field |
| Round-trip | config fields preserved through the polymorphic base
(incl. R `useTupleAPI`) |
### Any related issues, documentation, discussions?
Closes #5874.
### How was this PR tested?
- `sbt "WorkflowOperator/testOnly
org.apache.texera.amber.operator.udf.java.JavaUDFOpDescSpec
org.apache.texera.amber.operator.udf.r.RUDFOpDescSpec
org.apache.texera.amber.operator.udf.r.RUDFSourceOpDescSpec"` — 18
tests, all green
- `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt
"WorkflowOperator/Test/scalafix --check"` — clean
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8 [1M context])
---
.../operator/udf/java/JavaUDFOpDescSpec.scala | 109 ++++++++++++++++++++
.../amber/operator/udf/r/RUDFOpDescSpec.scala | 112 +++++++++++++++++++++
.../operator/udf/r/RUDFSourceOpDescSpec.scala | 87 ++++++++++++++++
3 files changed, 308 insertions(+)
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/java/JavaUDFOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/java/JavaUDFOpDescSpec.scala
new file mode 100644
index 0000000000..6ea1fe8bc4
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/java/JavaUDFOpDescSpec.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.java
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class JavaUDFOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private val workflowId = WorkflowIdentity(1L)
+ private val executionId = ExecutionIdentity(1L)
+
+ "JavaUDFOpDesc.operatorInfo" should
+ "advertise the name, Java group, and a default 1-in/1-out shape" in {
+ val info = (new JavaUDFOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "Java UDF"
+ info.operatorGroupName shouldBe OperatorGroupConstants.JAVA_GROUP
+ info.inputPorts should have length 1
+ info.outputPorts should have length 1
+ }
+
+ "JavaUDFOpDesc" should "default code to empty, workers to 1,
retainInputColumns to false" in {
+ val d = new JavaUDFOpDesc
+ d.code shouldBe ""
+ d.workers shouldBe 1
+ d.retainInputColumns shouldBe false
+ }
+
+ "JavaUDFOpDesc.getPhysicalOp" should
+ "wire OpExecWithCode(code, \"java\") and carry port identities" in {
+ val d = new JavaUDFOpDesc
+ d.code = "return t;"
+ val physical = d.getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match {
+ case OpExecWithCode(code, language) =>
+ language shouldBe "java"
+ code shouldBe "return t;"
+ case other => fail(s"expected OpExecWithCode, got $other")
+ }
+ physical.inputPorts.keySet shouldBe
d.operatorInfo.inputPorts.map(_.id).toSet
+ physical.outputPorts.keySet shouldBe
d.operatorInfo.outputPorts.map(_.id).toSet
+ }
+
+ it should "reject a non-positive worker count" in {
+ val d = new JavaUDFOpDesc
+ d.workers = 0
+ intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId,
executionId) }
+ }
+
+ "JavaUDFOpDesc schema propagation" should
+ "emit only the output columns when input columns are not retained
(default)" in {
+ val d = new JavaUDFOpDesc
+ d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+ val input = Schema().add(new Attribute("in", AttributeType.STRING))
+ d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id ->
input)) shouldBe Map(
+ d.operatorInfo.outputPorts.head.id -> Schema().add(
+ new Attribute("res", AttributeType.INTEGER)
+ )
+ )
+ }
+
+ it should "retain input columns plus the output columns when
retainInputColumns is true" in {
+ val d = new JavaUDFOpDesc
+ d.retainInputColumns = true
+ d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+ val input = Schema().add(new Attribute("in", AttributeType.STRING))
+ d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id ->
input)) shouldBe Map(
+ d.operatorInfo.outputPorts.head.id -> Schema()
+ .add(new Attribute("in", AttributeType.STRING))
+ .add(new Attribute("res", AttributeType.INTEGER))
+ )
+ }
+
+ "JavaUDFOpDesc" should "round-trip its config fields through the polymorphic
base" in {
+ val d = new JavaUDFOpDesc
+ d.code = "x"
+ d.workers = 4
+ d.retainInputColumns = true
+ val restored = objectMapper.readValue(objectMapper.writeValueAsString(d),
classOf[LogicalOp])
+ restored shouldBe a[JavaUDFOpDesc]
+ val j = restored.asInstanceOf[JavaUDFOpDesc]
+ j.code shouldBe "x"
+ j.workers shouldBe 4
+ j.retainInputColumns shouldBe true
+ }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFOpDescSpec.scala
new file mode 100644
index 0000000000..4963af8d6d
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFOpDescSpec.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.r
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class RUDFOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private val workflowId = WorkflowIdentity(1L)
+ private val executionId = ExecutionIdentity(1L)
+
+ "RUDFOpDesc.operatorInfo" should
+ "advertise the name, R group, and a default 1-in/1-out shape" in {
+ val info = (new RUDFOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "R UDF"
+ info.operatorGroupName shouldBe OperatorGroupConstants.R_GROUP
+ info.inputPorts should have length 1
+ info.outputPorts should have length 1
+ }
+
+ "RUDFOpDesc" should "default code/workers/useTupleAPI/retainInputColumns" in
{
+ val d = new RUDFOpDesc
+ d.code shouldBe ""
+ d.workers shouldBe 1
+ d.useTupleAPI shouldBe false
+ d.retainInputColumns shouldBe false
+ }
+
+ "RUDFOpDesc.getPhysicalOp" should
+ "wire OpExecWithCode(code, \"r-table\") and carry port identities" in {
+ val d = new RUDFOpDesc
+ d.code = "function(t) t"
+ val physical = d.getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match {
+ case OpExecWithCode(code, language) =>
+ language shouldBe "r-table"
+ code shouldBe "function(t) t"
+ case other => fail(s"expected OpExecWithCode, got $other")
+ }
+ physical.inputPorts.keySet shouldBe
d.operatorInfo.inputPorts.map(_.id).toSet
+ physical.outputPorts.keySet shouldBe
d.operatorInfo.outputPorts.map(_.id).toSet
+ }
+
+ it should "reject a non-positive worker count" in {
+ val d = new RUDFOpDesc
+ d.workers = 0
+ intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId,
executionId) }
+ }
+
+ "RUDFOpDesc schema propagation" should
+ "emit only the output columns when input columns are not retained
(default)" in {
+ val d = new RUDFOpDesc
+ d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+ val input = Schema().add(new Attribute("in", AttributeType.STRING))
+ d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id ->
input)) shouldBe Map(
+ d.operatorInfo.outputPorts.head.id -> Schema().add(
+ new Attribute("res", AttributeType.INTEGER)
+ )
+ )
+ }
+
+ it should "retain input columns plus the output columns when
retainInputColumns is true" in {
+ val d = new RUDFOpDesc
+ d.retainInputColumns = true
+ d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+ val input = Schema().add(new Attribute("in", AttributeType.STRING))
+ d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id ->
input)) shouldBe Map(
+ d.operatorInfo.outputPorts.head.id -> Schema()
+ .add(new Attribute("in", AttributeType.STRING))
+ .add(new Attribute("res", AttributeType.INTEGER))
+ )
+ }
+
+ "RUDFOpDesc" should "round-trip its config fields through the polymorphic
base" in {
+ val d = new RUDFOpDesc
+ d.code = "f"
+ d.workers = 2
+ d.useTupleAPI = true
+ d.retainInputColumns = true
+ val restored = objectMapper.readValue(objectMapper.writeValueAsString(d),
classOf[LogicalOp])
+ restored shouldBe a[RUDFOpDesc]
+ val r = restored.asInstanceOf[RUDFOpDesc]
+ r.code shouldBe "f"
+ r.workers shouldBe 2
+ r.useTupleAPI shouldBe true
+ r.retainInputColumns shouldBe true
+ }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFSourceOpDescSpec.scala
new file mode 100644
index 0000000000..2cfca40e73
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFSourceOpDescSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.r
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class RUDFSourceOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private val workflowId = WorkflowIdentity(1L)
+ private val executionId = ExecutionIdentity(1L)
+
+ "RUDFSourceOpDesc.operatorInfo" should
+ "advertise the 1-out R UDF source (no inputs, one output)" in {
+ val info = (new RUDFSourceOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "1-out R UDF"
+ info.operatorGroupName shouldBe OperatorGroupConstants.R_GROUP
+ info.inputPorts shouldBe empty
+ info.outputPorts should have length 1
+ }
+
+ "RUDFSourceOpDesc.sourceSchema" should "be empty by default and reflect the
configured columns" in {
+ (new RUDFSourceOpDesc).sourceSchema().getAttributes shouldBe empty
+ val d = new RUDFSourceOpDesc
+ d.columns = List(new Attribute("a", AttributeType.STRING))
+ d.sourceSchema() shouldBe Schema().add(new Attribute("a",
AttributeType.STRING))
+ }
+
+ "RUDFSourceOpDesc.getPhysicalOp" should
+ "wire OpExecWithCode(code, \"r-table\") as a source op with one output
port" in {
+ val d = new RUDFSourceOpDesc
+ d.code = "data.frame(a=1)"
+ val physical = d.getPhysicalOp(workflowId, executionId)
+ physical.opExecInitInfo match {
+ case OpExecWithCode(code, language) =>
+ language shouldBe "r-table"
+ code shouldBe "data.frame(a=1)"
+ case other => fail(s"expected OpExecWithCode, got $other")
+ }
+ physical.outputPorts.keySet shouldBe
d.operatorInfo.outputPorts.map(_.id).toSet
+ }
+
+ it should "reject a non-positive worker count" in {
+ val d = new RUDFSourceOpDesc
+ d.code = "x"
+ d.workers = 0
+ intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId,
executionId) }
+ }
+
+ "RUDFSourceOpDesc" should "round-trip its config fields through the
polymorphic base" in {
+ val d = new RUDFSourceOpDesc
+ d.code = "f"
+ d.workers = 2
+ d.useTupleAPI = true
+ d.columns = List(new Attribute("a", AttributeType.STRING))
+ val restored = objectMapper.readValue(objectMapper.writeValueAsString(d),
classOf[LogicalOp])
+ restored shouldBe a[RUDFSourceOpDesc]
+ val r = restored.asInstanceOf[RUDFSourceOpDesc]
+ r.code shouldBe "f"
+ r.workers shouldBe 2
+ r.useTupleAPI shouldBe true
+ r.columns shouldBe List(new Attribute("a", AttributeType.STRING))
+ }
+}