This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-input-source-operator
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-input-source-operator 
by this push:
     new 97ccda3068 fix fmt
97ccda3068 is described below

commit 97ccda306894beb626e194d1982c7211e891762b
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Apr 12 03:15:40 2026 -0700

    fix fmt
---
 .../source/scan/InputFileScanSourceOpExec.scala    | 20 +++---
 .../source/scan/InputFileSourceOpExec.scala        | 84 ----------------------
 2 files changed, 10 insertions(+), 94 deletions(-)

diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
index 4a1514c7f6..9e76710fde 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
@@ -19,14 +19,15 @@
 
 package org.apache.texera.amber.operator.source.scan
 
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, FileResolver}
 import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
-import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
+import org.apache.texera.amber.core.tuple.{LargeBinary, Tuple, TupleLike}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 import org.apache.texera.service.util.LargeBinaryOutputStream
 import org.apache.commons.compress.archivers.ArchiveStreamFactory
 import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
 import org.apache.commons.io.IOUtils.toByteArray
+import org.apache.texera.amber.core.executor.OperatorExecutor
 
 import java.io._
 import java.net.URI
@@ -35,18 +36,14 @@ import scala.jdk.CollectionConverters.IteratorHasAsScala
 
 class InputFileScanSourceOpExec private[scan] (
     descString: String
-) extends InputFileSourceOpExec {
+) extends OperatorExecutor {
   private val desc: InputFileScanSourceOpDesc =
     objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
 
-  @throws[IOException]
-  override def produceTuple(): Iterator[TupleLike] = {
-    resolvedInputFileNames.iterator.flatMap(produceTuplesForFile)
-  }
-
-  private def produceTuplesForFile(resolvedFileName: String): Iterator[TupleLike] = {
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
+    val fileName = FileResolver.resolve(tuple.getFields.collectFirst { case s: String => s }.get).toASCIIString
     val is: InputStream =
-      DocumentFactory.openReadonlyDocument(new URI(resolvedFileName)).asInputStream()
+      DocumentFactory.openReadonlyDocument(new URI(fileName)).asInputStream()
 
     val closeables = mutable.ArrayBuffer.empty[AutoCloseable]
     var zipIn: ZipArchiveInputStream = null
@@ -127,4 +124,7 @@ class InputFileScanSourceOpExec private[scan] (
 
     new AutoClosingIterator(rawIterator, () => closeables.foreach(_.close()))
   }
+
+
+
 }
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
deleted file mode 100644
index cd14cc4235..0000000000
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.operator.source.scan
-
-import org.apache.texera.amber.core.executor.SourceOperatorExecutor
-import org.apache.texera.amber.core.storage.FileResolver
-import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
-import org.apache.texera.amber.core.workflow.PortIdentity
-
-import scala.collection.mutable.ArrayBuffer
-
-trait InputFileSourceOpExec extends SourceOperatorExecutor {
-  private val inputFileNames = ArrayBuffer.empty[String]
-
-  override def processTupleMultiPort(
-      tuple: Tuple,
-      port: Int
-  ): Iterator[(TupleLike, Option[PortIdentity])] = {
-    processTuple(tuple, port).map(tupleLike => (tupleLike, None))
-  }
-
-  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
-    inputFileNames += extractFileName(tuple)
-    Iterator.empty
-  }
-
-  protected def resolvedInputFileNames: Seq[String] = {
-    if (inputFileNames.isEmpty) {
-      throw new IllegalStateException("No input file is available for this source operator.")
-    }
-
-    inputFileNames.toSeq.map(fileName =>
-      if (FileResolver.isFileResolved(fileName)) {
-        fileName
-      } else {
-        FileResolver.resolve(fileName).toASCIIString
-      }
-    )
-  }
-
-  protected def resolvedInputFileName: String = {
-    val fileNames = resolvedInputFileNames
-    if (fileNames.size > 1) {
-      throw new IllegalStateException("This source operator accepts only one input filename.")
-    }
-    fileNames.head
-  }
-
-  private def extractFileName(tuple: Tuple): String = {
-    if (tuple.getSchema.containsAttribute("filename")) {
-      return tuple.getField[String]("filename")
-    }
-
-    val stringFields = tuple.getFields.collect { case value: String => value }
-    if (stringFields.size == 1) {
-      return stringFields.head
-    }
-
-    tuple.getFields.headOption match {
-      case Some(value: String) => value
-      case _ =>
-        throw new IllegalArgumentException(
-          "The filename input port expects a tuple containing a filename string."
-        )
-    }
-  }
-}

Reply via email to