This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5876-3ced705c721574a06ce7031019fdbbb5ce22106e
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 0f98a140714f87059458e06a76234cf4a5ec98b0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 21 23:46:32 2026 -0700

    test(workflow-operator): add unit test coverage for Python UDF operator descriptors (#5876)
    
    ### What changes were proposed in this PR?
    
    Pin the behavior of three previously untested Python UDF descriptors in
    `common/workflow-operator/udf/python/`. No production-code changes.
    
    | Spec | Source class | Tests |
    | --- | --- | --- |
    | `PythonUDFOpDescV2Spec` | `PythonUDFOpDescV2` | 9 |
    | `PythonUDFSourceOpDescV2Spec` | `PythonUDFSourceOpDescV2` | 5 |
    | `PythonTableReducerOpDescSpec` | `PythonTableReducerOpDesc` | 6 |
    
    **Behavior pinned**
    
    | Surface | Contract |
    | --- | --- |
    | `operatorInfo` | exact name + `PYTHON_GROUP`; `PythonUDFOpDescV2` dynamic 1-in/1-out; `PythonUDFSourceOpDescV2` zero inputs / one output + `supportReconfiguration` |
    | `getPhysicalOp` | wires `OpExecWithCode(code, "python")`; port identities carried; rejects `workers <= 0` (`IllegalArgumentException`); `PythonUDFOpDescV2` also rejects a blank `envName` when `defaultEnv` is false |
    | Schema propagation | `PythonUDFOpDescV2`: `retainInputColumns=false` → only the output columns, `true` → input + output columns (full map keyed by the declared output port), and an output column that collides with a retained input column is rejected; `PythonUDFSourceOpDescV2`: `sourceSchema()` from the `columns` field; `PythonTableReducerOpDesc`: `getOutputSchemas` folds the lambda units and rejects the empty default |
    | `generatePythonCode` | `PythonTableReducerOpDesc` emits a `ProcessTableOperator(UDFTableOperator)` class containing the configured columns |
    | Round-trip | config fields preserved through the polymorphic base |
    
    ### Any related issues, documentation, discussions?
    
    Closes #5873.
    
    ### How was this PR tested?
    
    - `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.udf.python.PythonUDFOpDescV2Spec org.apache.texera.amber.operator.udf.python.source.PythonUDFSourceOpDescV2Spec org.apache.texera.amber.operator.udf.python.PythonTableReducerOpDescSpec"` — 20 tests, all green
    - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/Test/scalafix --check"` — clean
    - CI to confirm
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.8 [1M context])
---
 .../udf/python/PythonTableReducerOpDescSpec.scala  |  83 +++++++++++++
 .../udf/python/PythonUDFOpDescV2Spec.scala         | 137 +++++++++++++++++++++
 .../source/PythonUDFSourceOpDescV2Spec.scala       |  89 +++++++++++++
 3 files changed, 309 insertions(+)

diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonTableReducerOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonTableReducerOpDescSpec.scala
new file mode 100644
index 0000000000..ba1a7a20ff
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonTableReducerOpDescSpec.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+  * Pins the contract of [[PythonTableReducerOpDesc]]: operator metadata, output-schema
+  * derivation from the configured lambda units, the generated Python source, and JSON
+  * round-tripping through the polymorphic [[LogicalOp]] base.
+  */
+class PythonTableReducerOpDescSpec extends AnyFlatSpec with Matchers {
+
+  /** Builds a lambda unit with a null `newAttributeName`; only name, expression and type matter here. */
+  private def lambdaUnit(name: String, expr: String, t: AttributeType): LambdaAttributeUnit =
+    new LambdaAttributeUnit(name, expr, null, t)
+
+  "PythonTableReducerOpDesc.operatorInfo" should "advertise the name and Python group" in {
+    val info = (new PythonTableReducerOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Python Table Reducer"
+    info.operatorDescription shouldBe "Reduce Table to Tuple"
+    info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+  }
+
+  "PythonTableReducerOpDesc" should "default lambdaAttributeUnits to an empty list" in {
+    (new PythonTableReducerOpDesc).lambdaAttributeUnits shouldBe empty
+  }
+
+  "PythonTableReducerOpDesc.getOutputSchemas" should
+    "fold each lambda unit into an output column keyed by the declared output port" in {
+    val d = new PythonTableReducerOpDesc
+    // Two units so the fold is actually exercised; columns are expected in unit order.
+    d.lambdaAttributeUnits = List(
+      lambdaUnit("score", "1 + 1", AttributeType.INTEGER),
+      lambdaUnit("label", "'x'", AttributeType.STRING)
+    )
+    d.getOutputSchemas(Map.empty) shouldBe Map(
+      d.operatorInfo.outputPorts.head.id -> Schema()
+        .add("score", AttributeType.INTEGER)
+        .add("label", AttributeType.STRING)
+    )
+  }
+
+  it should "reject an empty lambda list" in {
+    // A freshly constructed descriptor has no lambda units, which is the empty case.
+    intercept[IllegalArgumentException] {
+      (new PythonTableReducerOpDesc).getOutputSchemas(Map.empty)
+    }
+  }
+
+  "PythonTableReducerOpDesc.generatePythonCode" should "emit the reducer table operator" in {
+    val d = new PythonTableReducerOpDesc
+    d.lambdaAttributeUnits = List(lambdaUnit("score", "1 + 1", AttributeType.INTEGER))
+    val code = d.generatePythonCode()
+    code should include("class ProcessTableOperator(UDFTableOperator)")
+    // Both the column name and its expression must reach the generated source.
+    code should include("score")
+    code should include("1 + 1")
+  }
+
+  "PythonTableReducerOpDesc" should "round-trip its lambda units through the polymorphic base" in {
+    val d = new PythonTableReducerOpDesc
+    d.lambdaAttributeUnits =
+      List(new LambdaAttributeUnit("score", "1 + 1", "scoreOut", AttributeType.INTEGER))
+    // Deserialize via the base type so the polymorphic type discriminator is exercised.
+    objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) match {
+      case r: PythonTableReducerOpDesc =>
+        r.lambdaAttributeUnits should have length 1
+        val restoredUnit = r.lambdaAttributeUnits.head
+        restoredUnit.attributeName shouldBe "score"
+        restoredUnit.expression shouldBe "1 + 1"
+        restoredUnit.newAttributeName shouldBe "scoreOut"
+        restoredUnit.attributeType shouldBe AttributeType.INTEGER
+      case other => fail(s"expected PythonTableReducerOpDesc, got $other")
+    }
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala
new file mode 100644
index 0000000000..be319f4e41
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+  * Pins the contract of [[PythonUDFOpDescV2]]: operator metadata and defaults, physical-op
+  * wiring and validation, schema propagation with and without retained input columns, and
+  * JSON round-tripping through the polymorphic [[LogicalOp]] base.
+  */
+class PythonUDFOpDescV2Spec extends AnyFlatSpec with Matchers {
+
+  // Fixed identities shared by every getPhysicalOp call in this spec.
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  /** Runs external schema propagation with `input` bound to the first declared input port. */
+  private def propagate(d: PythonUDFOpDescV2, input: Schema) =
+    d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input))
+
+  "PythonUDFOpDescV2.operatorInfo" should
+    "advertise the name, Python group, dynamic ports, and a default 1-in/1-out shape" in {
+    val info = (new PythonUDFOpDescV2).operatorInfo
+    info.userFriendlyName shouldBe "Python UDF"
+    info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP
+    info.dynamicInputPorts shouldBe true
+    info.dynamicOutputPorts shouldBe true
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+  }
+
+  "PythonUDFOpDescV2" should "default code/workers/flags" in {
+    val d = new PythonUDFOpDescV2
+    d.code shouldBe ""
+    d.workers shouldBe 1
+    d.retainInputColumns shouldBe false
+    d.defaultEnv shouldBe true
+  }
+
+  "PythonUDFOpDescV2.getPhysicalOp" should
+    "wire OpExecWithCode(code, \"python\") and carry port identities" in {
+    val d = new PythonUDFOpDescV2
+    d.code = "yield t"
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithCode(code, language) =>
+        language shouldBe "python"
+        code shouldBe "yield t"
+      case other => fail(s"expected OpExecWithCode, got $other")
+    }
+    physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  it should "reject a non-positive worker count" in {
+    // Cover a negative count as well as the zero boundary.
+    Seq(0, -1).foreach { workers =>
+      val d = new PythonUDFOpDescV2
+      d.workers = workers
+      intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) }
+    }
+  }
+
+  it should "reject a blank virtual-environment name when the default env is disabled" in {
+    val d = new PythonUDFOpDescV2
+    d.defaultEnv = false
+    d.envName = "   "
+    intercept[RuntimeException] { d.getPhysicalOp(workflowId, executionId) }
+  }
+
+  "PythonUDFOpDescV2 schema propagation" should
+    "emit only the output columns when input columns are not retained (default)" in {
+    val d = new PythonUDFOpDescV2
+    d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+    val input = Schema().add(new Attribute("in", AttributeType.STRING))
+    propagate(d, input) shouldBe Map(
+      d.operatorInfo.outputPorts.head.id -> Schema().add(
+        new Attribute("res", AttributeType.INTEGER)
+      )
+    )
+  }
+
+  it should "retain input columns plus the output columns when retainInputColumns is true" in {
+    val d = new PythonUDFOpDescV2
+    d.retainInputColumns = true
+    d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+    val input = Schema().add(new Attribute("in", AttributeType.STRING))
+    // Retained input columns come first, followed by the declared output columns.
+    propagate(d, input) shouldBe Map(
+      d.operatorInfo.outputPorts.head.id -> Schema()
+        .add(new Attribute("in", AttributeType.STRING))
+        .add(new Attribute("res", AttributeType.INTEGER))
+    )
+  }
+
+  it should "reject an output column that collides with a retained input column" in {
+    val d = new PythonUDFOpDescV2
+    d.retainInputColumns = true
+    d.outputColumns = List(new Attribute("dup", AttributeType.INTEGER))
+    val input = Schema().add(new Attribute("dup", AttributeType.STRING))
+    intercept[RuntimeException] { propagate(d, input) }
+  }
+
+  "PythonUDFOpDescV2" should "round-trip its config fields through the polymorphic base" in {
+    val d = new PythonUDFOpDescV2
+    d.code = "print(1)"
+    d.workers = 3
+    d.retainInputColumns = true
+    d.defaultEnv = false
+    d.envName = "myenv"
+    d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+    // Deserialize via the base type so the polymorphic type discriminator is exercised.
+    objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) match {
+      case p: PythonUDFOpDescV2 =>
+        p.code shouldBe "print(1)"
+        p.workers shouldBe 3
+        p.retainInputColumns shouldBe true
+        p.defaultEnv shouldBe false
+        p.envName shouldBe "myenv"
+        p.outputColumns shouldBe List(new Attribute("res", AttributeType.INTEGER))
+      case other => fail(s"expected PythonUDFOpDescV2, got $other")
+    }
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/source/PythonUDFSourceOpDescV2Spec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/source/PythonUDFSourceOpDescV2Spec.scala
new file mode 100644
index 0000000000..f0c3dfa41f
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/source/PythonUDFSourceOpDescV2Spec.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python.source
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+/**
+  * Pins the contract of [[PythonUDFSourceOpDescV2]]: source-operator metadata, the declared
+  * source schema, physical-op wiring and validation, and JSON round-tripping through the
+  * polymorphic [[LogicalOp]] base.
+  */
+class PythonUDFSourceOpDescV2Spec extends AnyFlatSpec with Matchers {
+
+  // Fixed identities shared by every getPhysicalOp call in this spec.
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "PythonUDFSourceOpDescV2.operatorInfo" should
+    "advertise the 1-out Python UDF source (no inputs, one output, reconfigurable)" in {
+    val info = (new PythonUDFSourceOpDescV2).operatorInfo
+    info.userFriendlyName shouldBe "1-out Python UDF"
+    info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP
+    info.inputPorts shouldBe empty
+    info.outputPorts should have length 1
+    info.supportReconfiguration shouldBe true
+  }
+
+  "PythonUDFSourceOpDescV2.sourceSchema" should "be empty by default and reflect the configured columns" in {
+    (new PythonUDFSourceOpDescV2).sourceSchema().getAttributes shouldBe empty
+    val d = new PythonUDFSourceOpDescV2
+    d.columns = List(new Attribute("a", AttributeType.STRING))
+    d.sourceSchema() shouldBe Schema().add(new Attribute("a", AttributeType.STRING))
+  }
+
+  "PythonUDFSourceOpDescV2.getPhysicalOp" should
+    "wire OpExecWithCode(code, \"python\") as a source op with one output port" in {
+    val d = new PythonUDFSourceOpDescV2
+    d.code = "yield {'a': 1}"
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithCode(code, language) =>
+        language shouldBe "python"
+        code shouldBe "yield {'a': 1}"
+      case other => fail(s"expected OpExecWithCode, got $other")
+    }
+    // operatorInfo declares no input ports, so none should be wired; the output port is carried.
+    physical.inputPorts shouldBe empty
+    physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  it should "reject a non-positive worker count" in {
+    // Cover a negative count as well as the zero boundary.
+    Seq(0, -1).foreach { workers =>
+      val d = new PythonUDFSourceOpDescV2
+      d.workers = workers
+      intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) }
+    }
+  }
+
+  "PythonUDFSourceOpDescV2" should "round-trip its config fields through the polymorphic base" in {
+    val d = new PythonUDFSourceOpDescV2
+    d.code = "yield"
+    d.workers = 2
+    d.defaultEnv = false
+    d.envName = "venv"
+    d.columns = List(new Attribute("a", AttributeType.STRING))
+    // Deserialize via the base type so the polymorphic type discriminator is exercised.
+    objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) match {
+      case p: PythonUDFSourceOpDescV2 =>
+        p.code shouldBe "yield"
+        p.workers shouldBe 2
+        p.defaultEnv shouldBe false
+        p.envName shouldBe "venv"
+        p.columns shouldBe List(new Attribute("a", AttributeType.STRING))
+      case other => fail(s"expected PythonUDFSourceOpDescV2, got $other")
+    }
+  }
+}

Reply via email to