This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-input-source-operator
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-input-source-operator
by this push:
new 86f4172d15 update
86f4172d15 is described below
commit 86f4172d1573606474ed0bda4c0acf714c77fac4
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 16 15:27:30 2026 -0700
update
---
.../apache/texera/amber/operator/LogicalOp.scala | 4 +-
...ScanSourceOpDesc.scala => FileScanOpDesc.scala} | 4 +-
...ScanSourceOpExec.scala => FileScanOpExec.scala} | 8 +-
.../source/scan/file/FileScanSourceOpExec.scala | 111 +--------------------
...eScanSourceOpExec.scala => FileScanUtils.scala} | 24 +----
...ceOpDescSpec.scala => FileScanOpDescSpec.scala} | 26 ++---
.../{InputFileScan.png => FileScanOp.png} | Bin
7 files changed, 24 insertions(+), 153 deletions(-)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 378078f6ba..17f3c9f19d 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -140,8 +140,8 @@ import
org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc
import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder,
ToStringBuilder}
import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
import org.apache.texera.amber.operator.source.scan.file.{
+ FileScanOpDesc,
FileScanSourceOpDesc,
- InputFileScanSourceOpDesc
}
import
org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc
@@ -168,7 +168,7 @@ trait StateTransferFunc
// new Type(value = classOf[ParallelCSVScanSourceOpDesc], name =
"ParallelCSVFileScan"),
new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"),
new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"),
- new Type(value = classOf[InputFileScanSourceOpDesc], name =
"InputFileScan"),
+ new Type(value = classOf[FileScanOpDesc], name = "FileScanOp"),
new Type(value = classOf[TextInputSourceOpDesc], name = "TextInput"),
new Type(
value = classOf[TwitterFullArchiveSearchSourceOpDesc],
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
similarity index 95%
rename from
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
rename to
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
index d52406f134..9635f7fa58 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDesc.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDesc.scala
@@ -41,7 +41,7 @@ import
org.apache.texera.amber.operator.source.scan.FileDecodingMethod
import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
import org.apache.texera.amber.util.JSONUtils.objectMapper
-class InputFileScanSourceOpDesc extends SourceOperatorDescriptor with
TextSourceOpDesc {
+class FileScanOpDesc extends SourceOperatorDescriptor with TextSourceOpDesc {
@JsonProperty(defaultValue = "UTF_8", required = true)
@JsonSchemaTitle("Encoding")
var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
@@ -71,7 +71,7 @@ class InputFileScanSourceOpDesc extends
SourceOperatorDescriptor with TextSource
executionId,
operatorIdentifier,
OpExecWithClassName(
-
"org.apache.texera.amber.operator.source.scan.file.InputFileScanSourceOpExec",
+ "org.apache.texera.amber.operator.source.scan.file.FileScanOpExec",
objectMapper.writeValueAsString(this)
)
)
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
similarity index 88%
rename from
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
rename to
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
index b8c72e0d0f..9a84b40a3c 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpExec.scala
@@ -24,16 +24,16 @@ import org.apache.texera.amber.core.storage.FileResolver
import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
import org.apache.texera.amber.util.JSONUtils.objectMapper
-class InputFileScanSourceOpExec private[scan] (
+class FileScanOpExec private[scan] (
descString: String
) extends OperatorExecutor {
- private val desc: InputFileScanSourceOpDesc =
- objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
+ private val desc: FileScanOpDesc =
+ objectMapper.readValue(descString, classOf[FileScanOpDesc])
override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
val fileName =
FileResolver.resolve(tuple.getFields.collectFirst { case s: String => s
}.get).toASCIIString
- FileScanSourceOpExec.createTuplesFromFile(
+ FileScanUtils.createTuplesFromFile(
fileName = fileName,
attributeType = desc.attributeType,
fileEncoding = desc.fileEncoding,
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
index 636ae23f97..d47cf3681c 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
@@ -19,118 +19,11 @@
package org.apache.texera.amber.operator.source.scan.file
-import org.apache.commons.compress.archivers.ArchiveStreamFactory
-import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
-import org.apache.commons.io.IOUtils.toByteArray
import org.apache.texera.amber.core.executor.SourceOperatorExecutor
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.LargeBinary
import org.apache.texera.amber.core.tuple.TupleLike
-import org.apache.texera.amber.operator.source.scan.{
- AutoClosingIterator,
- FileAttributeType,
- FileDecodingMethod
-}
import org.apache.texera.amber.util.JSONUtils.objectMapper
-import org.apache.texera.service.util.LargeBinaryOutputStream
-
-import java.io._
-import java.net.URI
-import scala.collection.mutable
-import scala.jdk.CollectionConverters.IteratorHasAsScala
-
-private[scan] object FileScanSourceOpExec {
- def createTuplesFromFile(
- fileName: String,
- attributeType: FileAttributeType,
- fileEncoding: FileDecodingMethod,
- extract: Boolean,
- outputFileName: Boolean,
- fileScanOffset: Option[Int],
- fileScanLimit: Option[Int]
- ): Iterator[TupleLike] = {
- val inputStream = DocumentFactory.openReadonlyDocument(new
URI(fileName)).asInputStream()
-
- val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
- var zipIn: ZipArchiveInputStream = null
- val archiveStream: InputStream =
- if (extract) {
- zipIn = new ArchiveStreamFactory()
- .createArchiveInputStream(new BufferedInputStream(inputStream))
- .asInstanceOf[ZipArchiveInputStream]
- closeables += zipIn
- zipIn
- } else {
- closeables += inputStream
- inputStream
- }
- var filenameIt: Iterator[String] = Iterator.empty
- val fileEntries: Iterator[InputStream] =
- if (extract) {
- val (it1, it2) = Iterator
- .continually(zipIn.getNextEntry)
- .takeWhile(_ != null)
- .filterNot(_.getName.startsWith("__MACOSX"))
- .duplicate
- filenameIt = it1.map(_.getName)
- it2.map(_ => zipIn)
- } else {
- Iterator(archiveStream)
- }
-
- val rawIterator: Iterator[TupleLike] =
- if (attributeType.isSingle) {
- fileEntries.zipAll(filenameIt, null, null).map {
- case (entry, entryFileName) =>
- val fields = mutable.ListBuffer.empty[Any]
- if (outputFileName) {
- fields += entryFileName
- }
- fields += (attributeType match {
- case FileAttributeType.SINGLE_STRING =>
- new String(toByteArray(entry), fileEncoding.getCharset)
- case FileAttributeType.LARGE_BINARY =>
- val largeBinary = new LargeBinary()
- val out = new LargeBinaryOutputStream(largeBinary)
- try {
- val buffer = new Array[Byte](8192)
- var bytesRead = entry.read(buffer)
- while (bytesRead != -1) {
- out.write(buffer, 0, bytesRead)
- bytesRead = entry.read(buffer)
- }
- } finally {
- out.close()
- }
- largeBinary
- case _ => parseField(toByteArray(entry), attributeType.getType)
- })
- TupleLike(fields.toSeq: _*)
- }
- } else {
- fileEntries.flatMap(entry =>
- new BufferedReader(new InputStreamReader(entry,
fileEncoding.getCharset))
- .lines()
- .iterator()
- .asScala
- .slice(
- fileScanOffset.getOrElse(0),
- fileScanOffset.getOrElse(0) +
fileScanLimit.getOrElse(Int.MaxValue)
- )
- .map(line =>
- TupleLike(attributeType match {
- case FileAttributeType.SINGLE_STRING => line
- case _ => parseField(line,
attributeType.getType)
- })
- )
- )
- }
-
- new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
- }
-}
+import java.io.IOException
class FileScanSourceOpExec private[scan] (
descString: String
@@ -140,7 +33,7 @@ class FileScanSourceOpExec private[scan] (
@throws[IOException]
override def produceTuple(): Iterator[TupleLike] = {
- FileScanSourceOpExec.createTuplesFromFile(
+ FileScanUtils.createTuplesFromFile(
fileName = desc.fileName.get,
attributeType = desc.attributeType,
fileEncoding = desc.fileEncoding,
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
similarity index 85%
copy from
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
copy to
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
index 636ae23f97..3e00f72fec 100644
---
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanSourceOpExec.scala
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/file/FileScanUtils.scala
@@ -22,7 +22,6 @@ package org.apache.texera.amber.operator.source.scan.file
import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
import org.apache.commons.io.IOUtils.toByteArray
-import org.apache.texera.amber.core.executor.SourceOperatorExecutor
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
import org.apache.texera.amber.core.tuple.LargeBinary
@@ -32,7 +31,6 @@ import org.apache.texera.amber.operator.source.scan.{
FileAttributeType,
FileDecodingMethod
}
-import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.apache.texera.service.util.LargeBinaryOutputStream
import java.io._
@@ -40,7 +38,7 @@ import java.net.URI
import scala.collection.mutable
import scala.jdk.CollectionConverters.IteratorHasAsScala
-private[scan] object FileScanSourceOpExec {
+private[file] object FileScanUtils {
def createTuplesFromFile(
fileName: String,
attributeType: FileAttributeType,
@@ -131,23 +129,3 @@ private[scan] object FileScanSourceOpExec {
new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
}
}
-
-class FileScanSourceOpExec private[scan] (
- descString: String
-) extends SourceOperatorExecutor {
- private val desc: FileScanSourceOpDesc =
- objectMapper.readValue(descString, classOf[FileScanSourceOpDesc])
-
- @throws[IOException]
- override def produceTuple(): Iterator[TupleLike] = {
- FileScanSourceOpExec.createTuplesFromFile(
- fileName = desc.fileName.get,
- attributeType = desc.attributeType,
- fileEncoding = desc.fileEncoding,
- extract = desc.extract,
- outputFileName = desc.outputFileName,
- fileScanOffset = desc.fileScanOffset,
- fileScanLimit = desc.fileScanLimit
- )
- }
-}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDescSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
similarity index 74%
rename from
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDescSpec.scala
rename to
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
index 901fca0251..9eec614f50 100644
---
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/InputFileScanSourceOpDescSpec.scala
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/scan/file/FileScanOpDescSpec.scala
@@ -32,39 +32,39 @@ import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
-class InputFileScanSourceOpDescSpec extends AnyFlatSpec with BeforeAndAfter {
+class FileScanOpDescSpec extends AnyFlatSpec with BeforeAndAfter {
private val inputSchema = new Schema(new Attribute("filename",
AttributeType.STRING))
- var inputFileScanSourceOpDesc: InputFileScanSourceOpDesc = _
+ var fileScanOpDesc: FileScanOpDesc = _
before {
- inputFileScanSourceOpDesc = new InputFileScanSourceOpDesc()
- inputFileScanSourceOpDesc.fileEncoding = FileDecodingMethod.UTF_8
+ fileScanOpDesc = new FileScanOpDesc()
+ fileScanOpDesc.fileEncoding = FileDecodingMethod.UTF_8
}
it should "infer schema with single column representing each line of text"
in {
- val inferredSchema: Schema = inputFileScanSourceOpDesc.sourceSchema()
+ val inferredSchema: Schema = fileScanOpDesc.sourceSchema()
assert(inferredSchema.getAttributes.length == 1)
assert(inferredSchema.getAttribute("line").getType == AttributeType.STRING)
}
it should "read first 5 lines from the input file path tuple into output
tuples" in {
- inputFileScanSourceOpDesc.attributeType = FileAttributeType.STRING
- inputFileScanSourceOpDesc.fileScanLimit = Option(5)
+ fileScanOpDesc.attributeType = FileAttributeType.STRING
+ fileScanOpDesc.fileScanLimit = Option(5)
val inputTuple = Tuple(inputSchema,
Array[Any](TestOperators.TestTextFilePath))
- val inputFileScanSourceOpExec =
- new
InputFileScanSourceOpExec(objectMapper.writeValueAsString(inputFileScanSourceOpDesc))
+ val fileScanOpExec =
+ new FileScanOpExec(objectMapper.writeValueAsString(fileScanOpDesc))
- inputFileScanSourceOpExec.open()
- val processedTuple: Iterator[Tuple] = inputFileScanSourceOpExec
+ fileScanOpExec.open()
+ val processedTuple: Iterator[Tuple] = fileScanOpExec
.processTuple(inputTuple, 0)
.map(tupleLike =>
tupleLike
.asInstanceOf[SchemaEnforceable]
- .enforceSchema(inputFileScanSourceOpDesc.sourceSchema())
+ .enforceSchema(fileScanOpDesc.sourceSchema())
)
assert(processedTuple.next().getField("line").equals("line1"))
@@ -73,6 +73,6 @@ class InputFileScanSourceOpDescSpec extends AnyFlatSpec with
BeforeAndAfter {
assert(processedTuple.next().getField("line").equals("line4"))
assert(processedTuple.next().getField("line").equals("line5"))
assertThrows[java.util.NoSuchElementException](processedTuple.next().getField("line"))
- inputFileScanSourceOpExec.close()
+ fileScanOpExec.close()
}
}
diff --git a/frontend/src/assets/operator_images/InputFileScan.png
b/frontend/src/assets/operator_images/FileScanOp.png
similarity index 100%
rename from frontend/src/assets/operator_images/InputFileScan.png
rename to frontend/src/assets/operator_images/FileScanOp.png