This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-input-source-operator in repository https://gitbox.apache.org/repos/asf/texera.git
commit 22ae4de60b6004afcf4a251e27033ae8d25a7568 Author: Xinyuan Lin <[email protected]> AuthorDate: Sun Apr 12 01:35:58 2026 -0700 feat(input-source): add input file source operator --- .../apache/texera/amber/operator/LogicalOp.scala | 2 + .../source/scan/InputFileScanSourceOpDesc.scala | 100 ++++++++++++++++ .../source/scan/InputFileScanSourceOpExec.scala | 130 +++++++++++++++++++++ .../source/scan/InputFileSourceOpExec.scala | 84 +++++++++++++ .../src/assets/operator_images/InputFileScan.png | Bin 0 -> 22499 bytes 5 files changed, 316 insertions(+) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index 7ccbb073d6..ea40c4b815 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -78,6 +78,7 @@ import org.apache.texera.amber.operator.source.apis.twitter.v2.{ import org.apache.texera.amber.operator.source.dataset.DatasetSelectorSourceOpDesc import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc +import org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpDesc import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc import org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpDesc import org.apache.texera.amber.operator.source.scan.csvOld.CSVOldScanSourceOpDesc @@ -165,6 +166,7 @@ trait StateTransferFunc // new Type(value = classOf[ParallelCSVScanSourceOpDesc], name = "ParallelCSVFileScan"), new Type(value = classOf[JSONLScanSourceOpDesc], name = "JSONLFileScan"), new Type(value = classOf[FileScanSourceOpDesc], name = "FileScan"), + new Type(value = classOf[InputFileScanSourceOpDesc], name = "InputFileScan"), new Type(value = 
classOf[TextInputSourceOpDesc], name = "TextInput"), new Type( value = classOf[TwitterFullArchiveSearchSourceOpDesc], diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala new file mode 100644 index 0000000000..4e7c88b000 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpDesc.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.{
+  JsonSchemaInject,
+  JsonSchemaString,
+  JsonSchemaTitle
+}
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{
+  InputPort,
+  OutputPort,
+  PhysicalOp,
+  SchemaPropagationFunc
+}
+import org.apache.texera.amber.operator.metadata.annotations.HideAnnotation
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.operator.source.scan.text.TextSourceOpDesc
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+/**
+  * Descriptor of the "File Scan From Input" operator: a scan whose file paths are supplied
+  * by tuples arriving on its single "Filename" input port.
+  *
+  * The descriptor serializes itself to JSON and hands that string to
+  * `InputFileScanSourceOpExec`, which is referenced by its fully qualified class name.
+  */
+class InputFileScanSourceOpDesc extends SourceOperatorDescriptor with TextSourceOpDesc {
+  // Character set option shown as "Encoding"; defaults to UTF-8.
+  @JsonProperty(defaultValue = "UTF_8", required = true)
+  @JsonSchemaTitle("Encoding")
+  var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
+
+  // "Extract" option; off by default.
+  @JsonProperty(defaultValue = "false")
+  @JsonSchemaTitle("Extract")
+  val extract: Boolean = false
+
+  // "Include Filename" option; the injected schema hints hide it while `extract` equals "false".
+  @JsonProperty(defaultValue = "false")
+  @JsonSchemaTitle("Include Filename")
+  @JsonSchemaInject(
+    strings = Array(
+      new JsonSchemaString(path = HideAnnotation.hideTarget, value = "extract"),
+      new JsonSchemaString(path = HideAnnotation.hideType, value = HideAnnotation.Type.equals),
+      new JsonSchemaString(path = HideAnnotation.hideExpectedValue, value = "false")
+    )
+  )
+  val outputFileName: Boolean = false
+
+  /**
+    * Builds the physical operator for this descriptor.
+    *
+    * @param workflowId  identity of the enclosing workflow
+    * @param executionId identity of the current execution
+    * @return a source physical operator wired with this operator's ports and schema propagation
+    */
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp = {
+    // The executor is created by class name from the JSON form of this descriptor.
+    val executorInit = OpExecWithClassName(
+      "org.apache.texera.amber.operator.source.scan.InputFileScanSourceOpExec",
+      objectMapper.writeValueAsString(this)
+    )
+    // The only output port always carries the schema computed by `sourceSchema()`.
+    val schemaPropagation =
+      SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema()))
+
+    PhysicalOp
+      .sourcePhysicalOp(workflowId, executionId, operatorIdentifier, executorInit)
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withPropagateSchema(schemaPropagation)
+  }
+
+  /**
+    * Output schema: an optional leading "filename" STRING column (when `outputFileName` is
+    * set), followed by the configured attribute.
+    */
+  override def sourceSchema(): Schema = {
+    val leadingColumns =
+      if (outputFileName) Schema().add("filename", AttributeType.STRING) else Schema()
+    leadingColumns.add(attributeName, attributeType.getType)
+  }
+
+  /** Static metadata: display name, description, group, and the port layout. */
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      userFriendlyName = "File Scan From Input",
+      operatorDescription = "Scan data from file paths provided by input tuples",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List(InputPort(displayName = "Filename")),
+      outputPorts = List(OutputPort())
+    )
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
new file mode 100644
index 0000000000..4a1514c7f6
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileScanSourceOpExec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.tuple.AttributeTypeUtils.parseField
+import org.apache.texera.amber.core.tuple.{LargeBinary, TupleLike}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.apache.texera.service.util.LargeBinaryOutputStream
+import org.apache.commons.compress.archivers.ArchiveStreamFactory
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
+import org.apache.commons.io.IOUtils.toByteArray
+
+import java.io._
+import java.net.URI
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.IteratorHasAsScala
+
+/**
+  * Executor that scans every file named by the collected input tuples and emits the file
+  * contents as tuples.
+  *
+  * The behavior is driven by the JSON-serialized [[InputFileScanSourceOpDesc]]:
+  *  - with `extract` set, the file is opened as a zip archive and each entry (except those
+  *    whose name starts with "__MACOSX") is scanned;
+  *  - for "single" attribute types one tuple is produced per file/entry, otherwise one tuple
+  *    per line inside the configured offset/limit window.
+  *
+  * @param descString JSON form of the operator descriptor
+  */
+class InputFileScanSourceOpExec private[scan] (
+    descString: String
+) extends InputFileSourceOpExec {
+  private val desc: InputFileScanSourceOpDesc =
+    objectMapper.readValue(descString, classOf[InputFileScanSourceOpDesc])
+
+  /** Lazily chains the tuples of every resolved input file, one file after another. */
+  @throws[IOException]
+  override def produceTuple(): Iterator[TupleLike] = {
+    resolvedInputFileNames.iterator.flatMap(produceTuplesForFile)
+  }
+
+  /**
+    * Opens one resolved file and returns an iterator over its tuples. The stream that was
+    * opened here is closed through the returned `AutoClosingIterator`.
+    *
+    * @param resolvedFileName URI string of the file to read
+    */
+  private def produceTuplesForFile(resolvedFileName: String): Iterator[TupleLike] = {
+    val is: InputStream =
+      DocumentFactory.openReadonlyDocument(new URI(resolvedFileName)).asInputStream()
+
+    val zipIn: Option[ZipArchiveInputStream] =
+      if (desc.extract) Some(openZipStream(is)) else None
+    // As before: in extract mode only the wrapping zip stream is closed, otherwise the raw one.
+    val toClose: AutoCloseable = zipIn.getOrElse(is)
+
+    val (fileEntries, filenameIt): (Iterator[InputStream], Iterator[String]) = zipIn match {
+      case Some(zip) =>
+        // The entry iterator is duplicated so names and payload streams can be paired up
+        // again below; the payload of every entry is read from the same zip stream.
+        val (names, payloads) = Iterator
+          .continually(zip.getNextEntry)
+          .takeWhile(_ != null)
+          .filterNot(_.getName.startsWith("__MACOSX"))
+          .duplicate
+        (payloads.map(_ => zip), names.map(_.getName))
+      case None =>
+        (Iterator(is), Iterator.empty)
+    }
+
+    val rawIterator: Iterator[TupleLike] =
+      if (desc.attributeType.isSingle) {
+        // One tuple per file/entry; without extraction there is no entry name, so the
+        // filename slot (if requested) is null, exactly as before.
+        fileEntries.zipAll(filenameIt, null, null).map {
+          case (entry, fileName) =>
+            val fields: mutable.ListBuffer[Any] = mutable.ListBuffer()
+            if (desc.outputFileName) {
+              fields.addOne(fileName)
+            }
+            fields.addOne(readSingleValue(entry))
+            TupleLike(fields.toSeq: _*)
+        }
+      } else {
+        // One tuple per line; the offset/limit window is applied to every entry separately.
+        val (fromLine, untilLine) = lineWindow
+        fileEntries.flatMap(entry =>
+          new BufferedReader(new InputStreamReader(entry, desc.fileEncoding.getCharset))
+            .lines()
+            .iterator()
+            .asScala
+            .slice(fromLine, untilLine)
+            .map(line =>
+              TupleLike(desc.attributeType match {
+                case FileAttributeType.SINGLE_STRING => line
+                case _                               => parseField(line, desc.attributeType.getType)
+              })
+            )
+        )
+      }
+
+    new AutoClosingIterator(rawIterator, () => toClose.close())
+  }
+
+  /**
+    * Wraps `raw` in an archive stream and expects it to be a zip stream.
+    *
+    * If detection or the cast fails, `raw` is closed before the original exception is
+    * rethrown; previously the opened document stream was leaked on that path.
+    */
+  private def openZipStream(raw: InputStream): ZipArchiveInputStream =
+    try {
+      new ArchiveStreamFactory()
+        .createArchiveInputStream(new BufferedInputStream(raw))
+        .asInstanceOf[ZipArchiveInputStream]
+    } catch {
+      case scala.util.control.NonFatal(failure) =>
+        try raw.close()
+        catch {
+          case scala.util.control.NonFatal(closeFailure) => failure.addSuppressed(closeFailure)
+        }
+        throw failure
+    }
+
+  /**
+    * Reads the whole of `entry` as one value of the configured single attribute type.
+    * LARGE_BINARY content is copied in 8192-byte chunks into a `LargeBinaryOutputStream`;
+    * the other types are read fully into memory first.
+    */
+  private def readSingleValue(entry: InputStream): Any =
+    desc.attributeType match {
+      case FileAttributeType.SINGLE_STRING =>
+        new String(toByteArray(entry), desc.fileEncoding.getCharset)
+      case FileAttributeType.LARGE_BINARY =>
+        val largeBinary = new LargeBinary()
+        val out = new LargeBinaryOutputStream(largeBinary)
+        try {
+          val buffer = new Array[Byte](8192)
+          var bytesRead = entry.read(buffer)
+          while (bytesRead != -1) {
+            out.write(buffer, 0, bytesRead)
+            bytesRead = entry.read(buffer)
+          }
+        } finally {
+          out.close()
+        }
+        largeBinary
+      case _ => parseField(toByteArray(entry), desc.attributeType.getType)
+    }
+
+  /**
+    * Half-open `[from, until)` line window derived from the scan offset and limit.
+    *
+    * The upper bound is computed in `Long` and clamped to the `Int` range. With plain `Int`
+    * arithmetic, a positive offset combined with the default limit (`Int.MaxValue`) wrapped
+    * around to a negative bound, which made `slice` return no lines at all.
+    */
+  private def lineWindow: (Int, Int) = {
+    val from = desc.fileScanOffset.getOrElse(0)
+    val limit = desc.fileScanLimit.getOrElse(Int.MaxValue)
+    val until =
+      (from.toLong + limit.toLong).max(Int.MinValue.toLong).min(Int.MaxValue.toLong).toInt
+    (from, until)
+  }
+}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala new file mode 100644 index 0000000000..cd14cc4235 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/InputFileSourceOpExec.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.texera.amber.operator.source.scan
+
+import org.apache.texera.amber.core.executor.SourceOperatorExecutor
+import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
+import org.apache.texera.amber.core.workflow.PortIdentity
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Source-executor mixin that collects file names from incoming tuples and exposes them,
+  * resolved, to the concrete executor.
+  */
+trait InputFileSourceOpExec extends SourceOperatorExecutor {
+  // File names gathered so far, in arrival order.
+  private val inputFileNames = ArrayBuffer.empty[String]
+
+  /** Delegates to `processTuple` and tags every produced tuple with no explicit port. */
+  override def processTupleMultiPort(
+      tuple: Tuple,
+      port: Int
+  ): Iterator[(TupleLike, Option[PortIdentity])] = {
+    for (produced <- processTuple(tuple, port)) yield (produced, Option.empty[PortIdentity])
+  }
+
+  /** Records the file name carried by `tuple`; nothing is emitted for an input tuple. */
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
+    val fileName = extractFileName(tuple)
+    inputFileNames.append(fileName)
+    Iterator.empty
+  }
+
+  /**
+    * All collected file names, each passed through `FileResolver` unless it is already
+    * resolved.
+    *
+    * @throws IllegalStateException if no file name has been collected
+    */
+  protected def resolvedInputFileNames: Seq[String] = {
+    if (inputFileNames.nonEmpty) {
+      inputFileNames.toSeq.map {
+        case resolved if FileResolver.isFileResolved(resolved) => resolved
+        case unresolved                                        => FileResolver.resolve(unresolved).toASCIIString
+      }
+    } else {
+      throw new IllegalStateException("No input file is available for this source operator.")
+    }
+  }
+
+  /**
+    * The single collected file name, resolved.
+    *
+    * @throws IllegalStateException if none or more than one file name has been collected
+    */
+  protected def resolvedInputFileName: String = {
+    // The empty case never reaches the match: `resolvedInputFileNames` throws first.
+    resolvedInputFileNames match {
+      case Seq(only) => only
+      case _ =>
+        throw new IllegalStateException("This source operator accepts only one input filename.")
+    }
+  }
+
+  /**
+    * Picks the file name out of an input tuple, trying in order: the "filename" attribute,
+    * the only string-valued field, then the first field if it is a string.
+    *
+    * @throws IllegalArgumentException if none of the three lookups yields a string
+    */
+  private def extractFileName(tuple: Tuple): String = {
+    if (tuple.getSchema.containsAttribute("filename")) {
+      tuple.getField[String]("filename")
+    } else {
+      val stringValues = tuple.getFields.collect { case text: String => text }
+      if (stringValues.size == 1) {
+        stringValues.head
+      } else {
+        tuple.getFields.headOption match {
+          case Some(first: String) => first
+          case _ =>
+            throw new IllegalArgumentException(
+              "The filename input port expects a tuple containing a filename string."
+            )
+        }
+      }
+    }
+  }
+}
diff --git a/frontend/src/assets/operator_images/InputFileScan.png b/frontend/src/assets/operator_images/InputFileScan.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and b/frontend/src/assets/operator_images/InputFileScan.png differ
