This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-source-port by this push:
new be49745b99 update
be49745b99 is described below
commit be49745b99ae24cbcda19fba463054035c750e25
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 6 00:11:24 2026 -0700
update
---
.../scan/InputFileScanSourceOpDescSpec.scala | 77 ----------------------
.../amber/compiler/model/LogicalPlanSpec.scala | 57 ----------------
2 files changed, 134 deletions(-)
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
deleted file mode 100644
index e21cc58b36..0000000000
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.operator.source.scan
-
-import org.apache.texera.amber.core.storage.FileResolver
-import org.apache.texera.amber.core.tuple.{AttributeType, Schema, SchemaEnforceable, Tuple}
-import org.apache.texera.amber.operator.TestOperators
-import org.apache.texera.amber.util.JSONUtils.objectMapper
-import org.scalatest.flatspec.AnyFlatSpec
-
-class InputFileScanSourceOpDescSpec extends AnyFlatSpec {
-
- it should "require a filename input port" in {
- val desc = new InputFileScanSourceOpDesc()
- assert(desc.operatorInfo.inputPorts.length == 1)
- assert(desc.operatorInfo.inputPorts.head.displayName == "Filename")
- }
-
- it should "not expose fileName property in descriptor serialization" in {
- val desc = new InputFileScanSourceOpDesc()
- val descriptorJson = objectMapper.valueToTree[com.fasterxml.jackson.databind.JsonNode](desc)
- assert(!descriptorJson.has("fileName"))
- }
-
- it should "scan file content using filename from input tuple" in {
- val desc = new InputFileScanSourceOpDesc()
- desc.attributeType = FileAttributeType.SINGLE_STRING
-
- val executor = new InputFileScanSourceOpExec(objectMapper.writeValueAsString(desc))
- val inputTuple = Tuple(
- Schema().add("filename", AttributeType.STRING),
- Array(FileResolver.resolve(TestOperators.TestTextFilePath).toASCIIString)
- )
-
- executor.open()
- executor.processTupleMultiPort(inputTuple, 0)
- val tuples = executor
- .produceTuple()
- .map(tupleLike => tupleLike.asInstanceOf[SchemaEnforceable].enforceSchema(desc.sourceSchema()))
- .toSeq
- executor.close()
-
- assert(tuples.length == 1)
- assert(
- tuples.head
- .getField[String]("line")
- .equals("line1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\nline10")
- )
- }
-
- it should "reject execution when no filename input tuple is provided" in {
- val desc = new InputFileScanSourceOpDesc()
- desc.attributeType = FileAttributeType.SINGLE_STRING
-
- val executor = new InputFileScanSourceOpExec(objectMapper.writeValueAsString(desc))
- executor.open()
- assertThrows[IllegalStateException](executor.produceTuple().toList)
- executor.close()
- }
-}
diff --git a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
deleted file mode 100644
index b0c16bb18c..0000000000
--- a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.compiler.model
-
-import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
-import org.apache.texera.amber.core.workflow.PortIdentity
-import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
-import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
-import org.scalatest.flatspec.AnyFlatSpec
-
-import scala.collection.mutable.ArrayBuffer
-
-class LogicalPlanSpec extends AnyFlatSpec {
-
- it should "skip file resolution when a scan source receives filename from input" in {
- val scanSource = new InputFileScanSourceOpDesc()
- scanSource.setOperatorId("scan-source")
-
- val upstream = new TextInputSourceOpDesc()
- upstream.setOperatorId("filename-source")
-
- val logicalPlan = LogicalPlan(
- operators = List(upstream, scanSource),
- links = List(
- LogicalLink(
- fromOpId = OperatorIdentity("filename-source"),
- fromPortId = PortIdentity(0),
- toOpId = OperatorIdentity("scan-source"),
- toPortId = PortIdentity(0)
- )
- )
- )
-
- val errors = ArrayBuffer.empty[(OperatorIdentity, Throwable)]
- logicalPlan.resolveScanSourceOpFileName(Some(errors))
-
- assert(errors.isEmpty)
- assert(scanSource.fileName.isEmpty)
- }
-}