This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-input-source-operator
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-input-source-operator
by this push:
new 97ccda3068 fix fmt
97ccda3068 is described below
commit 97ccda306894beb626e194d1982c7211e891762b
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 12 03:15:40 2026 -0700
fix fmt
---
.../source/scan/InputFileScanSourceOpExec.scala | 20 +++---
.../source/scan/InputFileSourceOpExec.scala | 84 ----------------------
2 files changed, 10 insertions(+), 94 deletions(-)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
index 4a1514c7f6..9e76710fde 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
@@ -19,14 +19,15 @@
package org.apache.texera.amber.operator.source.scan
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, FileResolver}
import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
+import org.apache.texera.amber.core.tuple.{LargeBinary, Tuple, TupleLike}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.apache.texera.service.util.LargeBinaryOutputStream
import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
import org.apache.commons.io.IOUtils.toByteArray
+import org.apache.texera.amber.core.executor.OperatorExecutor
import java.io._
import java.net.URI
@@ -35,18 +36,14 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
class InputFileScanSourceOpExec private[scan] (
descString: String
-) extends InputFileSourceOpExec {
+) extends OperatorExecutor {
private val desc: InputFileScanSourceOpDesc =
objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
- @throws[IOException]
- override def produceTuple(): Iterator[TupleLike] = {
- resolvedInputFileNames.iterator.flatMap(produceTuplesForFile)
- }
-
- private def produceTuplesForFile(resolvedFileName: String): Iterator[TupleLike] = {
+ override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
+ val fileName = FileResolver.resolve(tuple.getFields.collectFirst { case s: String => s }.get).toASCIIString
val is: InputStream =
- DocumentFactory.openReadonlyDocument(new URI(resolvedFileName)).asInputStream()
+ DocumentFactory.openReadonlyDocument(new URI(fileName)).asInputStream()
val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
var zipIn: ZipArchiveInputStream = null
@@ -127,4 +124,7 @@ class InputFileScanSourceOpExec private[scan] (
new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
}
+
+
+
}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
deleted file mode 100644
index cd14cc4235..0000000000
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.operator.source.scan
-
-import org.apache.texera.amber.core.executor.SourceOperatorExecutor
-import org.apache.texera.amber.core.storage.FileResolver
-import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
-import org.apache.texera.amber.core.workflow.PortIdentity
-
-import scala.collection.mutable.ArrayBuffer
-
-trait InputFileSourceOpExec extends SourceOperatorExecutor {
- private val inputFileNames = ArrayBuffer.empty[String]
-
- override def processTupleMultiPort(
- tuple: Tuple,
- port: Int
- ): Iterator[(TupleLike, Option[PortIdentity])] = {
- processTuple(tuple, port).map(tupleLike => (tupleLike, None))
- }
-
- override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
- inputFileNames += extractFileName(tuple)
- Iterator.empty
- }
-
- protected def resolvedInputFileNames: Seq[String] = {
- if (inputFileNames.isEmpty) {
- throw new IllegalStateException("No input file is available for this source operator.")
- }
-
- inputFileNames.toSeq.map(fileName =>
- if (FileResolver.isFileResolved(fileName)) {
- fileName
- } else {
- FileResolver.resolve(fileName).toASCIIString
- }
- )
- }
-
- protected def resolvedInputFileName: String = {
- val fileNames = resolvedInputFileNames
- if (fileNames.size > 1) {
- throw new IllegalStateException("This source operator accepts only one input filename.")
- }
- fileNames.head
- }
-
- private def extractFileName(tuple: Tuple): String = {
- if (tuple.getSchema.containsAttribute("filename")) {
- return tuple.getField[String]("filename")
- }
-
- val stringFields = tuple.getFields.collect { case value: String => value }
- if (stringFields.size == 1) {
- return stringFields.head
- }
-
- tuple.getFields.headOption match {
- case Some(value: String) => value
- case _ =>
- throw new IllegalArgumentException(
- "The filename input port expects a tuple containing a filename string."
- )
- }
- }
-}