This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-source-port by this push:
     new 2a3a8f1bd6 update
2a3a8f1bd6 is described below

commit 2a3a8f1bd6f8aa25528f46f2af199ac1f596af68
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 5 23:33:56 2026 -0700

    update
---
 .../apache/texera/amber/operator/LogicalOp.scala   |  2 +
 .../source/scan/InputFileScanSourceOpDesc.scala    | 37 ++++++++++++
 .../operator/source/scan/ScanSourceOpDesc.scala    |  6 +-
 .../scan/InputFileScanSourceOpDescSpec.scala       | 67 ++++++++++++++++++++++
 .../scan/text/FileScanSourceOpDescSpec.scala       |  5 +-
 .../amber/compiler/model/LogicalPlanSpec.scala     |  4 +-
 6 files changed, 113 insertions(+), 8 deletions(-)

diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 931596b1bf..0cd7c0c143 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -77,6 +77,7 @@ import 
org.apache.texera.amber.operator.source.apis.twitter.v2.{
 }
 import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
 import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
 import 
org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
@@ -163,6 +164,7 @@ trait StateTransferFunc
     // new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = 
"ParallelCSVFileScan"),
     new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
     new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
+    new Type(value = classOf[InputFileScanSourceOpDesc], name = 
"InputFileScan"),
     new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
     new Type(
       value = classOf[TwitterFullArchiveSearchSourceOpDesc],
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
new file mode 100644
index 0000000000..f94253d0b2
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort}
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+
+@JsonIgnoreProperties(value = Array("fileName"))
+class InputFileScanSourceOpDesc extends FileScanSourceOpDesc {
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      userFriendlyName = "File Scan From Input",
+      operatorDescription = "Scan data from file paths provided by input 
tuples",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List(InputPort(displayName = "Filename")),
+      outputPorts = List(OutputPort())
+    )
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/ScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/ScanSourceOpDesc.scala
index 11deb60386..6c1c2c5fbd 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/ScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/ScanSourceOpDesc.scala
@@ -24,7 +24,7 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
 import org.apache.texera.amber.core.storage.FileResolver
 import org.apache.texera.amber.core.tuple.Schema
-import org.apache.texera.amber.core.workflow.{InputPort, OutputPort}
+import org.apache.texera.amber.core.workflow.OutputPort
 import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
 import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
 import org.apache.commons.lang3.builder.EqualsBuilder
@@ -39,7 +39,7 @@ abstract class ScanSourceOpDesc extends 
SourceOperatorDescriptor {
   @JsonIgnore
   var INFER_READ_LIMIT: Int = 100
 
-  @JsonProperty()
+  @JsonProperty(required = true)
   @JsonSchemaTitle("File")
   @JsonDeserialize(contentAs = classOf[java.lang.String])
   var fileName: Option[String] = None
@@ -71,7 +71,7 @@ abstract class ScanSourceOpDesc extends 
SourceOperatorDescriptor {
       userFriendlyName = s"${fileTypeName.getOrElse("Unknown")} File Scan",
       operatorDescription = s"Scan data from a 
${fileTypeName.getOrElse("Unknown")} file",
       OperatorGroupConstants.INPUT_GROUP,
-      inputPorts = List(InputPort(displayName = "Filename")),
+      inputPorts = List.empty,
       outputPorts = List(OutputPort())
     )
   }
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
new file mode 100644
index 0000000000..a3b59723cb
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDescSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema, 
SchemaEnforceable, Tuple}
+import org.apache.texera.amber.operator.TestOperators
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class InputFileScanSourceOpDescSpec extends AnyFlatSpec {
+
+  it should "require a filename input port" in {
+    val desc = new InputFileScanSourceOpDesc()
+    assert(desc.operatorInfo.inputPorts.length == 1)
+    assert(desc.operatorInfo.inputPorts.head.displayName == "Filename")
+  }
+
+  it should "not expose fileName property in descriptor serialization" in {
+    val desc = new InputFileScanSourceOpDesc()
+    val descriptorJson = 
objectMapper.valueToTree[com.fasterxml.jackson.databind.JsonNode](desc)
+    assert(!descriptorJson.has("fileName"))
+  }
+
+  it should "scan file content using filename from input tuple" in {
+    val desc = new InputFileScanSourceOpDesc()
+    desc.attributeType = FileAttributeType.SINGLE_STRING
+
+    val executor = new 
FileScanSourceOpExec(objectMapper.writeValueAsString(desc))
+    val inputTuple = Tuple(
+      Schema().add("filename", AttributeType.STRING),
+      Array(FileResolver.resolve(TestOperators.TestTextFilePath).toASCIIString)
+    )
+
+    executor.open()
+    executor.processTupleMultiPort(inputTuple, 0)
+    val tuples = executor
+      .produceTuple()
+      .map(tupleLike => 
tupleLike.asInstanceOf[SchemaEnforceable].enforceSchema(desc.sourceSchema()))
+      .toSeq
+    executor.close()
+
+    assert(tuples.length == 1)
+    assert(
+      tuples.head
+        .getField[String]("line")
+        
.equals("line1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\nline10")
+    )
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
index af85ed459b..4a270331e4 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
@@ -49,9 +49,8 @@ class FileScanSourceOpDescSpec extends AnyFlatSpec with 
BeforeAndAfter {
     assert(inferredSchema.getAttribute("line").getType == AttributeType.STRING)
   }
 
-  it should "expose one filename input port" in {
-    assert(fileScanSourceOpDesc.operatorInfo.inputPorts.length == 1)
-    assert(fileScanSourceOpDesc.operatorInfo.inputPorts.head.displayName == 
"Filename")
+  it should "not expose filename input port on file scan source" in {
+    assert(fileScanSourceOpDesc.operatorInfo.inputPorts.isEmpty)
   }
 
   it should "infer schema with single column representing entire file in 
outputAsSingleTuple mode" in {
diff --git 
a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
 
b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
index b13e9e2da1..b0c16bb18c 100644
--- 
a/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
+++ 
b/workflow-compiling-service/src/test/scala/org/apache/texera/amber/compiler/model/LogicalPlanSpec.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.compiler.model
 
 import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
 import org.apache.texera.amber.core.workflow.PortIdentity
-import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
 import org.scalatest.flatspec.AnyFlatSpec
 
@@ -30,7 +30,7 @@ import scala.collection.mutable.ArrayBuffer
 class LogicalPlanSpec extends AnyFlatSpec {
 
   it should "skip file resolution when a scan source receives filename from 
input" in {
-    val scanSource = new FileScanSourceOpDesc()
+    val scanSource = new InputFileScanSourceOpDesc()
     scanSource.setOperatorId("scan-source")
 
     val upstream = new TextInputSourceOpDesc()

Reply via email to