Copilot commented on code in PR #5877: URL: https://github.com/apache/texera/pull/5877#discussion_r3449345284
########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFOpDescSpec.scala: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.r + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class RUDFOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "RUDFOpDesc.operatorInfo" should + "advertise the name, R group, and a default 1-in/1-out shape" in { + val info = (new RUDFOpDesc).operatorInfo + info.userFriendlyName shouldBe "R UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.R_GROUP + info.inputPorts should have length 1 + info.outputPorts should 
have length 1 + } + + "RUDFOpDesc" should "default code/workers/useTupleAPI/retainInputColumns" in { + val d = new RUDFOpDesc + d.code shouldBe "" + d.workers shouldBe 1 + d.useTupleAPI shouldBe false + d.retainInputColumns shouldBe false + } + + "RUDFOpDesc.getPhysicalOp" should "wire OpExecWithCode and carry port identities" in { + val d = new RUDFOpDesc + d.code = "function(t) t" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, _) => code shouldBe "function(t) t" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.inputPorts.keySet shouldBe d.operatorInfo.inputPorts.map(_.id).toSet + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new RUDFOpDesc + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "RUDFOpDesc schema propagation" should + "emit only the output columns when input columns are not retained (default)" in { + val d = new RUDFOpDesc + d.outputColumns = List(new Attribute("res", AttributeType.INTEGER)) + val input = Schema().add(new Attribute("in", AttributeType.STRING)) + d.getExternalOutputSchemas(Map(d.operatorInfo.inputPorts.head.id -> input)) shouldBe Map( + d.operatorInfo.outputPorts.head.id -> Schema().add( + new Attribute("res", AttributeType.INTEGER) + ) + ) + } Review Comment: `RUDFOpDesc` schema propagation is only asserted for the default `retainInputColumns=false`. The operator’s implementation also supports `retainInputColumns=true` (input schema + outputColumns), and the PR description claims this behavior is pinned for RUDF as well, but there’s no test covering it. 
########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFSourceOpDescSpec.scala: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.r + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class RUDFSourceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "RUDFSourceOpDesc.operatorInfo" should + "advertise the 1-out R UDF source (no inputs, one output)" in { + val info = (new RUDFSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "1-out R UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.R_GROUP + info.inputPorts shouldBe empty + 
info.outputPorts should have length 1 + } + + "RUDFSourceOpDesc.sourceSchema" should "be empty by default and reflect the configured columns" in { + (new RUDFSourceOpDesc).sourceSchema().getAttributes shouldBe empty + val d = new RUDFSourceOpDesc + d.columns = List(new Attribute("a", AttributeType.STRING)) + d.sourceSchema() shouldBe Schema().add(new Attribute("a", AttributeType.STRING)) + } + + "RUDFSourceOpDesc.getPhysicalOp" should + "wire OpExecWithCode as a source op with one output port" in { + val d = new RUDFSourceOpDesc + d.code = "data.frame(a=1)" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, _) => code shouldBe "data.frame(a=1)" + case other => fail(s"expected OpExecWithCode, got $other") + } + physical.outputPorts.keySet shouldBe d.operatorInfo.outputPorts.map(_.id).toSet + } + + it should "reject a non-positive worker count" in { + val d = new RUDFSourceOpDesc + d.code = "x" + d.workers = 0 + intercept[IllegalArgumentException] { d.getPhysicalOp(workflowId, executionId) } + } + + "RUDFSourceOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new RUDFSourceOpDesc + d.code = "f" + d.workers = 2 + d.useTupleAPI = true + val restored = objectMapper.readValue(objectMapper.writeValueAsString(d), classOf[LogicalOp]) + restored shouldBe a[RUDFSourceOpDesc] + val r = restored.asInstanceOf[RUDFSourceOpDesc] + r.code shouldBe "f" + r.workers shouldBe 2 + r.useTupleAPI shouldBe true Review Comment: The round-trip test doesn’t include the `columns` field, even though `RUDFSourceOpDesc.sourceSchema()` is derived from it. Adding `columns` to the round-trip assertion better pins the config contract for this source descriptor. 
########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFOpDescSpec.scala: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.r + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class RUDFOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "RUDFOpDesc.operatorInfo" should + "advertise the name, R group, and a default 1-in/1-out shape" in { + val info = (new RUDFOpDesc).operatorInfo + info.userFriendlyName shouldBe "R UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.R_GROUP + info.inputPorts should have length 1 + info.outputPorts should 
have length 1 + } + + "RUDFOpDesc" should "default code/workers/useTupleAPI/retainInputColumns" in { + val d = new RUDFOpDesc + d.code shouldBe "" + d.workers shouldBe 1 + d.useTupleAPI shouldBe false + d.retainInputColumns shouldBe false + } + + "RUDFOpDesc.getPhysicalOp" should "wire OpExecWithCode and carry port identities" in { + val d = new RUDFOpDesc + d.code = "function(t) t" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, _) => code shouldBe "function(t) t" + case other => fail(s"expected OpExecWithCode, got $other") + } Review Comment: This test matches `OpExecWithCode(code, _)`, so it won’t fail if the operator type accidentally changes (e.g., from the default `"r-table"`). Since `RUDFOpDesc` selects the OpExec type based on `useTupleAPI`, it’s worth pinning the default to avoid regressions. ########## common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/r/RUDFSourceOpDescSpec.scala: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.udf.r + +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class RUDFSourceOpDescSpec extends AnyFlatSpec with Matchers { + + private val workflowId = WorkflowIdentity(1L) + private val executionId = ExecutionIdentity(1L) + + "RUDFSourceOpDesc.operatorInfo" should + "advertise the 1-out R UDF source (no inputs, one output)" in { + val info = (new RUDFSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "1-out R UDF" + info.operatorGroupName shouldBe OperatorGroupConstants.R_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + } + + "RUDFSourceOpDesc.sourceSchema" should "be empty by default and reflect the configured columns" in { + (new RUDFSourceOpDesc).sourceSchema().getAttributes shouldBe empty + val d = new RUDFSourceOpDesc + d.columns = List(new Attribute("a", AttributeType.STRING)) + d.sourceSchema() shouldBe Schema().add(new Attribute("a", AttributeType.STRING)) + } + + "RUDFSourceOpDesc.getPhysicalOp" should + "wire OpExecWithCode as a source op with one output port" in { + val d = new RUDFSourceOpDesc + d.code = "data.frame(a=1)" + val physical = d.getPhysicalOp(workflowId, executionId) + physical.opExecInitInfo match { + case OpExecWithCode(code, _) => code shouldBe "data.frame(a=1)" + case other => fail(s"expected OpExecWithCode, got $other") + } Review Comment: This test matches `OpExecWithCode(code, _)`, so it won’t catch regressions where the R operator type changes. 
Since `RUDFSourceOpDesc` chooses `"r-table"` vs `"r-tuple"` based on `useTupleAPI`, consider asserting the default operator type here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
