This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-source-port by this
push:
new 2a3a8f1bd6 update
2a3a8f1bd6 is described below
commit 2a3a8f1bd6f8aa25528f46f2af199ac1f596af68
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 5 23:33:56 2026 -0700
update
---
.../apache/texera/amber/operator/LogicalOp.scala | 2 +
.../source/scan/InputFileScanSourceOpDesc.scala | 37 ++++++++++++
.../operator/source/scan/ScanSourceOpDesc.scala | 6 +-
.../scan/InputFileScanSourceOpDescSpec.scala | 67 ++++++++++++++++++++++
.../scan/text/FileScanSourceOpDescSpec.scala | 5 +-
.../amber/compiler/model/LogicalPlanSpec.scala | 4 +-
6 files changed, 113 insertions(+), 8 deletions(-)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 931596b1bf..0cd7c0c143 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -77,6 +77,7 @@ import
org.apache.texera.amber.operator.source.apis.twitter.v2.{
}
import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc
import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
import
org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
@@ -163,6 +164,7 @@ trait StateTransferFunc
// new Type(value = classOf[ParallelCSVScanSourceOpDesc], name =
"ParallelCSVFileScan"),
new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
+ new Type(value = classOf[InputFileScanSourceOpDesc], name =
"InputFileScan"),
new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
new Type(
value = classOf[TwitterFullArchiveSearchSourceOpDesc],
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
new file mode 100644
index 0000000000..f94253d0b2
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort}
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+
+/**
+  * A file scan source whose file path is supplied through its "Filename" input
+  * port (see `operatorInfo`) rather than configured via the `fileName` property.
+  *
+  * `fileName` is listed in `@JsonIgnoreProperties`, so Jackson drops it when this
+  * descriptor is serialized or deserialized. NOTE(review): this presumably also
+  * hides it from the generated property form — confirm against the schema generator.
+  */
+@JsonIgnoreProperties(value = Array("fileName"))
+class InputFileScanSourceOpDesc extends FileScanSourceOpDesc {
+
+  /** Operator metadata: a single "Filename" input port and a single output port. */
+  override def operatorInfo: OperatorInfo = {
+    val filenamePort = InputPort(displayName = "Filename")
+    val resultPort = OutputPort()
+    OperatorInfo(
+      userFriendlyName = "File Scan From Input",
+      operatorDescription = "Scan data from file paths provided by input tuples",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List(filenamePort),
+      outputPorts = List(resultPort)
+    )
+  }
+}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/ScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/ScanSourceOpDesc.scala
index 11deb60386..6c1c2c5fbd 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/ScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/ScanSourceOpDesc.scala
@@ -24,7 +24,7 @@ import
com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import org.apache.texera.amber.core.storage.FileResolver
import org.apache.texera.amber.core.tuple.Schema
-import org.apache.texera.amber.core.workflow.{InputPort, OutputPort}
+import org.apache.texera.amber.core.workflow.OutputPort
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
import org.apache.commons.lang3.builder.EqualsBuilder
@@ -39,7 +39,7 @@ abstract class ScanSourceOpDesc extends
SourceOperatorDescriptor {
@JsonIgnore
var INFER_READ_LIMIT: Int = 100
- @JsonProperty()
+ @JsonProperty(required = true)
@JsonSchemaTitle("File")
@JsonDeserialize(contentAs = classOf[java.lang.String])
var fileName: Option[String] = None
@@ -71,7 +71,7 @@ abstract class ScanSourceOpDesc extends
SourceOperatorDescriptor {
userFriendlyName = s"${fileTypeName.getOrElse("Unknown")} File Scan",
operatorDescription = s"Scan data from a
${fileTypeName.getOrElse("Unknown")} file",
OperatorGroupConstants.INPUT_GROUP,
- inputPorts = List(InputPort(displayName = "Filename")),
+ inputPorts = List.empty,
outputPorts = List(OutputPort())
)
}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
new file mode 100644
index 0000000000..a3b59723cb
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema,
SchemaEnforceable, Tuple}
+import org.apache.texera.amber.operator.TestOperators
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+/**
+  * Tests for [[InputFileScanSourceOpDesc]], the file scan variant that receives
+  * its file path from an input tuple on the "Filename" port instead of from the
+  * `fileName` property.
+  */
+class InputFileScanSourceOpDescSpec extends AnyFlatSpec {
+
+  it should "require a filename input port" in {
+    val desc = new InputFileScanSourceOpDesc()
+    // Compare the whole port list: an empty or longer list fails with a readable
+    // diff instead of a NoSuchElementException from `.head`.
+    assert(desc.operatorInfo.inputPorts.map(_.displayName) == List("Filename"))
+  }
+
+  it should "not expose fileName property in descriptor serialization" in {
+    val desc = new InputFileScanSourceOpDesc()
+    val descriptorJson =
+      objectMapper.valueToTree[com.fasterxml.jackson.databind.JsonNode](desc)
+    assert(!descriptorJson.has("fileName"))
+  }
+
+  it should "scan file content using filename from input tuple" in {
+    val desc = new InputFileScanSourceOpDesc()
+    desc.attributeType = FileAttributeType.SINGLE_STRING
+    val outputSchema = desc.sourceSchema()
+    val expectedContent = (1 to 10).map(i => s"line$i").mkString("\n")
+
+    // The serialized descriptor carries no `fileName`, so the file path is
+    // supplied only through the input tuple built below.
+    val executor = new FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
+    val inputTuple = Tuple(
+      Schema().add("filename", AttributeType.STRING),
+      Array(FileResolver.resolve(TestOperators.TestTextFilePath).toASCIIString)
+    )
+
+    executor.open()
+    // Materialize the output strictly (`toList`) before closing, and close the
+    // executor even when processing throws so a failing run does not leak it.
+    val tuples =
+      try {
+        executor.processTupleMultiPort(inputTuple, 0)
+        executor
+          .produceTuple()
+          .map(_.asInstanceOf[SchemaEnforceable].enforceSchema(outputSchema))
+          .toList
+      } finally {
+        executor.close()
+      }
+
+    // SINGLE_STRING mode should yield exactly one tuple holding the whole file.
+    assert(tuples.map(_.getField[String]("line")) == List(expectedContent))
+  }
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
index af85ed459b..4a270331e4 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
@@ -49,9 +49,8 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with
BeforeAndAfter {
assert(inferredSchema.getAttribute("line").getType == AttributeType.STRING)
}
- it should "expose one filename input port" in {
- assert(fileScanSourceOpDesc.operatorInfo.inputPorts.length == 1)
- assert(fileScanSourceOpDesc.operatorInfo.inputPorts.head.displayName ==
"Filename")
+ it should "not expose filename input port on file scan source" in {
+ assert(fileScanSourceOpDesc.operatorInfo.inputPorts.isEmpty)
}
it should "infer schema with single column representing entire file in
outputAsSingleTuple mode" in {
diff --git
a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
index b13e9e2da1..b0c16bb18c 100644
---
a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
+++
b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.compiler.model
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.PortIdentity
-import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
import org.scalatest.flatspec.AnyFlatSpec
@@ -30,7 +30,7 @@ import scala.collection.mutable.ArrayBuffer
class LogicalPlanSpec extends AnyFlatSpec {
it should "skip file resolution when a scan source receives filename from
input" in {
- val scanSource = new FileScanSourceOpDesc()
+ val scanSource = new InputFileScanSourceOpDesc()
scanSource.setOperatorId("scan-source")
val upstream = new TextInputSourceOpDesc()