This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-input-source-operator
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-input-source-operator 
by this push:
     new 86f4172d15 update
86f4172d15 is described below

commit 86f4172d1573606474ed0bda4c0acf714c77fac4
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 16 15:27:30 2026 -0700

    update
---
 .../apache/texera/amber/operator/LogicalOp.scala   |   4 +-
 ...ScanSourceOpDesc.scala => FileScanOpDesc.scala} |   4 +-
 ...ScanSourceOpExec.scala => FileScanOpExec.scala} |   8 +-
 .../source/scan/file/FileScanSourceOpExec.scala    | 111 +--------------------
 ...eScanSourceOpExec.scala => FileScanUtils.scala} |  24 +----
 ...ceOpDescSpec.scala => FileScanOpDescSpec.scala} |  26 ++---
 .../{InputFileScan.png => FileScanOp.png}          | Bin
 7 files changed, 24 insertions(+), 153 deletions(-)

diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 378078f6ba..17f3c9f19d 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -140,8 +140,8 @@ import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc
 import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder}
 import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
 import org.apache.texera.amber.operator.source.scan.file.{
+  FileScanOpDesc,
   FileScanSourceOpDesc,
-  InputFileScanSourceOpDesc
 }
 import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc
 
@@ -168,7 +168,7 @@ trait StateTransferFunc
     // new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = "ParallelCSVFileScan"),
     new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
     new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
-    new Type(value = classOf[InputFileScanSourceOpDesc], name = "InputFileScan"),
+    new Type(value = classOf[FileScanOpDesc], name = "FileScanOp"),
     new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
     new Type(
       value = classOf[TwitterFullArchiveSearchSourceOpDesc],
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
similarity index 95%
rename from common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
rename to common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
index d52406f134..9635f7fa58 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
@@ -41,7 +41,7 @@ import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
 import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
-class InputFileScanSourceOpDesc extends SourceOperatorDescriptor with TextSourceOpDesc {
+class FileScanOpDesc extends SourceOperatorDescriptor with TextSourceOpDesc {
   @JsonProperty(defaultValue = "UTF_8", required = true)
   @JsonSchemaTitle("Encoding")
   var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
@@ -71,7 +71,7 @@ class InputFileScanSourceOpDesc extends SourceOperatorDescriptor with TextSource
         executionId,
         operatorIdentifier,
         OpExecWithClassName(
-          "org.apache.texera.amber.operator.source.scan.file.InputFileScanSourceOpExec",
+          "org.apache.texera.amber.operator.source.scan.file.FileScanOpExec",
           objectMapper.writeValueAsString(this)
         )
       )
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
similarity index 88%
rename from common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
rename to common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
index b8c72e0d0f..9a84b40a3c 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
@@ -24,16 +24,16 @@ import org.apache.texera.amber.core.storage.FileResolver
 import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
-class InputFileScanSourceOpExec private[scan] (
+class FileScanOpExec private[scan] (
     descString: String
 ) extends OperatorExecutor {
-  private val desc: InputFileScanSourceOpDesc =
-    objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
+  private val desc: FileScanOpDesc =
+    objectMapper.readValue(descString, classOf[FileScanOpDesc])
 
   override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
     val fileName =
       FileResolver.resolve(tuple.getFields.collectFirst { case s: String => s }.get).toASCIIString
-    FileScanSourceOpExec.createTuplesFromFile(
+    FileScanUtils.createTuplesFromFile(
       fileName = fileName,
       attributeType = desc.attributeType,
       fileEncoding = desc.fileEncoding,
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
index 636ae23f97..d47cf3681c 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
@@ -19,118 +19,11 @@
 
 package org.apache.texera.amber.operator.source.scan.file
 
-import org.apache.commons.compress.archivers.ArchiveStreamFactory
-import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
-import org.apache.commons.io.IOUtils.toByteArray
 import org.apache.texera.amber.core.executor.SourceOperatorExecutor
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.LargeBinary
 import org.apache.texera.amber.core.tuple.TupleLike
-import org.apache.texera.amber.operator.source.scan.{
-  AutoClosingIterator,
-  FileAttributeType,
-  FileDecodingMethod
-}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
-import org.apache.texera.service.util.LargeBinaryOutputStream
-
-import java.io._
-import java.net.URI
-import scala.collection.mutable
-import scala.jdk.CollectionConverters.IteratorHasAsScala
-
-private[scan] object FileScanSourceOpExec {
-  def createTuplesFromFile(
-      fileName: String,
-      attributeType: FileAttributeType,
-      fileEncoding: FileDecodingMethod,
-      extract: Boolean,
-      outputFileName: Boolean,
-      fileScanOffset: Option[Int],
-      fileScanLimit: Option[Int]
-  ): Iterator[TupleLike] = {
-    val inputStream = DocumentFactory.openReadonlyDocument(new URI(fileName)).asInputStream()
-
-    val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
-    var zipIn: ZipArchiveInputStream = null
-    val archiveStream: InputStream =
-      if (extract) {
-        zipIn = new ArchiveStreamFactory()
-          .createArchiveInputStream(new BufferedInputStream(inputStream))
-          .asInstanceOf[ZipArchiveInputStream]
-        closeables += zipIn
-        zipIn
-      } else {
-        closeables += inputStream
-        inputStream
-      }
 
-    var filenameIt: Iterator[String] = Iterator.empty
-    val fileEntries: Iterator[InputStream] =
-      if (extract) {
-        val (it1, it2) = Iterator
-          .continually(zipIn.getNextEntry)
-          .takeWhile(_ != null)
-          .filterNot(_.getName.startsWith("__MACOSX"))
-          .duplicate
-        filenameIt = it1.map(_.getName)
-        it2.map(_ => zipIn)
-      } else {
-        Iterator(archiveStream)
-      }
-
-    val rawIterator: Iterator[TupleLike] =
-      if (attributeType.isSingle) {
-        fileEntries.zipAll(filenameIt, null, null).map {
-          case (entry, entryFileName) =>
-            val fields = mutable.ListBuffer.empty[Any]
-            if (outputFileName) {
-              fields += entryFileName
-            }
-            fields += (attributeType match {
-              case FileAttributeType.SINGLE_STRING =>
-                new String(toByteArray(entry), fileEncoding.getCharset)
-              case FileAttributeType.LARGE_BINARY =>
-                val largeBinary = new LargeBinary()
-                val out = new LargeBinaryOutputStream(largeBinary)
-                try {
-                  val buffer = new Array[Byte](8192)
-                  var bytesRead = entry.read(buffer)
-                  while (bytesRead != -1) {
-                    out.write(buffer, 0, bytesRead)
-                    bytesRead = entry.read(buffer)
-                  }
-                } finally {
-                  out.close()
-                }
-                largeBinary
-              case _ => parseField(toByteArray(entry), attributeType.getType)
-            })
-            TupleLike(fields.toSeq: _*)
-        }
-      } else {
-        fileEntries.flatMap(entry =>
-          new BufferedReader(new InputStreamReader(entry, fileEncoding.getCharset))
-            .lines()
-            .iterator()
-            .asScala
-            .slice(
-              fileScanOffset.getOrElse(0),
-              fileScanOffset.getOrElse(0) + fileScanLimit.getOrElse(Int.MaxValue)
-            )
-            .map(line =>
-              TupleLike(attributeType match {
-                case FileAttributeType.SINGLE_STRING => line
-                case _                               => parseField(line, attributeType.getType)
-              })
-            )
-        )
-      }
-
-    new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
-  }
-}
+import java.io.IOException
 
 class FileScanSourceOpExec private[scan] (
     descString: String
@@ -140,7 +33,7 @@ class FileScanSourceOpExec private[scan] (
 
   @throws[IOException]
   override def produceTuple(): Iterator[TupleLike] = {
-    FileScanSourceOpExec.createTuplesFromFile(
+    FileScanUtils.createTuplesFromFile(
       fileName = desc.fileName.get,
       attributeType = desc.attributeType,
       fileEncoding = desc.fileEncoding,
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
similarity index 85%
copy from common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
copy to common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
index 636ae23f97..3e00f72fec 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
@@ -22,7 +22,6 @@ package org.apache.texera.amber.operator.source.scan.file
 import org.apache.commons.compress.archivers.ArchiveStreamFactory
 import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
 import org.apache.commons.io.IOUtils.toByteArray
-import org.apache.texera.amber.core.executor.SourceOperatorExecutor
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
 import org.apache.texera.amber.core.tuple.LargeBinary
@@ -32,7 +31,6 @@ import org.apache.texera.amber.operator.source.scan.{
   FileAttributeType,
   FileDecodingMethod
 }
-import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.apache.texera.service.util.LargeBinaryOutputStream
 
 import java.io._
@@ -40,7 +38,7 @@ import java.net.URI
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.IteratorHasAsScala
 
-private[scan] object FileScanSourceOpExec {
+private[file] object FileScanUtils {
   def createTuplesFromFile(
       fileName: String,
       attributeType: FileAttributeType,
@@ -131,23 +129,3 @@ private[scan] object FileScanSourceOpExec {
     new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
   }
 }
-
-class FileScanSourceOpExec private[scan] (
-    descString: String
-) extends SourceOperatorExecutor {
-  private val desc: FileScanSourceOpDesc =
-    objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
-
-  @throws[IOException]
-  override def produceTuple(): Iterator[TupleLike] = {
-    FileScanSourceOpExec.createTuplesFromFile(
-      fileName = desc.fileName.get,
-      attributeType = desc.attributeType,
-      fileEncoding = desc.fileEncoding,
-      extract = desc.extract,
-      outputFileName = desc.outputFileName,
-      fileScanOffset = desc.fileScanOffset,
-      fileScanLimit = desc.fileScanLimit
-    )
-  }
-}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
similarity index 74%
rename from common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDescSpec.scala
rename to common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
index 901fca0251..9eec614f50 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDescSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
@@ -32,39 +32,39 @@ import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.scalatest.BeforeAndAfter
 import org.scalatest.flatspec.AnyFlatSpec
 
-class InputFileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter {
+class FileScanOpDescSpec extends AnyFlatSpec with BeforeAndAfter {
 
   private val inputSchema = new Schema(new Attribute("filename", AttributeType.STRING))
 
-  var inputFileScanSourceOpDesc: InputFileScanSourceOpDesc = _
+  var fileScanOpDesc: FileScanOpDesc = _
 
   before {
-    inputFileScanSourceOpDesc = new InputFileScanSourceOpDesc()
-    inputFileScanSourceOpDesc.fileEncoding = FileDecodingMethod.UTF_8
+    fileScanOpDesc = new FileScanOpDesc()
+    fileScanOpDesc.fileEncoding = FileDecodingMethod.UTF_8
   }
 
   it should "infer schema with single column representing each line of text" in {
-    val inferredSchema: Schema = inputFileScanSourceOpDesc.sourceSchema()
+    val inferredSchema: Schema = fileScanOpDesc.sourceSchema()
 
     assert(inferredSchema.getAttributes.length == 1)
     assert(inferredSchema.getAttribute("line").getType == AttributeType.STRING)
   }
 
   it should "read first 5 lines from the input file path tuple into output tuples" in {
-    inputFileScanSourceOpDesc.attributeType = FileAttributeType.STRING
-    inputFileScanSourceOpDesc.fileScanLimit = Option(5)
+    fileScanOpDesc.attributeType = FileAttributeType.STRING
+    fileScanOpDesc.fileScanLimit = Option(5)
 
     val inputTuple = Tuple(inputSchema, Array[Any](TestOperators.TestTextFilePath))
-    val inputFileScanSourceOpExec =
-      new InputFileScanSourceOpExec(objectMapper.writeValueAsString(inputFileScanSourceOpDesc))
+    val fileScanOpExec =
+      new FileScanOpExec(objectMapper.writeValueAsString(fileScanOpDesc))
 
-    inputFileScanSourceOpExec.open()
-    val processedTuple: Iterator[Tuple] = inputFileScanSourceOpExec
+    fileScanOpExec.open()
+    val processedTuple: Iterator[Tuple] = fileScanOpExec
       .processTuple(inputTuple, 0)
       .map(tupleLike =>
         tupleLike
           .asInstanceOf[SchemaEnforceable]
-          .enforceSchema(inputFileScanSourceOpDesc.sourceSchema())
+          .enforceSchema(fileScanOpDesc.sourceSchema())
       )
 
     assert(processedTuple.next().getField("line").equals("line1"))
@@ -73,6 +73,6 @@ class InputFileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter {
     assert(processedTuple.next().getField("line").equals("line4"))
     assert(processedTuple.next().getField("line").equals("line5"))
     assertThrows[java.util.NoSuchElementException](processedTuple.next().getField("line"))
-    inputFileScanSourceOpExec.close()
+    fileScanOpExec.close()
   }
 }
diff --git a/frontend/src/assets/operator_images/InputFileScan.png b/frontend/src/assets/operator_images/FileScanOp.png
similarity index 100%
rename from frontend/src/assets/operator_images/InputFileScan.png
rename to frontend/src/assets/operator_images/FileScanOp.png

Reply via email to