This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-input-source-operator
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 22ae4de60b6004afcf4a251e27033ae8d25a7568
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 12 01:35:58 2026 -0700

    feat(input-source): add input file source operator
---
 .../apache/texera/amber/operator/LogicalOp.scala   |   2 +
 .../source/scan/InputFileScanSourceOpDesc.scala    | 100 ++++++++++++++++
 .../source/scan/InputFileScanSourceOpExec.scala    | 130 +++++++++++++++++++++
 .../source/scan/InputFileSourceOpExec.scala        |  84 +++++++++++++
 .../src/assets/operator_images/InputFileScan.png   | Bin 0 -> 22499 bytes
 5 files changed, 316 insertions(+)

diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 7ccbb073d6..ea40c4b815 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -78,6 +78,7 @@ import 
org.apache.texera.amber.operator.source.apis.twitter.v2.{
 import 
org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpDesc
 import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc
 import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc
+import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc
 import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc
 import 
org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc
@@ -165,6 +166,7 @@ trait StateTransferFunc
     // new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = 
"ParallelCSVFileScan"),
     new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
     new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
+    new Type(value = classOf[InputFileScanSourceOpDesc], name = 
"InputFileScan"),
     new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
     new Type(
       value = classOf[TwitterFullArchiveSearchSourceOpDesc],
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
new file mode 100644
index 0000000000..4e7c88b000
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.{
+  JsonSchemaInject,
+  JsonSchemaString,
+  JsonSchemaTitle
+}
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{
+  InputPort,
+  OutputPort,
+  PhysicalOp,
+  SchemaPropagationFunc
+}
+import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+/**
+  * Logical descriptor of the "File Scan From Input" operator.
+  *
+  * The operator exposes one input port (displayed as "Filename") and one output port, and
+  * delegates execution to `InputFileScanSourceOpExec`, handing it this descriptor serialized
+  * as JSON.
+  */
+class InputFileScanSourceOpDesc extends SourceOperatorDescriptor with TextSourceOpDesc {
+  // Charset used when file content is decoded into text.
+  @JsonProperty(defaultValue = "UTF_8", required = true)
+  @JsonSchemaTitle("Encoding")
+  var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
+
+  // When true the executor treats each input file as an archive and scans its entries.
+  @JsonProperty(defaultValue = "false")
+  @JsonSchemaTitle("Extract")
+  val extract: Boolean = false
+
+  // When true a leading "filename" column is added to the output schema.
+  // The injected schema strings tie the visibility of this option to the value of `extract`
+  // (hide target "extract", type `equals`, expected value "false").
+  @JsonProperty(defaultValue = "false")
+  @JsonSchemaTitle("Include Filename")
+  @JsonSchemaInject(
+    strings = Array(
+      new JsonSchemaString(path = HideAnnotation.hideTarget, value = "extract"),
+      new JsonSchemaString(path = HideAnnotation.hideType, value = HideAnnotation.Type.equals),
+      new JsonSchemaString(path = HideAnnotation.hideExpectedValue, value = "false")
+    )
+  )
+  val outputFileName: Boolean = false
+
+  /**
+    * Builds the physical operator for this descriptor.
+    *
+    * @param workflowId  identity of the workflow the operator belongs to
+    * @param executionId identity of the current execution
+    * @return a source physical operator wired with this descriptor's ports and output schema
+    */
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp = {
+    // The executor is located by class name and rebuilt from this descriptor's JSON form.
+    val executorSpec = OpExecWithClassName(
+      "org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpExec",
+      objectMapper.writeValueAsString(this)
+    )
+    // The output schema does not depend on the input schemas, hence the ignored argument.
+    val propagateSchema =
+      SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema()))
+
+    PhysicalOp
+      .sourcePhysicalOp(workflowId, executionId, operatorIdentifier, executorSpec)
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withPropagateSchema(propagateSchema)
+  }
+
+  /**
+    * Output schema: an optional STRING "filename" column followed by the configured
+    * attribute.
+    */
+  override def sourceSchema(): Schema = {
+    val prefix =
+      if (outputFileName) Schema().add("filename", AttributeType.STRING)
+      else Schema()
+    prefix.add(attributeName, attributeType.getType)
+  }
+
+  /** Static metadata (name, description, group and ports) describing this operator. */
+  override def operatorInfo: OperatorInfo = {
+    val filenamePort = InputPort(displayName = "Filename")
+    OperatorInfo(
+      userFriendlyName = "File Scan From Input",
+      operatorDescription = "Scan data from file paths provided by input tuples",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List(filenamePort),
+      outputPorts = List(OutputPort())
+    )
+  }
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
new file mode 100644
index 0000000000..4a1514c7f6
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
+import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.service.util.LargeBinaryOutputStream
+import org.apache.commons.compress.archivers.ArchiveStreamFactory
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
+import org.apache.commons.io.IOUtils.toByteArray
+
+import java.io._
+import java.net.URI
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.IteratorHasAsScala
+
+/**
+  * Executor that scans every file whose name was collected from the input port and converts
+  * the content into tuples.
+  *
+  * Depending on the descriptor, a file is either read as-is or opened as a zip archive whose
+  * entries are scanned one by one, and each file/entry becomes either one tuple
+  * (single-value attribute types) or one tuple per text line.
+  *
+  * @param descString JSON form of the [[InputFileScanSourceOpDesc]] configuring this executor
+  */
+class InputFileScanSourceOpExec private[scan] (
+    descString: String
+) extends InputFileSourceOpExec {
+  private val desc: InputFileScanSourceOpDesc =
+    objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
+
+  /** Lazily concatenates the tuples of all resolved input files, in input order. */
+  @throws[IOException]
+  override def produceTuple(): Iterator[TupleLike] =
+    resolvedInputFileNames.iterator.flatMap(produceTuplesForFile)
+
+  /**
+    * Opens one resolved file and returns a lazy iterator over its tuples.
+    *
+    * @param resolvedFileName URI string of the file to open
+    * @return iterator wrapped in an `AutoClosingIterator` together with a callback that
+    *         closes the opened stream
+    */
+  private def produceTuplesForFile(resolvedFileName: String): Iterator[TupleLike] = {
+    val is: InputStream =
+      DocumentFactory.openReadonlyDocument(new URI(resolvedFileName)).asInputStream()
+
+    val zipIn: Option[ZipArchiveInputStream] = Option.when(desc.extract)(openZipStream(is))
+    // The stream to close at the end: the archive stream when extracting (it wraps `is`),
+    // otherwise the raw stream itself.
+    val resource: InputStream = zipIn.getOrElse(is)
+
+    // Each element pairs an optional archive entry name with the stream to read it from.
+    // For archives the same zip stream is returned every time; advancing the iterator
+    // repositions it on the next entry, so elements must be consumed strictly in order.
+    val entries: Iterator[(Option[String], InputStream)] = zipIn match {
+      case Some(zip) =>
+        Iterator
+          .continually(zip.getNextEntry)
+          .takeWhile(_ != null)
+          .filterNot(_.getName.startsWith("__MACOSX"))
+          .map(entry => (Option(entry.getName), zip))
+      case None =>
+        Iterator.single((Option.empty[String], is))
+    }
+
+    val rawIterator: Iterator[TupleLike] =
+      if (desc.attributeType.isSingle) {
+        entries.map {
+          case (entryName, stream) =>
+            val value = readSingleValue(stream)
+            // Without extraction there is no entry name, so the filename field is null.
+            val fields: Seq[Any] =
+              if (desc.outputFileName) Seq(entryName.orNull, value) else Seq(value)
+            TupleLike(fields: _*)
+        }
+      } else {
+        // NOTE(review): line mode emits no filename field even when desc.outputFileName is
+        // set, although InputFileScanSourceOpDesc.sourceSchema() declares one in that case.
+        // Confirm whether that combination is meant to be reachable.
+        entries.flatMap { case (_, stream) => readLines(stream) }
+      }
+
+    new AutoClosingIterator(rawIterator, () => resource.close())
+  }
+
+  /**
+    * Wraps `raw` in a zip archive stream. If the archive cannot be opened (or is not a zip
+    * archive) `raw` is closed before the failure is rethrown, since nothing else would
+    * release it.
+    */
+  private def openZipStream(raw: InputStream): ZipArchiveInputStream =
+    try {
+      new ArchiveStreamFactory()
+        .createArchiveInputStream(new BufferedInputStream(raw))
+        .asInstanceOf[ZipArchiveInputStream]
+    } catch {
+      case scala.util.control.NonFatal(cause) =>
+        try raw.close()
+        catch { case scala.util.control.NonFatal(onClose) => cause.addSuppressed(onClose) }
+        throw cause
+    }
+
+  /** Reads the remaining content of `entry` as one value of the configured attribute type. */
+  private def readSingleValue(entry: InputStream): Any =
+    desc.attributeType match {
+      case FileAttributeType.SINGLE_STRING =>
+        new String(toByteArray(entry), desc.fileEncoding.getCharset)
+      case FileAttributeType.LARGE_BINARY =>
+        val largeBinary = new LargeBinary()
+        val out = new LargeBinaryOutputStream(largeBinary)
+        try {
+          // Copy in fixed-size chunks instead of materializing the whole content in memory.
+          val buffer = new Array[Byte](8192)
+          var bytesRead = entry.read(buffer)
+          while (bytesRead != -1) {
+            out.write(buffer, 0, bytesRead)
+            bytesRead = entry.read(buffer)
+          }
+        } finally {
+          out.close()
+        }
+        largeBinary
+      case _ => parseField(toByteArray(entry), desc.attributeType.getType)
+    }
+
+  /**
+    * Reads `entry` line by line, applies the configured offset and limit, and converts each
+    * remaining line into a one-field tuple.
+    */
+  private def readLines(entry: InputStream): Iterator[TupleLike] = {
+    val offset = desc.fileScanOffset.getOrElse(0)
+    val limit = desc.fileScanLimit.getOrElse(Int.MaxValue)
+    // Sum in Long and clamp: `offset + Int.MaxValue` overflows Int to a negative bound,
+    // which made `slice` return nothing whenever an offset was given without a limit.
+    val until = math.min(offset.toLong + limit.toLong, Int.MaxValue.toLong).toInt
+
+    new BufferedReader(new InputStreamReader(entry, desc.fileEncoding.getCharset))
+      .lines()
+      .iterator()
+      .asScala
+      .slice(offset, until)
+      .map(line =>
+        TupleLike(desc.attributeType match {
+          case FileAttributeType.SINGLE_STRING => line
+          case _                               => parseField(line, desc.attributeType.getType)
+        })
+      )
+  }
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
new file mode 100644
index 0000000000..cd14cc4235
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
+import org.apache.texera.amber.core.workflow.PortIdentity
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Base for source executors whose file names arrive as tuples on an input port.
+  *
+  * Every incoming tuple contributes one file name, which is buffered; nothing is emitted
+  * while tuples are processed. Subclasses read the buffered names through
+  * [[resolvedInputFileNames]] or [[resolvedInputFileName]].
+  */
+trait InputFileSourceOpExec extends SourceOperatorExecutor {
+  // File names collected so far, in arrival order.
+  private val inputFileNames = ArrayBuffer.empty[String]
+
+  /** Delegates to [[processTuple]] and pairs every result with no explicit output port. */
+  override def processTupleMultiPort(
+      tuple: Tuple,
+      port: Int
+  ): Iterator[(TupleLike, Option[PortIdentity])] =
+    processTuple(tuple, port).map(tupleLike => (tupleLike, None))
+
+  /** Records the file name carried by `tuple`; never emits output. */
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
+    inputFileNames += extractFileName(tuple)
+    Iterator.empty
+  }
+
+  /**
+    * All collected file names, each passed through `FileResolver.resolve` unless
+    * `FileResolver.isFileResolved` already accepts it.
+    *
+    * @throws IllegalStateException if no file name has been received
+    */
+  protected def resolvedInputFileNames: Seq[String] = {
+    if (inputFileNames.isEmpty) {
+      throw new IllegalStateException("No input file is available for this source operator.")
+    }
+
+    inputFileNames.toSeq.map { fileName =>
+      if (FileResolver.isFileResolved(fileName)) fileName
+      else FileResolver.resolve(fileName).toASCIIString
+    }
+  }
+
+  /**
+    * The single collected file name, resolved.
+    *
+    * @throws IllegalStateException if no file name, or more than one, has been received
+    */
+  protected def resolvedInputFileName: String = {
+    val fileNames = resolvedInputFileNames
+    if (fileNames.size > 1) {
+      throw new IllegalStateException("This source operator accepts only one input filename.")
+    }
+    // Safe: resolvedInputFileNames throws when the collection is empty.
+    fileNames.head
+  }
+
+  /**
+    * Picks the file name out of an input tuple, trying in order:
+    *  1. the attribute named "filename", if the schema has one;
+    *  2. the only string-valued field, if there is exactly one;
+    *  3. the first field, if it is a string.
+    *
+    * @throws IllegalArgumentException if none of the rules yields a string, including the
+    *                                  case where "filename" exists but holds null or a
+    *                                  non-string value
+    */
+  private def extractFileName(tuple: Tuple): String = {
+    def invalidInput: Nothing =
+      throw new IllegalArgumentException(
+        "The filename input port expects a tuple containing a filename string."
+      )
+
+    if (tuple.getSchema.containsAttribute("filename")) {
+      // Match on the runtime value instead of trusting the type parameter: a null or
+      // non-string value would otherwise surface later as a cast or resolver failure.
+      tuple.getField[Any]("filename") match {
+        case name: String => name
+        case _            => invalidInput
+      }
+    } else {
+      val stringFields = tuple.getFields.collect { case value: String => value }
+      if (stringFields.size == 1) {
+        stringFields.head
+      } else {
+        tuple.getFields.headOption match {
+          case Some(value: String) => value
+          case _                   => invalidInput
+        }
+      }
+    }
+  }
+}
diff --git a/frontend/src/assets/operator_images/InputFileScan.png 
b/frontend/src/assets/operator_images/InputFileScan.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and 
b/frontend/src/assets/operator_images/InputFileScan.png differ

Reply via email to