This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new d547c76e07 feat: add File Scan From Input operator (#4369)
d547c76e07 is described below

commit d547c76e07360088a71016710b15638a397a9d67
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 16 20:29:40 2026 -0700

    feat: add File Scan From Input operator (#4369)
    
    ### What changes were proposed in this PR?
    This PR adds a new File Scan From Input operator that reads file path
    strings from upstream tuples and uses them as source input at execution
    time.
    
    Both the File Scan From Input operator and the File Scan operator were
    moved to the "file" subfolder.
    
    <img width="1093" height="799" alt="image"
    
src="https://github.com/user-attachments/assets/ffb9cc99-a268-4dce-8bec-27a30d81991d";
    />
    
    ### Any related issues, documentation, discussions?
    Closes #4368
    
    
    ### How was this PR tested?
    Tested manually, and a new test case was added.
    
    
    ### Was this PR authored or co-authored using generative AI tooling?
    No.
    
    ---------
    
    Signed-off-by: Xinyuan Lin <[email protected]>
    Co-authored-by: Kunwoo (Chris) <[email protected]>
---
 .../apache/texera/amber/operator/LogicalOp.scala   |   3 +-
 .../FileScanOpDesc.scala}                          |  48 ++++-----
 .../operator/source/scan/file/FileScanOpExec.scala |  48 +++++++++
 .../scan/{ => file}/FileScanSourceOpDesc.scala     |   5 +-
 .../source/scan/file/FileScanSourceOpExec.scala    |  46 ++++++++
 .../FileScanUtils.scala}                           | 120 +++++++++++++--------
 .../source/scan/file/FileScanOpDescSpec.scala      |  99 +++++++++++++++++
 .../{text => file}/FileScanSourceOpDescSpec.scala  |   9 +-
 .../scan/{ => file}/FileScanSourceOpExecSpec.scala |   3 +-
 .../operator-property-edit-frame.component.ts      |  16 +++
 frontend/src/assets/operator_images/FileScanOp.png | Bin 0 -> 22499 bytes
 11 files changed, 315 insertions(+), 82 deletions(-)

diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 3a0f0a5c4b..1f6e444c46 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -77,7 +77,6 @@ import 
org.apache.texera.amber.operator.source.apis.twitter.v2.{
 }
 import org.apache.texera.amber.operator.source.dataset.FileListerSourceOpDesc
 import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
-import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
 import 
org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
@@ -140,6 +139,7 @@ import 
org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallCh
 import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc
 import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, 
ToStringBuilder}
 import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
+import org.apache.texera.amber.operator.source.scan.file.{FileScanOpDesc, 
FileScanSourceOpDesc}
 import 
org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc
 
 import java.util.UUID
@@ -165,6 +165,7 @@ trait StateTransferFunc
     // new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = 
"ParallelCSVFileScan"),
     new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
     new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
+    new Type(value = classOf[FileScanOpDesc], name = "FileScanOp"),
     new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
     new Type(
       value = classOf[TwitterFullArchiveSearchSourceOpDesc],
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
similarity index 66%
copy from 
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
copy to 
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
index a76a855cad..8fbac6ec43 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.apache.texera.amber.operator.source.scan
+package org.apache.texera.amber.operator.source.scan.file
 
-import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonProperty
 import com.kjetland.jackson.jsonSchema.annotations.{
   JsonSchemaInject,
   JsonSchemaString,
@@ -28,23 +28,23 @@ import com.kjetland.jackson.jsonSchema.annotations.{
 import org.apache.texera.amber.core.executor.OpExecWithClassName
 import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
 import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
-import org.apache.texera.amber.core.workflow.{PhysicalOp, 
SchemaPropagationFunc}
+import org.apache.texera.amber.core.workflow.{
+  InputPort,
+  OutputPort,
+  PhysicalOp,
+  SchemaPropagationFunc
+}
 import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
 import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
-@JsonIgnoreProperties(value = Array("limit", "offset", "fileEncoding"))
-class FileScanSourceOpDesc extends ScanSourceOpDesc with TextSourceOpDesc {
+class FileScanOpDesc extends SourceOperatorDescriptor with TextSourceOpDesc {
   @JsonProperty(defaultValue = "UTF_8", required = true)
   @JsonSchemaTitle("Encoding")
-  @JsonSchemaInject(
-    strings = Array(
-      new JsonSchemaString(path = HideAnnotation.hideTarget, value = 
"attributeType"),
-      new JsonSchemaString(path = HideAnnotation.hideType, value = 
HideAnnotation.Type.equals),
-      new JsonSchemaString(path = HideAnnotation.hideExpectedValue, value = 
"binary")
-    )
-  )
-  private val encoding: FileDecodingMethod = FileDecodingMethod.UTF_8
+  var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
 
   @JsonProperty(defaultValue = "false")
   @JsonSchemaTitle("Extract")
@@ -52,16 +52,7 @@ class FileScanSourceOpDesc extends ScanSourceOpDesc with 
TextSourceOpDesc {
 
   @JsonProperty(defaultValue = "false")
   @JsonSchemaTitle("Include Filename")
-  @JsonSchemaInject(
-    strings = Array(
-      new JsonSchemaString(path = HideAnnotation.hideTarget, value = 
"extract"),
-      new JsonSchemaString(path = HideAnnotation.hideType, value = 
HideAnnotation.Type.equals),
-      new JsonSchemaString(path = HideAnnotation.hideExpectedValue, value = 
"false")
-    )
-  )
-  val outputFileName: Boolean = false
-
-  fileTypeName = Option("")
+  var outputFileName: Boolean = false
 
   override def getPhysicalOp(
       workflowId: WorkflowIdentity,
@@ -73,7 +64,7 @@ class FileScanSourceOpDesc extends ScanSourceOpDesc with 
TextSourceOpDesc {
         executionId,
         operatorIdentifier,
         OpExecWithClassName(
-          "org.apache.texera.amber.operator.source.scan.FileScanSourceOpExec",
+          "org.apache.texera.amber.operator.source.scan.file.FileScanOpExec",
           objectMapper.writeValueAsString(this)
         )
       )
@@ -91,4 +82,13 @@ class FileScanSourceOpDesc extends ScanSourceOpDesc with 
TextSourceOpDesc {
     }
     schema.add(attributeName, attributeType.getType)
   }
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      userFriendlyName = "File Scan From Input",
+      operatorDescription = "Scan data from file paths provided by input 
tuples",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List(InputPort(displayName = "Filename")),
+      outputPorts = List(OutputPort())
+    )
 }
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
new file mode 100644
index 0000000000..4cc60bf2bd
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.file
+
+import org.apache.texera.amber.core.executor.OperatorExecutor
+import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+class FileScanOpExec private[scan] (
+    descString: String
+) extends OperatorExecutor {
+  private val desc: FileScanOpDesc =
+    objectMapper.readValue(descString, classOf[FileScanOpDesc])
+
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
+    val originalFileName = tuple.getFields.collectFirst { case s: String => s 
}.get
+    val fileName = FileResolver.resolve(originalFileName).toASCIIString
+    FileScanUtils.createTuplesFromFile(
+      fileName = fileName,
+      displayFileName = originalFileName,
+      attributeType = desc.attributeType,
+      fileEncoding = desc.fileEncoding,
+      extract = desc.extract,
+      outputFileName = desc.outputFileName,
+      fileScanOffset = desc.fileScanOffset,
+      fileScanLimit = desc.fileScanLimit
+    )
+  }
+
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
similarity index 93%
rename from 
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
rename to 
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
index a76a855cad..82997632d1 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.texera.amber.operator.source.scan
+package org.apache.texera.amber.operator.source.scan.file
 
 import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonProperty}
 import com.kjetland.jackson.jsonSchema.annotations.{
@@ -31,6 +31,7 @@ import 
org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, Workflow
 import org.apache.texera.amber.core.workflow.{PhysicalOp, 
SchemaPropagationFunc}
 import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
 import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.{FileDecodingMethod, 
ScanSourceOpDesc}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
 @JsonIgnoreProperties(value = Array("limit", "offset", "fileEncoding"))
@@ -73,7 +74,7 @@ class FileScanSourceOpDesc extends ScanSourceOpDesc with 
TextSourceOpDesc {
         executionId,
         operatorIdentifier,
         OpExecWithClassName(
-          "org.apache.texera.amber.operator.source.scan.FileScanSourceOpExec",
+          
"org.apache.texera.amber.operator.source.scan.file.FileScanSourceOpExec",
           objectMapper.writeValueAsString(this)
         )
       )
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
new file mode 100644
index 0000000000..d47cf3681c
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.file
+
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+import java.io.IOException
+
+class FileScanSourceOpExec private[scan] (
+    descString: String
+) extends SourceOperatorExecutor {
+  private val desc: FileScanSourceOpDesc =
+    objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
+
+  @throws[IOException]
+  override def produceTuple(): Iterator[TupleLike] = {
+    FileScanUtils.createTuplesFromFile(
+      fileName = desc.fileName.get,
+      attributeType = desc.attributeType,
+      fileEncoding = desc.fileEncoding,
+      extract = desc.extract,
+      outputFileName = desc.outputFileName,
+      fileScanOffset = desc.fileScanOffset,
+      fileScanLimit = desc.fileScanLimit
+    )
+  }
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
similarity index 54%
rename from 
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
rename to 
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
index 91c817240c..a7f81b4869 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
@@ -17,51 +17,57 @@
  * under the License.
  */
 
-package org.apache.texera.amber.operator.source.scan
+package org.apache.texera.amber.operator.source.scan.file
 
-import org.apache.texera.amber.core.executor.SourceOperatorExecutor
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
-import org.apache.texera.amber.util.JSONUtils.objectMapper
-import org.apache.texera.service.util.LargeBinaryOutputStream
 import org.apache.commons.compress.archivers.ArchiveStreamFactory
 import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
 import org.apache.commons.io.IOUtils.toByteArray
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
+import org.apache.texera.amber.core.tuple.LargeBinary
+import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.operator.source.scan.{
+  AutoClosingIterator,
+  FileAttributeType,
+  FileDecodingMethod
+}
+import org.apache.texera.service.util.LargeBinaryOutputStream
 
 import java.io._
 import java.net.URI
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.IteratorHasAsScala
 
-class FileScanSourceOpExec private[scan] (
-    descString: String
-) extends SourceOperatorExecutor {
-  private val desc: FileScanSourceOpDesc =
-    objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
-
-  @throws[IOException]
-  override def produceTuple(): Iterator[TupleLike] = {
-    val is: InputStream =
-      DocumentFactory.openReadonlyDocument(new 
URI(desc.fileName.get)).asInputStream()
+private[file] object FileScanUtils {
+  def createTuplesFromFile(
+      fileName: String,
+      displayFileName: String,
+      attributeType: FileAttributeType,
+      fileEncoding: FileDecodingMethod,
+      extract: Boolean,
+      outputFileName: Boolean,
+      fileScanOffset: Option[Int],
+      fileScanLimit: Option[Int]
+  ): Iterator[TupleLike] = {
+    val inputStream = DocumentFactory.openReadonlyDocument(new 
URI(fileName)).asInputStream()
 
     val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
     var zipIn: ZipArchiveInputStream = null
-    var archiveStream: InputStream = null
-    if (desc.extract) {
-      zipIn = new ArchiveStreamFactory()
-        .createArchiveInputStream(new BufferedInputStream(is))
-        .asInstanceOf[ZipArchiveInputStream]
-      archiveStream = zipIn
-      closeables += zipIn
-    } else {
-      archiveStream = is
-      closeables += is
-    }
+    val archiveStream: InputStream =
+      if (extract) {
+        zipIn = new ArchiveStreamFactory()
+          .createArchiveInputStream(new BufferedInputStream(inputStream))
+          .asInstanceOf[ZipArchiveInputStream]
+        closeables += zipIn
+        zipIn
+      } else {
+        closeables += inputStream
+        inputStream
+      }
 
     var filenameIt: Iterator[String] = Iterator.empty
-    val fileEntries: Iterator[InputStream] = {
-      if (desc.extract) {
+    val fileEntries: Iterator[InputStream] =
+      if (extract) {
         val (it1, it2) = Iterator
           .continually(zipIn.getNextEntry)
           .takeWhile(_ != null)
@@ -70,23 +76,22 @@ class FileScanSourceOpExec private[scan] (
         filenameIt = it1.map(_.getName)
         it2.map(_ => zipIn)
       } else {
+        filenameIt = Iterator.single(displayFileName)
         Iterator(archiveStream)
       }
-    }
 
     val rawIterator: Iterator[TupleLike] =
-      if (desc.attributeType.isSingle) {
+      if (attributeType.isSingle) {
         fileEntries.zipAll(filenameIt, null, null).map {
-          case (entry, fileName) =>
-            val fields: mutable.ListBuffer[Any] = mutable.ListBuffer()
-            if (desc.outputFileName) {
-              fields.addOne(fileName)
+          case (entry, entryFileName) =>
+            val fields = mutable.ListBuffer.empty[Any]
+            if (outputFileName) {
+              fields += entryFileName
             }
-            fields.addOne(desc.attributeType match {
+            fields += (attributeType match {
               case FileAttributeType.SINGLE_STRING =>
-                new String(toByteArray(entry), desc.fileEncoding.getCharset)
+                new String(toByteArray(entry), fileEncoding.getCharset)
               case FileAttributeType.LARGE_BINARY =>
-                // For large binaries, create reference and upload via 
streaming
                 val largeBinary = new LargeBinary()
                 val out = new LargeBinaryOutputStream(largeBinary)
                 try {
@@ -100,29 +105,50 @@ class FileScanSourceOpExec private[scan] (
                   out.close()
                 }
                 largeBinary
-              case _ => parseField(toByteArray(entry), 
desc.attributeType.getType)
+              case _ => parseField(toByteArray(entry), attributeType.getType)
             })
             TupleLike(fields.toSeq: _*)
         }
       } else {
         fileEntries.flatMap(entry =>
-          new BufferedReader(new InputStreamReader(entry, 
desc.fileEncoding.getCharset))
+          new BufferedReader(new InputStreamReader(entry, 
fileEncoding.getCharset))
             .lines()
             .iterator()
             .asScala
             .slice(
-              desc.fileScanOffset.getOrElse(0),
-              desc.fileScanOffset.getOrElse(0) + 
desc.fileScanLimit.getOrElse(Int.MaxValue)
+              fileScanOffset.getOrElse(0),
+              fileScanOffset.getOrElse(0) + 
fileScanLimit.getOrElse(Int.MaxValue)
             )
-            .map(line => {
-              TupleLike(desc.attributeType match {
+            .map(line =>
+              TupleLike(attributeType match {
                 case FileAttributeType.SINGLE_STRING => line
-                case _                               => parseField(line, 
desc.attributeType.getType)
+                case _                               => parseField(line, 
attributeType.getType)
               })
-            })
+            )
         )
       }
 
     new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
   }
+
+  def createTuplesFromFile(
+      fileName: String,
+      attributeType: FileAttributeType,
+      fileEncoding: FileDecodingMethod,
+      extract: Boolean,
+      outputFileName: Boolean,
+      fileScanOffset: Option[Int],
+      fileScanLimit: Option[Int]
+  ): Iterator[TupleLike] = {
+    createTuplesFromFile(
+      fileName = fileName,
+      displayFileName = fileName,
+      attributeType = attributeType,
+      fileEncoding = fileEncoding,
+      extract = extract,
+      outputFileName = outputFileName,
+      fileScanOffset = fileScanOffset,
+      fileScanLimit = fileScanLimit
+    )
+  }
 }
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
new file mode 100644
index 0000000000..e1749b98d3
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.file
+
+import org.apache.texera.amber.core.tuple.{
+  Attribute,
+  AttributeType,
+  Schema,
+  SchemaEnforceable,
+  Tuple
+}
+import org.apache.texera.amber.operator.TestOperators
+import org.apache.texera.amber.operator.source.scan.{FileAttributeType, 
FileDecodingMethod}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+
+class FileScanOpDescSpec extends AnyFlatSpec with BeforeAndAfter {
+
+  private val inputSchema = new Schema(new Attribute("filename", 
AttributeType.STRING))
+
+  var fileScanOpDesc: FileScanOpDesc = _
+
+  before {
+    fileScanOpDesc = new FileScanOpDesc()
+    fileScanOpDesc.fileEncoding = FileDecodingMethod.UTF_8
+  }
+
+  it should "infer schema with single column representing each line of text" 
in {
+    val inferredSchema: Schema = fileScanOpDesc.sourceSchema()
+
+    assert(inferredSchema.getAttributes.length == 1)
+    assert(inferredSchema.getAttribute("line").getType == AttributeType.STRING)
+  }
+
+  it should "read first 5 lines from the input file path tuple into output 
tuples" in {
+    fileScanOpDesc.attributeType = FileAttributeType.STRING
+    fileScanOpDesc.fileScanLimit = Option(5)
+
+    val inputTuple = Tuple(inputSchema, 
Array[Any](TestOperators.TestTextFilePath))
+    val fileScanOpExec =
+      new FileScanOpExec(objectMapper.writeValueAsString(fileScanOpDesc))
+
+    fileScanOpExec.open()
+    val processedTuple: Iterator[Tuple] = fileScanOpExec
+      .processTuple(inputTuple, 0)
+      .map(tupleLike =>
+        tupleLike
+          .asInstanceOf[SchemaEnforceable]
+          .enforceSchema(fileScanOpDesc.sourceSchema())
+      )
+
+    assert(processedTuple.next().getField("line").equals("line1"))
+    assert(processedTuple.next().getField("line").equals("line2"))
+    assert(processedTuple.next().getField("line").equals("line3"))
+    assert(processedTuple.next().getField("line").equals("line4"))
+    assert(processedTuple.next().getField("line").equals("line5"))
+    
assertThrows[java.util.NoSuchElementException](processedTuple.next().getField("line"))
+    fileScanOpExec.close()
+  }
+
+  it should "preserve the original input filename when include filename is 
enabled" in {
+    fileScanOpDesc.attributeType = FileAttributeType.SINGLE_STRING
+    fileScanOpDesc.outputFileName = true
+
+    val inputFilePath = TestOperators.TestTextFilePath
+    val inputTuple = Tuple(inputSchema, Array[Any](inputFilePath))
+    val fileScanOpExec =
+      new FileScanOpExec(objectMapper.writeValueAsString(fileScanOpDesc))
+
+    fileScanOpExec.open()
+    val outputSchema = fileScanOpDesc.sourceSchema()
+    val processedTuple = fileScanOpExec
+      .processTuple(inputTuple, 0)
+      .next()
+      .asInstanceOf[SchemaEnforceable]
+      .enforceSchema(outputSchema)
+
+    assert(processedTuple.getField[String]("filename") == inputFilePath)
+    fileScanOpExec.close()
+  }
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDescSpec.scala
similarity index 97%
rename from 
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
rename to 
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDescSpec.scala
index 8c08686705..4437c018bd 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/text/FileScanSourceOpDescSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDescSpec.scala
@@ -17,17 +17,12 @@
  * under the License.
  */
 
-package org.apache.texera.amber.operator.source.scan.text
+package org.apache.texera.amber.operator.source.scan.file
 
 import org.apache.texera.amber.core.storage.FileResolver
 import org.apache.texera.amber.core.tuple.{AttributeType, Schema, 
SchemaEnforceable, Tuple}
 import org.apache.texera.amber.operator.TestOperators
-import org.apache.texera.amber.operator.source.scan.{
-  FileAttributeType,
-  FileDecodingMethod,
-  FileScanSourceOpDesc,
-  FileScanSourceOpExec
-}
+import org.apache.texera.amber.operator.source.scan.{FileAttributeType, 
FileDecodingMethod}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.scalatest.BeforeAndAfter
 import org.scalatest.flatspec.AnyFlatSpec
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSpec.scala
similarity index 97%
rename from 
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
rename to 
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSpec.scala
index ab3383ca0a..206cc33f32 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/FileScanSourceOpExecSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSpec.scala
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.texera.amber.operator.source.scan
+package org.apache.texera.amber.operator.source.scan.file
 
 import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema, 
SchemaEnforceable}
+import org.apache.texera.amber.operator.source.scan.{FileAttributeType, 
FileDecodingMethod}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpec
diff --git 
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
 
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
index 0e1484f59c..7b753fb083 100644
--- 
a/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
+++ 
b/frontend/src/app/workspace/component/property-editor/operator-property-edit-frame/operator-property-edit-frame.component.ts
@@ -457,6 +457,22 @@ export class OperatorPropertyEditFrameComponent implements 
OnInit, OnChanges, On
         mappedField.type = "datasetversionselector";
       }
 
+      if (this.currentOperatorSchema?.operatorType === "FileScanOp" && 
mappedField.key === "outputFileName") {
+        mappedField.expressions = {
+          ...mappedField.expressions,
+          hide: (field: FormlyFieldConfig) => {
+            const model = field.model as { extract?: boolean; attributeType?: 
string } | undefined;
+            const attributeType = model?.attributeType;
+            return !(
+              model?.extract === true ||
+              attributeType === "single string" ||
+              attributeType === "binary" ||
+              attributeType === "large binary"
+            );
+          },
+        };
+      }
+
       // if the title is python script (for Python UDF), then make this field 
a custom template 'codearea'
       if (mapSource?.description?.toLowerCase() === "input your code here") {
         if (mappedField.type) {
diff --git a/frontend/src/assets/operator_images/FileScanOp.png 
b/frontend/src/assets/operator_images/FileScanOp.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and b/frontend/src/assets/operator_images/FileScanOp.png 
differ

Reply via email to