This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-input-source-operator
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-input-source-operator 
by this push:
     new b151f977c0 fix fmt
     new 6aca8d48f9 Merge remote-tracking branch 
'origin/xinyuan-input-source-operator' into xinyuan-input-source-operator
b151f977c0 is described below

commit b151f977c00f411658cfcafc76364650fa6a1186
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 12 17:15:09 2026 -0700

    fix fmt
---
 .../source/scan/file/FileScanSourceOpDesc.scala    |   2 +-
 .../source/scan/file/FileScanSourceOpExec.scala    | 114 +++++++++++++++++-
 .../scan/file/FileScanSourceOpExecSupport.scala    | 131 ---------------------
 .../scan/file/InputFileScanSourceOpDesc.scala      |   2 +-
 .../scan/file/InputFileScanSourceOpExec.scala      |   5 +-
 5 files changed, 114 insertions(+), 140 deletions(-)

diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
index a695b353ad..82997632d1 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
@@ -74,7 +74,7 @@ class FileScanSourceOpDesc extends ScanSourceOpDesc with 
TextSourceOpDesc {
         executionId,
         operatorIdentifier,
         OpExecWithClassName(
-          "org.apache.texera.amber.operator.source.scan.FileScanSourceOpExec",
+          
"org.apache.texera.amber.operator.source.scan.file.FileScanSourceOpExec",
           objectMapper.writeValueAsString(this)
         )
       )
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
index bd04e45b08..636ae23f97 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
@@ -19,22 +19,128 @@
 
 package org.apache.texera.amber.operator.source.scan.file
 
+import org.apache.commons.compress.archivers.ArchiveStreamFactory
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
+import org.apache.commons.io.IOUtils.toByteArray
 import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
+import org.apache.texera.amber.core.tuple.LargeBinary
 import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.operator.source.scan.{
+  AutoClosingIterator,
+  FileAttributeType,
+  FileDecodingMethod
+}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.service.util.LargeBinaryOutputStream
+
+import java.io._
+import java.net.URI
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.IteratorHasAsScala
+
+private[scan] object FileScanSourceOpExec {
+  def createTuplesFromFile(
+      fileName: String,
+      attributeType: FileAttributeType,
+      fileEncoding: FileDecodingMethod,
+      extract: Boolean,
+      outputFileName: Boolean,
+      fileScanOffset: Option[Int],
+      fileScanLimit: Option[Int]
+  ): Iterator[TupleLike] = {
+    val inputStream = DocumentFactory.openReadonlyDocument(new 
URI(fileName)).asInputStream()
+
+    val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
+    var zipIn: ZipArchiveInputStream = null
+    val archiveStream: InputStream =
+      if (extract) {
+        zipIn = new ArchiveStreamFactory()
+          .createArchiveInputStream(new BufferedInputStream(inputStream))
+          .asInstanceOf[ZipArchiveInputStream]
+        closeables += zipIn
+        zipIn
+      } else {
+        closeables += inputStream
+        inputStream
+      }
 
-import java.io.IOException
+    var filenameIt: Iterator[String] = Iterator.empty
+    val fileEntries: Iterator[InputStream] =
+      if (extract) {
+        val (it1, it2) = Iterator
+          .continually(zipIn.getNextEntry)
+          .takeWhile(_ != null)
+          .filterNot(_.getName.startsWith("__MACOSX"))
+          .duplicate
+        filenameIt = it1.map(_.getName)
+        it2.map(_ => zipIn)
+      } else {
+        Iterator(archiveStream)
+      }
+
+    val rawIterator: Iterator[TupleLike] =
+      if (attributeType.isSingle) {
+        fileEntries.zipAll(filenameIt, null, null).map {
+          case (entry, entryFileName) =>
+            val fields = mutable.ListBuffer.empty[Any]
+            if (outputFileName) {
+              fields += entryFileName
+            }
+            fields += (attributeType match {
+              case FileAttributeType.SINGLE_STRING =>
+                new String(toByteArray(entry), fileEncoding.getCharset)
+              case FileAttributeType.LARGE_BINARY =>
+                val largeBinary = new LargeBinary()
+                val out = new LargeBinaryOutputStream(largeBinary)
+                try {
+                  val buffer = new Array[Byte](8192)
+                  var bytesRead = entry.read(buffer)
+                  while (bytesRead != -1) {
+                    out.write(buffer, 0, bytesRead)
+                    bytesRead = entry.read(buffer)
+                  }
+                } finally {
+                  out.close()
+                }
+                largeBinary
+              case _ => parseField(toByteArray(entry), attributeType.getType)
+            })
+            TupleLike(fields.toSeq: _*)
+        }
+      } else {
+        fileEntries.flatMap(entry =>
+          new BufferedReader(new InputStreamReader(entry, 
fileEncoding.getCharset))
+            .lines()
+            .iterator()
+            .asScala
+            .slice(
+              fileScanOffset.getOrElse(0),
+              fileScanOffset.getOrElse(0) + 
fileScanLimit.getOrElse(Int.MaxValue)
+            )
+            .map(line =>
+              TupleLike(attributeType match {
+                case FileAttributeType.SINGLE_STRING => line
+                case _                               => parseField(line, 
attributeType.getType)
+              })
+            )
+        )
+      }
+
+    new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
+  }
+}
 
 class FileScanSourceOpExec private[scan] (
     descString: String
-) extends SourceOperatorExecutor
-    with FileScanSourceOpExecSupport {
+) extends SourceOperatorExecutor {
   private val desc: FileScanSourceOpDesc =
     objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
 
   @throws[IOException]
   override def produceTuple(): Iterator[TupleLike] = {
-    createTuplesFromFile(
+    FileScanSourceOpExec.createTuplesFromFile(
       fileName = desc.fileName.get,
       attributeType = desc.attributeType,
       fileEncoding = desc.fileEncoding,
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala
deleted file mode 100644
index e29822dfff..0000000000
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.operator.source.scan.file
-
-import org.apache.commons.compress.archivers.ArchiveStreamFactory
-import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
-import org.apache.commons.io.IOUtils.toByteArray
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
-import org.apache.texera.amber.operator.source.scan.{
-  AutoClosingIterator,
-  FileAttributeType,
-  FileDecodingMethod
-}
-import org.apache.texera.service.util.LargeBinaryOutputStream
-
-import java.io._
-import java.net.URI
-import scala.collection.mutable
-import scala.jdk.CollectionConverters.IteratorHasAsScala
-
-private[scan] trait FileScanSourceOpExecSupport {
-
-  protected def createTuplesFromFile(
-      fileName: String,
-      attributeType: FileAttributeType,
-      fileEncoding: FileDecodingMethod,
-      extract: Boolean,
-      outputFileName: Boolean,
-      fileScanOffset: Option[Int],
-      fileScanLimit: Option[Int]
-  ): Iterator[TupleLike] = {
-    val inputStream = DocumentFactory.openReadonlyDocument(new 
URI(fileName)).asInputStream()
-
-    val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
-    var zipIn: ZipArchiveInputStream = null
-    val archiveStream: InputStream =
-      if (extract) {
-        zipIn = new ArchiveStreamFactory()
-          .createArchiveInputStream(new BufferedInputStream(inputStream))
-          .asInstanceOf[ZipArchiveInputStream]
-        closeables += zipIn
-        zipIn
-      } else {
-        closeables += inputStream
-        inputStream
-      }
-
-    var filenameIt: Iterator[String] = Iterator.empty
-    val fileEntries: Iterator[InputStream] =
-      if (extract) {
-        val (it1, it2) = Iterator
-          .continually(zipIn.getNextEntry)
-          .takeWhile(_ != null)
-          .filterNot(_.getName.startsWith("__MACOSX"))
-          .duplicate
-        filenameIt = it1.map(_.getName)
-        it2.map(_ => zipIn)
-      } else {
-        Iterator(archiveStream)
-      }
-
-    val rawIterator: Iterator[TupleLike] =
-      if (attributeType.isSingle) {
-        fileEntries.zipAll(filenameIt, null, null).map {
-          case (entry, entryFileName) =>
-            val fields = mutable.ListBuffer.empty[Any]
-            if (outputFileName) {
-              fields += entryFileName
-            }
-            fields += (attributeType match {
-              case FileAttributeType.SINGLE_STRING =>
-                new String(toByteArray(entry), fileEncoding.getCharset)
-              case FileAttributeType.LARGE_BINARY =>
-                val largeBinary = new LargeBinary()
-                val out = new LargeBinaryOutputStream(largeBinary)
-                try {
-                  val buffer = new Array[Byte](8192)
-                  var bytesRead = entry.read(buffer)
-                  while (bytesRead != -1) {
-                    out.write(buffer, 0, bytesRead)
-                    bytesRead = entry.read(buffer)
-                  }
-                } finally {
-                  out.close()
-                }
-                largeBinary
-              case _ => parseField(toByteArray(entry), attributeType.getType)
-            })
-            TupleLike(fields.toSeq: _*)
-        }
-      } else {
-        fileEntries.flatMap(entry =>
-          new BufferedReader(new InputStreamReader(entry, 
fileEncoding.getCharset))
-            .lines()
-            .iterator()
-            .asScala
-            .slice(
-              fileScanOffset.getOrElse(0),
-              fileScanOffset.getOrElse(0) + 
fileScanLimit.getOrElse(Int.MaxValue)
-            )
-            .map(line =>
-              TupleLike(attributeType match {
-                case FileAttributeType.SINGLE_STRING => line
-                case _                               => parseField(line, 
attributeType.getType)
-              })
-            )
-        )
-      }
-
-    new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
-  }
-}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
index 72299fc0d2..d52406f134 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
@@ -71,7 +71,7 @@ class InputFileScanSourceOpDesc extends 
SourceOperatorDescriptor with TextSource
         executionId,
         operatorIdentifier,
         OpExecWithClassName(
-          
"org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpExec",
+          
"org.apache.texera.amber.operator.source.scan.file.InputFileScanSourceOpExec",
           objectMapper.writeValueAsString(this)
         )
       )
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
index cedb868d8e..b8c72e0d0f 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
@@ -26,15 +26,14 @@ import org.apache.texera.amber.util.JSONUtils.objectMapper
 
 class InputFileScanSourceOpExec private[scan] (
     descString: String
-) extends OperatorExecutor
-    with FileScanSourceOpExecSupport {
+) extends OperatorExecutor {
   private val desc: InputFileScanSourceOpDesc =
     objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
 
   override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
     val fileName =
       FileResolver.resolve(tuple.getFields.collectFirst { case s: String => s 
}.get).toASCIIString
-    createTuplesFromFile(
+    FileScanSourceOpExec.createTuplesFromFile(
       fileName = fileName,
       attributeType = desc.attributeType,
       fileEncoding = desc.fileEncoding,

Reply via email to