This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-source-port by this push:
new 4373cbaa53 update
4373cbaa53 is described below
commit 4373cbaa53a1fe0d6455e35589e008cd0a620b55
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 7 16:06:34 2026 -0700
update
---
.../source/scan/csv/InputCSVScanSourceOpDesc.scala | 133 +++++++++++++++++++++
.../source/scan/csv/InputCSVScanSourceOpExec.scala | 106 ++++++++++++++++
.../assets/operator_images/InputCSVFileScan.png | Bin 0 -> 22499 bytes
3 files changed, 239 insertions(+)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpDesc.scala
new file mode 100644
index 0000000000..a34ae7ea51
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpDesc.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.csv
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty,
JsonPropertyDescription}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{Attribute, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{
+ InputPort,
+ OutputPort,
+ PhysicalOp,
+ SchemaPropagationFunc
+}
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+/**
+  * Descriptor for a CSV scan that reads the files named by tuples arriving on its
+  * "Filename" input port, rather than a single file configured on the operator.
+  *
+  * The output schema is exactly [[columns]], which must be configured manually or
+  * populated via [[inferColumnsFromFileName]] before a physical plan is built.
+  */
+class InputCSVScanSourceOpDesc extends SourceOperatorDescriptor {
+
+  /** Charset used to decode every input file. */
+  @JsonProperty(defaultValue = "UTF_8", required = true)
+  @JsonSchemaTitle("Encoding")
+  var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
+
+  /** Field delimiter; an absent or empty value is replaced with "," before use. */
+  @JsonProperty(defaultValue = ",")
+  @JsonSchemaTitle("Delimiter")
+  @JsonPropertyDescription("delimiter to separate each line into fields")
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  var customDelimiter: Option[String] = None
+
+  /** Whether the first line of each file is a header rather than data. */
+  @JsonProperty(defaultValue = "true")
+  @JsonSchemaTitle("Header")
+  @JsonPropertyDescription("whether the CSV file contains a header line")
+  var hasHeader: Boolean = true
+
+  /** Optional cap on the number of emitted tuples; `None` means no cap. */
+  @JsonProperty()
+  @JsonSchemaTitle("Limit")
+  @JsonPropertyDescription("max output count")
+  @JsonDeserialize(contentAs = classOf[Int])
+  var limit: Option[Int] = None
+
+  /** Optional number of leading data rows to skip; `None` means none are skipped. */
+  @JsonProperty()
+  @JsonSchemaTitle("Offset")
+  @JsonPropertyDescription("starting point of output")
+  @JsonDeserialize(contentAs = classOf[Int])
+  var offset: Option[Int] = None
+
+  /** Output schema of the operator; must be non-empty when the physical op is built. */
+  @JsonProperty()
+  @JsonSchemaTitle("Columns")
+  @JsonPropertyDescription("output columns of the CSV files")
+  var columns: List[Attribute] = List.empty
+
+  /**
+    * Builds the physical source operator, handing the executor this descriptor as JSON.
+    *
+    * @throws IllegalArgumentException if [[columns]] is null or empty
+    */
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp = {
+    require(
+      columns != null && columns.nonEmpty,
+      "Columns must not be empty. Use a Text Input with a literal filename or configure Columns manually."
+    )
+    // Normalize before `this` is serialized below, so the executor gets a concrete delimiter.
+    applyDefaultDelimiter()
+
+    PhysicalOp
+      .sourcePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
+        OpExecWithClassName(
+          "org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpExec",
+          objectMapper.writeValueAsString(this)
+        )
+      )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withPropagateSchema(
+        SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema()))
+      )
+  }
+
+  /** Schema built from [[columns]], or an empty schema when no columns are set. */
+  override def sourceSchema(): Schema = {
+    if (columns != null && columns.nonEmpty) {
+      Schema().add(columns)
+    } else {
+      Schema()
+    }
+  }
+
+  /**
+    * Fills [[columns]] by delegating schema inference to a [[CSVScanSourceOpDesc]] that is
+    * configured with this operator's delimiter, header flag, encoding and limit.
+    *
+    * Side effect: also applies the default delimiter to this descriptor.
+    *
+    * @param fileName file name resolved through `FileResolver.resolve`
+    */
+  def inferColumnsFromFileName(fileName: String): Unit = {
+    applyDefaultDelimiter()
+
+    val csvScanSourceOpDesc = new CSVScanSourceOpDesc()
+    csvScanSourceOpDesc.customDelimiter = customDelimiter
+    csvScanSourceOpDesc.hasHeader = hasHeader
+    csvScanSourceOpDesc.fileEncoding = fileEncoding
+    csvScanSourceOpDesc.limit = limit
+    // NOTE(review): `offset` is not forwarded to the inference descriptor — confirm intended.
+    csvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(fileName))
+    columns = csvScanSourceOpDesc.sourceSchema().getAttributes
+  }
+
+  /** Replaces an absent or empty [[customDelimiter]] with ","; mutates this descriptor. */
+  private def applyDefaultDelimiter(): Unit = {
+    if (!customDelimiter.exists(_.nonEmpty)) {
+      customDelimiter = Some(",")
+    }
+  }
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      userFriendlyName = "CSV File Scan From Input",
+      operatorDescription = "Scan CSV files from file paths provided by input tuples",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List(InputPort(displayName = "Filename")),
+      outputPorts = List(OutputPort())
+    )
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpExec.scala
new file mode 100644
index 0000000000..fa64db3b2a
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpExec.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.csv
+
+import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike}
+import org.apache.texera.amber.operator.source.scan.{AutoClosingIterator, InputFileSourceOpExec}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+import java.io.InputStreamReader
+import java.net.URI
+import scala.collection.immutable.ArraySeq
+import scala.util.control.NonFatal
+
+/**
+  * Executor for [[InputCSVScanSourceOpDesc]]: for every entry of `resolvedInputFileNames`
+  * (inherited from [[InputFileSourceOpExec]]), streams that file's CSV rows as tuples that
+  * conform to the descriptor's columns.
+  *
+  * @param descString the descriptor serialized as JSON with `objectMapper`
+  */
+class InputCSVScanSourceOpExec private[csv] (descString: String) extends InputFileSourceOpExec {
+  private val desc: InputCSVScanSourceOpDesc =
+    objectMapper.readValue(descString, classOf[InputCSVScanSourceOpDesc])
+  private val schema: Schema = desc.sourceSchema()
+
+  /** Lazily concatenates the tuples of every resolved input file, opening one file at a time. */
+  override def produceTuple(): Iterator[TupleLike] =
+    resolvedInputFileNames.iterator.flatMap(produceTuplesForFile)
+
+  /**
+    * Opens one file and returns an iterator over its parsed tuples.
+    *
+    * `offset` drops raw data rows before type parsing, so rows that would fail to parse still
+    * count toward it; rows whose parsing throws are skipped; `limit` then caps the number of
+    * emitted tuples. Both apply to this file only. The close callback handed to
+    * [[AutoClosingIterator]] stops the parser and closes the reader; if the parser cannot be
+    * started, the reader is closed immediately and the error is rethrown.
+    */
+  private def produceTuplesForFile(resolvedFileName: String): Iterator[TupleLike] = {
+    val inputReader = new InputStreamReader(
+      DocumentFactory.openReadonlyDocument(new URI(resolvedFileName)).asInputStream(),
+      desc.fileEncoding.getCharset
+    )
+
+    val parser =
+      try {
+        val csvParser = new CsvParser(parserSettings())
+        csvParser.beginParsing(inputReader)
+        csvParser
+      } catch {
+        case NonFatal(e) =>
+          // No AutoClosingIterator exists yet to release the reader, so close it here.
+          inputReader.close()
+          throw e
+      }
+
+    // parseNext() returns null once the input is exhausted.
+    val rows: Iterator[Array[String]] =
+      Iterator.continually(parser.parseNext()).takeWhile(_ != null)
+
+    val tuples: Iterator[TupleLike] =
+      rows.drop(desc.offset.getOrElse(0)).flatMap(parseRow)
+
+    val limited: Iterator[TupleLike] = desc.limit.fold(tuples)(n => tuples.take(n))
+
+    new AutoClosingIterator(
+      limited,
+      () => {
+        parser.stopParsing()
+        inputReader.close()
+      }
+    )
+  }
+
+  /** Builds univocity parser settings from the descriptor (delimiter, "\n" separator, header). */
+  private def parserSettings(): CsvParserSettings = {
+    val csvFormat = new CsvFormat()
+    // Only the first character is used; fall back to "," if the delimiter is absent or empty.
+    csvFormat.setDelimiter(desc.customDelimiter.filter(_.nonEmpty).getOrElse(",").charAt(0))
+    csvFormat.setLineSeparator("\n")
+    // disable skipping lines starting with # (default comment character)
+    csvFormat.setComment('\u0000')
+
+    val settings = new CsvParserSettings()
+    settings.setMaxCharsPerColumn(-1) // no limit on column length
+    settings.setFormat(csvFormat)
+    settings.setHeaderExtractionEnabled(desc.hasHeader)
+    settings
+  }
+
+  /** Converts one raw row to a tuple against `schema`; `None` if field parsing throws. */
+  private def parseRow(row: Array[String]): Option[TupleLike] =
+    try {
+      val fields = AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], schema)
+      Some(TupleLike(ArraySeq.unsafeWrapArray(fields): _*))
+    } catch {
+      // Malformed rows are skipped, but fatal JVM errors still propagate.
+      case NonFatal(_) => None
+    }
+}
diff --git a/frontend/src/assets/operator_images/InputCSVFileScan.png b/frontend/src/assets/operator_images/InputCSVFileScan.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and b/frontend/src/assets/operator_images/InputCSVFileScan.png differ