Copilot commented on code in PR #5876:
URL: https://github.com/apache/texera/pull/5876#discussion_r3449345931


##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class PythonUDFOpDescV2Spec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "PythonUDFOpDescV2.operatorInfo" should
+    "advertise the name, Python group, dynamic ports, and a default 1-in/1-out 
shape" in {
+    val info = (new PythonUDFOpDescV2).operatorInfo
+    info.userFriendlyName shouldBe "Python UDF"
+    info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP
+    info.dynamicInputPorts shouldBe true
+    info.dynamicOutputPorts shouldBe true
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+  }
+
+  "PythonUDFOpDescV2" should "default code/workers/flags" in {
+    val d = new PythonUDFOpDescV2
+    d.code shouldBe ""
+    d.workers shouldBe 1
+    d.retainInputColumns shouldBe false
+    d.defaultEnv shouldBe true
+  }
+
+  "PythonUDFOpDescV2.getPhysicalOp" should
+    "wire OpExecWithCode(code, \"python\") and carry port identities" in {
+    val d = new PythonUDFOpDescV2
+    d.code = "yield t"
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithCode(code, language) =>
+        language shouldBe "python"
+        code shouldBe "yield t"
+      case other => fail(s"expected OpExecWithCode, got $other")
+    }
+    physical.inputPorts.keySet shouldBe 
d.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  it should "reject a non-positive worker count" in {
+    val d = new PythonUDFOpDescV2
+    d.workers = 0
+    intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, 
executionId) }
+  }

Review Comment:
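   The PR description says the UDF descriptors’ `getPhysicalOp` validates the 
selected Python environment (it rejects a blank `envName` when 
`defaultEnv = false`). This spec currently tests only the `workers <= 0` 
validation; it should also pin the `envName` validation branch in 
`PythonUDFOpDescV2.getPhysicalOp`, which throws a `RuntimeException` when 
`envName` is blank or whitespace-only. For example, set `d.defaultEnv = false` 
and `d.envName = "   "`, then assert 
`intercept[RuntimeException] { d.getPhysicalOp(workflowId, executionId) }`.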



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class PythonUDFOpDescV2Spec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "PythonUDFOpDescV2.operatorInfo" should
+    "advertise the name, Python group, dynamic ports, and a default 1-in/1-out 
shape" in {
+    val info = (new PythonUDFOpDescV2).operatorInfo
+    info.userFriendlyName shouldBe "Python UDF"
+    info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP
+    info.dynamicInputPorts shouldBe true
+    info.dynamicOutputPorts shouldBe true
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+  }
+
+  "PythonUDFOpDescV2" should "default code/workers/flags" in {
+    val d = new PythonUDFOpDescV2
+    d.code shouldBe ""
+    d.workers shouldBe 1
+    d.retainInputColumns shouldBe false
+    d.defaultEnv shouldBe true
+  }
+
+  "PythonUDFOpDescV2.getPhysicalOp" should
+    "wire OpExecWithCode(code, \"python\") and carry port identities" in {
+    val d = new PythonUDFOpDescV2
+    d.code = "yield t"
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithCode(code, language) =>
+        language shouldBe "python"
+        code shouldBe "yield t"
+      case other => fail(s"expected OpExecWithCode, got $other")
+    }
+    physical.inputPorts.keySet shouldBe 
d.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  it should "reject a non-positive worker count" in {
+    val d = new PythonUDFOpDescV2
+    d.workers = 0
+    intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, 
executionId) }
+  }
+
+  "PythonUDFOpDescV2 schema propagation" should
+    "emit only the output columns when input columns are not retained 
(default)" in {
+    val d = new PythonUDFOpDescV2
+    d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+    val input = Schema().add(new Attribute("in", AttributeType.STRING))
+    val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id 
-> input))
+    out shouldBe Map(
+      d.operatorInfo.outputPorts.head.id -> Schema().add(
+        new Attribute("res", AttributeType.INTEGER)
+      )
+    )
+  }
+
+  it should "retain input columns plus the output columns when 
retainInputColumns is true" in {
+    val d = new PythonUDFOpDescV2
+    d.retainInputColumns = true
+    d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+    val input = Schema().add(new Attribute("in", AttributeType.STRING))
+    val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id 
-> input))
+    out shouldBe Map(
+      d.operatorInfo.outputPorts.head.id -> Schema()
+        .add(new Attribute("in", AttributeType.STRING))
+        .add(new Attribute("res", AttributeType.INTEGER))
+    )
+  }

Review Comment:
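   `PythonUDFOpDescV2.getPhysicalOp` explicitly throws when 
`retainInputColumns = true` and a name in `outputColumns` collides with an 
existing input column. The PR description says this collision behavior is 
pinned, but the spec currently doesn’t cover it; add a regression test that 
asserts a `RuntimeException` for the duplicate-name case. For example, reuse 
the retained-columns setup above but declare 
`d.outputColumns = List(new Attribute("in", AttributeType.STRING))` so it 
clashes with the input column `in`, and drive it through the same 
`getExternalOutputSchemas` call the other schema-propagation tests use.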



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/source/PythonUDFSourceOpDescV2Spec.scala:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python.source
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class PythonUDFSourceOpDescV2Spec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "PythonUDFSourceOpDescV2.operatorInfo" should
+    "advertise the 1-out Python UDF source (no inputs, one output, 
reconfigurable)" in {
+    val info = (new PythonUDFSourceOpDescV2).operatorInfo
+    info.userFriendlyName shouldBe "1-out Python UDF"
+    info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP
+    info.inputPorts shouldBe empty
+    info.outputPorts should have length 1
+    info.supportReconfiguration shouldBe true
+  }
+
+  "PythonUDFSourceOpDescV2.sourceSchema" should "be empty by default and 
reflect the configured columns" in {
+    (new PythonUDFSourceOpDescV2).sourceSchema().getAttributes shouldBe empty
+    val d = new PythonUDFSourceOpDescV2
+    d.columns = List(new Attribute("a", AttributeType.STRING))
+    d.sourceSchema() shouldBe Schema().add(new Attribute("a", 
AttributeType.STRING))
+  }
+
+  "PythonUDFSourceOpDescV2.getPhysicalOp" should
+    "wire OpExecWithCode(code, \"python\") as a source op with one output 
port" in {
+    val d = new PythonUDFSourceOpDescV2
+    d.code = "yield {'a': 1}"
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithCode(code, language) =>
+        language shouldBe "python"
+        code shouldBe "yield {'a': 1}"
+      case other => fail(s"expected OpExecWithCode, got $other")
+    }
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  it should "reject a non-positive worker count" in {
+    val d = new PythonUDFSourceOpDescV2
+    d.workers = 0
+    intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, 
executionId) }
+  }
+
+  "PythonUDFSourceOpDescV2" should "round-trip its config fields through the 
polymorphic base" in {
+    val d = new PythonUDFSourceOpDescV2
+    d.code = "yield"
+    d.workers = 2
+    d.defaultEnv = false
+    d.envName = "venv"
+    val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), 
classOf[LogicalOp])
+    restored shouldBe a[PythonUDFSourceOpDescV2]
+    val p = restored.asInstanceOf[PythonUDFSourceOpDescV2]
+    p.code shouldBe "yield"
+    p.workers shouldBe 2
+    p.defaultEnv shouldBe false
+    p.envName shouldBe "venv"
+  }

Review Comment:
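   The round-trip test for `PythonUDFSourceOpDescV2` doesn’t currently cover the 
`columns` field, even though it’s part of the persisted operator config and is 
reflected in `sourceSchema()`. Adding `columns` to the serialization test would 
better pin the contract for this descriptor — e.g. set 
`d.columns = List(new Attribute("a", AttributeType.STRING))` before 
serializing and assert 
`p.sourceSchema() shouldBe Schema().add(new Attribute("a", AttributeType.STRING))` 
on the restored instance.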



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUDFOpDescV2Spec.scala:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class PythonUDFOpDescV2Spec extends AnyFlatSpec with Matchers {
+
+  private val workflowId = WorkflowIdentity(1L)
+  private val executionId = ExecutionIdentity(1L)
+
+  "PythonUDFOpDescV2.operatorInfo" should
+    "advertise the name, Python group, dynamic ports, and a default 1-in/1-out 
shape" in {
+    val info = (new PythonUDFOpDescV2).operatorInfo
+    info.userFriendlyName shouldBe "Python UDF"
+    info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP
+    info.dynamicInputPorts shouldBe true
+    info.dynamicOutputPorts shouldBe true
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+  }
+
+  "PythonUDFOpDescV2" should "default code/workers/flags" in {
+    val d = new PythonUDFOpDescV2
+    d.code shouldBe ""
+    d.workers shouldBe 1
+    d.retainInputColumns shouldBe false
+    d.defaultEnv shouldBe true
+  }
+
+  "PythonUDFOpDescV2.getPhysicalOp" should
+    "wire OpExecWithCode(code, \"python\") and carry port identities" in {
+    val d = new PythonUDFOpDescV2
+    d.code = "yield t"
+    val physical = d.getPhysicalOp(workflowId, executionId)
+    physical.opExecInitInfo match {
+      case OpExecWithCode(code, language) =>
+        language shouldBe "python"
+        code shouldBe "yield t"
+      case other => fail(s"expected OpExecWithCode, got $other")
+    }
+    physical.inputPorts.keySet shouldBe 
d.operatorInfo.inputPorts.map(_.id).toSet
+    physical.outputPorts.keySet shouldBe 
d.operatorInfo.outputPorts.map(_.id).toSet
+  }
+
+  it should "reject a non-positive worker count" in {
+    val d = new PythonUDFOpDescV2
+    d.workers = 0
+    intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, 
executionId) }
+  }
+
+  "PythonUDFOpDescV2 schema propagation" should
+    "emit only the output columns when input columns are not retained 
(default)" in {
+    val d = new PythonUDFOpDescV2
+    d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+    val input = Schema().add(new Attribute("in", AttributeType.STRING))
+    val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id 
-> input))
+    out shouldBe Map(
+      d.operatorInfo.outputPorts.head.id -> Schema().add(
+        new Attribute("res", AttributeType.INTEGER)
+      )
+    )
+  }
+
+  it should "retain input columns plus the output columns when 
retainInputColumns is true" in {
+    val d = new PythonUDFOpDescV2
+    d.retainInputColumns = true
+    d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))
+    val input = Schema().add(new Attribute("in", AttributeType.STRING))
+    val out = d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id 
-> input))
+    out shouldBe Map(
+      d.operatorInfo.outputPorts.head.id -> Schema()
+        .add(new Attribute("in", AttributeType.STRING))
+        .add(new Attribute("res", AttributeType.INTEGER))
+    )
+  }
+
+  "PythonUDFOpDescV2" should "round-trip its config fields through the 
polymorphic base" in {
+    val d = new PythonUDFOpDescV2
+    d.code = "print(1)"
+    d.workers = 3
+    d.retainInputColumns = true
+    d.defaultEnv = false
+    d.envName = "myenv"
+    val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), 
classOf[LogicalOp])
+    restored shouldBe a[PythonUDFOpDescV2]
+    val p = restored.asInstanceOf[PythonUDFOpDescV2]
+    p.code shouldBe "print(1)"
+    p.workers shouldBe 3
+    p.retainInputColumns shouldBe true
+    p.defaultEnv shouldBe false
+    p.envName shouldBe "myenv"
+  }

Review Comment:
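   The round-trip test for `PythonUDFOpDescV2` doesn’t currently include 
`outputColumns`, even though it’s a persisted config field on the descriptor. 
Including it would better pin the polymorphic JSON serialization contract for 
this operator descriptor — e.g. set 
`d.outputColumns = List(new Attribute("res", AttributeType.INTEGER))` before 
serializing and check that the restored `p.outputColumns` still holds that 
single `res` / `INTEGER` attribute.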



##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonTableReducerOpDescSpec.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class PythonTableReducerOpDescSpec extends AnyFlatSpec with Matchers {
+
+  private def unit(name: String, expr: String, t: AttributeType): 
LambdaAttributeUnit =
+    new LambdaAttributeUnit(name, expr, null, t)
+
+  "PythonTableReducerOpDesc.operatorInfo" should "advertise the name and 
Python group" in {
+    val info = (new PythonTableReducerOpDesc).operatorInfo
+    info.userFriendlyName shouldBe "Python Table Reducer"
+    info.operatorDescription shouldBe "Reduce Table to Tuple"
+    info.operatorGroupName shouldBe OperatorGroupConstants.PYTHON_GROUP
+    info.inputPorts should have length 1
+    info.outputPorts should have length 1
+  }
+
+  "PythonTableReducerOpDesc" should "default lambdaAttributeUnits to an empty 
list" in {
+    (new PythonTableReducerOpDesc).lambdaAttributeUnits shouldBe empty
+  }
+
+  "PythonTableReducerOpDesc.getOutputSchemas" should
+    "fold each lambda unit into an output column keyed by the declared output 
port" in {
+    val d = new PythonTableReducerOpDesc
+    d.lambdaAttributeUnits = List(unit("score", "1 + 1", 
AttributeType.INTEGER))
+    d.getOutputSchemas(Map.empty) shouldBe Map(
+      d.operatorInfo.outputPorts.head.id -> Schema().add("score", 
AttributeType.INTEGER)
+    )
+  }
+
+  it should "reject an empty lambda list" in {
+    intercept[IllegalArgumentException] {
+      (new PythonTableReducerOpDesc).getOutputSchemas(Map.empty)
+    }
+  }
+
+  "PythonTableReducerOpDesc.generatePythonCode" should "emit the reducer table 
operator" in {
+    val d = new PythonTableReducerOpDesc
+    d.lambdaAttributeUnits = List(unit("score", "1 + 1", 
AttributeType.INTEGER))
+    val code = d.generatePythonCode()
+    code should include("class ProcessTableOperator(UDFTableOperator)")
+    code should include("score")
+  }
+
+  "PythonTableReducerOpDesc" should "round-trip its lambda units through the 
polymorphic base" in {
+    val d = new PythonTableReducerOpDesc
+    d.lambdaAttributeUnits = List(unit("score", "1 + 1", 
AttributeType.INTEGER))
+    val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), 
classOf[LogicalOp])
+    restored shouldBe a[PythonTableReducerOpDesc]
+    val r = restored.asInstanceOf[PythonTableReducerOpDesc]
+    r.lambdaAttributeUnits should have length 1
+    r.lambdaAttributeUnits.head.attributeName shouldBe "score"
+    r.lambdaAttributeUnits.head.attributeType shouldBe AttributeType.INTEGER
+  }

Review Comment:
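   The `PythonTableReducerOpDesc` round-trip test only asserts `attributeName` 
and `attributeType`. Since `LambdaAttributeUnit` also contains `expression` and 
`newAttributeName` (and they are part of the serialized config), asserting 
those as well would better pin the JSON contract — e.g. 
`r.lambdaAttributeUnits.head.expression shouldBe "1 + 1"`. Note that the 
`unit` helper passes `null` as the third constructor argument (presumably 
`newAttributeName`), so give it a non-null value in this test if the round 
trip of that field is meant to be checked.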



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to