This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-input-source-operator
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-input-source-operator by this push:
new b151f977c0 fix fmt
new 6aca8d48f9 Merge remote-tracking branch 'origin/xinyuan-input-source-operator' into xinyuan-input-source-operator
b151f977c0 is described below
commit b151f977c00f411658cfcafc76364650fa6a1186
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 12 17:15:09 2026 -0700
fix fmt
---
.../source/scan/file/FileScanSourceOpDesc.scala | 2 +-
.../source/scan/file/FileScanSourceOpExec.scala | 114 +++++++++++++++++-
.../scan/file/FileScanSourceOpExecSupport.scala | 131 ---------------------
.../scan/file/InputFileScanSourceOpDesc.scala | 2 +-
.../scan/file/InputFileScanSourceOpExec.scala | 5 +-
5 files changed, 114 insertions(+), 140 deletions(-)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
index a695b353ad..82997632d1 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpDesc.scala
@@ -74,7 +74,7 @@ class FileScanSourceOpDesc extends ScanSourceOpDesc with TextSourceOpDesc {
executionId,
operatorIdentifier,
OpExecWithClassName(
- "org.apache.texera.amber.operator.source.scan.FileScanSourceOpExec",
+
"org.apache.texera.amber.operator.source.scan.file.FileScanSourceOpExec",
objectMapper.writeValueAsString(this)
)
)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
index bd04e45b08..636ae23f97 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
@@ -19,22 +19,128 @@
package org.apache.texera.amber.operator.source.scan.file
+import org.apache.commons.compress.archivers.ArchiveStreamFactory
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
+import org.apache.commons.io.IOUtils.toByteArray
import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
+import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.amber.core.tuple.TupleLike
+import org.apache.texera.amber.operator.source.scan.{
+ AutoClosingIterator,
+ FileAttributeType,
+ FileDecodingMethod
+}
import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.service.util.LargeBinaryOutputStream
+
+import java.io._
+import java.net.URI
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.IteratorHasAsScala
+
+private[scan] object FileScanSourceOpExec {
+
+  /**
+   * Reads the document at `fileName` and lazily converts its content into tuples.
+   *
+   * Shared by [[FileScanSourceOpExec]] and [[InputFileScanSourceOpExec]].
+   *
+   * @param fileName       URI of the document, opened through `DocumentFactory`.
+   * @param attributeType  when `isSingle`, each file (or archive entry) becomes one tuple;
+   *                       otherwise each line becomes one tuple.
+   * @param fileEncoding   charset used to decode strings and lines.
+   * @param extract        when true, the input is opened as a zip archive and every entry
+   *                       whose name does not start with `__MACOSX` is processed.
+   * @param outputFileName in single mode, prepend the entry name as the first field.
+   *                       NOTE(review): the name is `null` when `extract` is false.
+   * @param fileScanOffset lines to skip at the start of each file/entry (line mode only).
+   * @param fileScanLimit  maximum lines to emit per file/entry (line mode only);
+   *                       `None` means no limit.
+   * @return the tuples, wrapped in an `AutoClosingIterator` that is handed the action
+   *         closing the opened stream.
+   */
+  def createTuplesFromFile(
+      fileName: String,
+      attributeType: FileAttributeType,
+      fileEncoding: FileDecodingMethod,
+      extract: Boolean,
+      outputFileName: Boolean,
+      fileScanOffset: Option[Int],
+      fileScanLimit: Option[Int]
+  ): Iterator[TupleLike] = {
+    val inputStream = DocumentFactory.openReadonlyDocument(new URI(fileName)).asInputStream()
+
+    val zipIn: Option[ZipArchiveInputStream] =
+      if (extract) Some(openZip(inputStream)) else None
+    // Closing the zip stream also closes the buffered document stream it wraps,
+    // so exactly one resource needs to be released.
+    val resource: AutoCloseable = zipIn.getOrElse(inputStream)
+
+    // `duplicate` keeps names and entry streams in lock-step: advancing either side calls
+    // getNextEntry once, and that entry is buffered for the other side.
+    val (fileEntries, entryNames): (Iterator[InputStream], Iterator[String]) =
+      zipIn match {
+        case Some(zip) =>
+          val (namesSide, streamsSide) = Iterator
+            .continually(zip.getNextEntry)
+            .takeWhile(_ != null)
+            .filterNot(_.getName.startsWith("__MACOSX"))
+            .duplicate
+          (streamsSide.map(_ => zip), namesSide.map(_.getName))
+        case None =>
+          (Iterator(inputStream), Iterator.empty)
+      }
+
+    val rawIterator: Iterator[TupleLike] =
+      if (attributeType.isSingle) {
+        fileEntries.zipAll(entryNames, null, null).map {
+          case (entry, entryFileName) =>
+            val content: Any = attributeType match {
+              case FileAttributeType.SINGLE_STRING =>
+                new String(toByteArray(entry), fileEncoding.getCharset)
+              case FileAttributeType.LARGE_BINARY =>
+                copyToLargeBinary(entry)
+              case _ =>
+                parseField(toByteArray(entry), attributeType.getType)
+            }
+            val fields: Seq[Any] =
+              if (outputFileName) Seq(entryFileName, content) else Seq(content)
+            TupleLike(fields: _*)
+        }
+      } else {
+        val offset = fileScanOffset.getOrElse(0)
+        fileEntries.flatMap { entry =>
+          val afterOffset = new BufferedReader(new InputStreamReader(entry, fileEncoding.getCharset))
+            .lines()
+            .iterator()
+            .asScala
+            .drop(offset)
+          // take(limit) instead of slice(offset, offset + limit): with no limit the default
+          // Int.MaxValue makes that sum overflow for any positive offset, and slice then
+          // silently yields no lines.
+          val limited = fileScanLimit.fold(afterOffset)(limit => afterOffset.take(limit))
+          limited.map { line =>
+            TupleLike(attributeType match {
+              case FileAttributeType.SINGLE_STRING => line
+              case _                               => parseField(line, attributeType.getType)
+            })
+          }
+        }
+      }
+
+    new AutoClosingIterator(rawIterator, () => resource.close())
+  }
+
+  /** Opens `in` as a zip archive, closing `in` if the archive cannot be opened or is not a zip. */
+  private def openZip(in: InputStream): ZipArchiveInputStream =
+    try {
+      new ArchiveStreamFactory()
+        .createArchiveInputStream(new BufferedInputStream(in))
+        .asInstanceOf[ZipArchiveInputStream]
+    } catch {
+      case scala.util.control.NonFatal(e) =>
+        in.close()
+        throw e
+    }
+
+  /** Streams `entry` into a new [[LargeBinary]] in 8 KiB chunks and returns it. */
+  private def copyToLargeBinary(entry: InputStream): LargeBinary = {
+    val largeBinary = new LargeBinary()
+    val out = new LargeBinaryOutputStream(largeBinary)
+    try {
+      val buffer = new Array[Byte](8192)
+      var bytesRead = entry.read(buffer)
+      while (bytesRead != -1) {
+        out.write(buffer, 0, bytesRead)
+        bytesRead = entry.read(buffer)
+      }
+    } finally {
+      out.close()
+    }
+    largeBinary
+  }
+}
class FileScanSourceOpExec private[scan] (
descString: String
-) extends SourceOperatorExecutor
- with FileScanSourceOpExecSupport {
+) extends SourceOperatorExecutor {
private val desc: FileScanSourceOpDesc =
objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
@throws[IOException]
override def produceTuple(): Iterator[TupleLike] = {
- createTuplesFromFile(
+ FileScanSourceOpExec.createTuplesFromFile(
fileName = desc.fileName.get,
attributeType = desc.attributeType,
fileEncoding = desc.fileEncoding,
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala
deleted file mode 100644
index e29822dfff..0000000000
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExecSupport.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.operator.source.scan.file
-
-import org.apache.commons.compress.archivers.ArchiveStreamFactory
-import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
-import org.apache.commons.io.IOUtils.toByteArray
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
-import org.apache.texera.amber.operator.source.scan.{
- AutoClosingIterator,
- FileAttributeType,
- FileDecodingMethod
-}
-import org.apache.texera.service.util.LargeBinaryOutputStream
-
-import java.io._
-import java.net.URI
-import scala.collection.mutable
-import scala.jdk.CollectionConverters.IteratorHasAsScala
-
-private[scan] trait FileScanSourceOpExecSupport {
-
- protected def createTuplesFromFile(
- fileName: String,
- attributeType: FileAttributeType,
- fileEncoding: FileDecodingMethod,
- extract: Boolean,
- outputFileName: Boolean,
- fileScanOffset: Option[Int],
- fileScanLimit: Option[Int]
- ): Iterator[TupleLike] = {
- val inputStream = DocumentFactory.openReadonlyDocument(new
URI(fileName)).asInputStream()
-
- val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
- var zipIn: ZipArchiveInputStream = null
- val archiveStream: InputStream =
- if (extract) {
- zipIn = new ArchiveStreamFactory()
- .createArchiveInputStream(new BufferedInputStream(inputStream))
- .asInstanceOf[ZipArchiveInputStream]
- closeables += zipIn
- zipIn
- } else {
- closeables += inputStream
- inputStream
- }
-
- var filenameIt: Iterator[String] = Iterator.empty
- val fileEntries: Iterator[InputStream] =
- if (extract) {
- val (it1, it2) = Iterator
- .continually(zipIn.getNextEntry)
- .takeWhile(_ != null)
- .filterNot(_.getName.startsWith("__MACOSX"))
- .duplicate
- filenameIt = it1.map(_.getName)
- it2.map(_ => zipIn)
- } else {
- Iterator(archiveStream)
- }
-
- val rawIterator: Iterator[TupleLike] =
- if (attributeType.isSingle) {
- fileEntries.zipAll(filenameIt, null, null).map {
- case (entry, entryFileName) =>
- val fields = mutable.ListBuffer.empty[Any]
- if (outputFileName) {
- fields += entryFileName
- }
- fields += (attributeType match {
- case FileAttributeType.SINGLE_STRING =>
- new String(toByteArray(entry), fileEncoding.getCharset)
- case FileAttributeType.LARGE_BINARY =>
- val largeBinary = new LargeBinary()
- val out = new LargeBinaryOutputStream(largeBinary)
- try {
- val buffer = new Array[Byte](8192)
- var bytesRead = entry.read(buffer)
- while (bytesRead != -1) {
- out.write(buffer, 0, bytesRead)
- bytesRead = entry.read(buffer)
- }
- } finally {
- out.close()
- }
- largeBinary
- case _ => parseField(toByteArray(entry), attributeType.getType)
- })
- TupleLike(fields.toSeq: _*)
- }
- } else {
- fileEntries.flatMap(entry =>
- new BufferedReader(new InputStreamReader(entry,
fileEncoding.getCharset))
- .lines()
- .iterator()
- .asScala
- .slice(
- fileScanOffset.getOrElse(0),
- fileScanOffset.getOrElse(0) +
fileScanLimit.getOrElse(Int.MaxValue)
- )
- .map(line =>
- TupleLike(attributeType match {
- case FileAttributeType.SINGLE_STRING => line
- case _ => parseField(line,
attributeType.getType)
- })
- )
- )
- }
-
- new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
- }
-}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
index 72299fc0d2..d52406f134 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
@@ -71,7 +71,7 @@ class InputFileScanSourceOpDesc extends SourceOperatorDescriptor with TextSource
executionId,
operatorIdentifier,
OpExecWithClassName(
-
"org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpExec",
+
"org.apache.texera.amber.operator.source.scan.file.InputFileScanSourceOpExec",
objectMapper.writeValueAsString(this)
)
)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
index cedb868d8e..b8c72e0d0f 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
@@ -26,15 +26,14 @@ import org.apache.texera.amber.util.JSONUtils.objectMapper
class InputFileScanSourceOpExec private[scan] (
descString: String
-) extends OperatorExecutor
- with FileScanSourceOpExecSupport {
+) extends OperatorExecutor {
private val desc: InputFileScanSourceOpDesc =
objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
val fileName =
FileResolver.resolve(tuple.getFields.collectFirst { case s: String => s
}.get).toASCIIString
- createTuplesFromFile(
+ FileScanSourceOpExec.createTuplesFromFile(
fileName = fileName,
attributeType = desc.attributeType,
fileEncoding = desc.fileEncoding,