This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-source-port
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-source-port by this 
push:
     new 4373cbaa53 update
4373cbaa53 is described below

commit 4373cbaa53a1fe0d6455e35589e008cd0a620b55
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 7 16:06:34 2026 -0700

    update
---
 .../source/scan/csv/InputCSVScanSourceOpDesc.scala | 133 +++++++++++++++++++++
 .../source/scan/csv/InputCSVScanSourceOpExec.scala | 106 ++++++++++++++++
 .../assets/operator_images/InputCSVFileScan.png    | Bin 0 -> 22499 bytes
 3 files changed, 239 insertions(+)

diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpDesc.scala
new file mode 100644
index 0000000000..a34ae7ea51
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpDesc.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.csv
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty, 
JsonPropertyDescription}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithClassName
+import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{Attribute, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{
+  InputPort,
+  OutputPort,
+  PhysicalOp,
+  SchemaPropagationFunc
+}
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import org.apache.texera.amber.operator.source.SourceOperatorDescriptor
+import org.apache.texera.amber.operator.source.scan.FileDecodingMethod
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+class InputCSVScanSourceOpDesc extends SourceOperatorDescriptor {
+
+  @JsonProperty(defaultValue = "UTF_8", required = true)
+  @JsonSchemaTitle("Encoding")
+  var fileEncoding: FileDecodingMethod = FileDecodingMethod.UTF_8
+
+  @JsonProperty(defaultValue = ",")
+  @JsonSchemaTitle("Delimiter")
+  @JsonPropertyDescription("delimiter to separate each line into fields")
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  var customDelimiter: Option[String] = None
+
+  @JsonProperty(defaultValue = "true")
+  @JsonSchemaTitle("Header")
+  @JsonPropertyDescription("whether the CSV file contains a header line")
+  var hasHeader: Boolean = true
+
+  @JsonProperty()
+  @JsonSchemaTitle("Limit")
+  @JsonPropertyDescription("max output count")
+  @JsonDeserialize(contentAs = classOf[Int])
+  var limit: Option[Int] = None
+
+  @JsonProperty()
+  @JsonSchemaTitle("Offset")
+  @JsonPropertyDescription("starting point of output")
+  @JsonDeserialize(contentAs = classOf[Int])
+  var offset: Option[Int] = None
+
+  @JsonProperty()
+  @JsonSchemaTitle("Columns")
+  @JsonPropertyDescription("output columns of the CSV files")
+  var columns: List[Attribute] = List.empty
+
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp = {
+    require(
+      columns != null && columns.nonEmpty,
+      "Columns must not be empty. Use a Text Input with a literal filename or 
configure Columns manually."
+    )
+    if (customDelimiter.isEmpty || customDelimiter.get.isEmpty) {
+      customDelimiter = Option(",")
+    }
+
+    PhysicalOp
+      .sourcePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
+        OpExecWithClassName(
+          
"org.apache.texera.amber.operator.source.scan.csv.InputCSVScanSourceOpExec",
+          objectMapper.writeValueAsString(this)
+        )
+      )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withPropagateSchema(
+        SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> 
sourceSchema()))
+      )
+  }
+
+  override def sourceSchema(): Schema = {
+    if (columns != null && columns.nonEmpty) {
+      Schema().add(columns)
+    } else {
+      Schema()
+    }
+  }
+
+  def inferColumnsFromFileName(fileName: String): Unit = {
+    if (customDelimiter.isEmpty || customDelimiter.get.isEmpty) {
+      customDelimiter = Option(",")
+    }
+
+    val csvScanSourceOpDesc = new CSVScanSourceOpDesc()
+    csvScanSourceOpDesc.customDelimiter = customDelimiter
+    csvScanSourceOpDesc.hasHeader = hasHeader
+    csvScanSourceOpDesc.fileEncoding = fileEncoding
+    csvScanSourceOpDesc.limit = limit
+    csvScanSourceOpDesc.setResolvedFileName(FileResolver.resolve(fileName))
+    columns = csvScanSourceOpDesc.sourceSchema().getAttributes
+  }
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      userFriendlyName = "CSV File Scan From Input",
+      operatorDescription = "Scan CSV files from file paths provided by input 
tuples",
+      operatorGroupName = OperatorGroupConstants.INPUT_GROUP,
+      inputPorts = List(InputPort(displayName = "Filename")),
+      outputPorts = List(OutputPort())
+    )
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpExec.scala
new file mode 100644
index 0000000000..fa64db3b2a
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/source/scan/csv/InputCSVScanSourceOpExec.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.source.scan.csv
+
+import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.tuple.{AttributeTypeUtils, Schema, 
TupleLike}
+import org.apache.texera.amber.operator.source.scan.{AutoClosingIterator, 
InputFileSourceOpExec}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+import java.io.InputStreamReader
+import java.net.URI
+import scala.collection.immutable.ArraySeq
+
+class InputCSVScanSourceOpExec private[csv] (descString: String) extends 
InputFileSourceOpExec {
+  private val desc: InputCSVScanSourceOpDesc =
+    objectMapper.readValue(descString, classOf[InputCSVScanSourceOpDesc])
+  private val schema: Schema = desc.sourceSchema()
+
+  override def produceTuple(): Iterator[TupleLike] = {
+    resolvedInputFileNames.iterator.flatMap(produceTuplesForFile)
+  }
+
+  private def produceTuplesForFile(resolvedFileName: String): 
Iterator[TupleLike] = {
+    val inputReader = new InputStreamReader(
+      DocumentFactory.openReadonlyDocument(new 
URI(resolvedFileName)).asInputStream(),
+      desc.fileEncoding.getCharset
+    )
+
+    val csvFormat = new CsvFormat()
+    csvFormat.setDelimiter(desc.customDelimiter.get.charAt(0))
+    csvFormat.setLineSeparator("\n")
+    csvFormat.setComment(
+      '\u0000'
+    ) // disable skipping lines starting with # (default comment character)
+    val csvSetting = new CsvParserSettings()
+    csvSetting.setMaxCharsPerColumn(-1)
+    csvSetting.setFormat(csvFormat)
+    csvSetting.setHeaderExtractionEnabled(desc.hasHeader)
+
+    val parser = new CsvParser(csvSetting)
+    parser.beginParsing(inputReader)
+
+    val rowIterator = new Iterator[Array[String]] {
+      private var nextRow: Array[String] = _
+
+      override def hasNext: Boolean = {
+        if (nextRow != null) {
+          return true
+        }
+        nextRow = parser.parseNext()
+        nextRow != null
+      }
+
+      override def next(): Array[String] = {
+        val ret = nextRow
+        nextRow = null
+        ret
+      }
+    }
+
+    var tupleIterator = rowIterator
+      .drop(desc.offset.getOrElse(0))
+      .map(row => {
+        try {
+          TupleLike(
+            ArraySeq.unsafeWrapArray(
+              AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], 
schema)
+            ): _*
+          )
+        } catch {
+          case _: Throwable => null
+        }
+      })
+      .filter(t => t != null)
+
+    if (desc.limit.isDefined) {
+      tupleIterator = tupleIterator.take(desc.limit.get)
+    }
+
+    new AutoClosingIterator(
+      tupleIterator,
+      () => {
+        parser.stopParsing()
+        inputReader.close()
+      }
+    )
+  }
+}
diff --git a/frontend/src/assets/operator_images/InputCSVFileScan.png 
b/frontend/src/assets/operator_images/InputCSVFileScan.png
new file mode 100644
index 0000000000..c570e230ac
Binary files /dev/null and 
b/frontend/src/assets/operator_images/InputCSVFileScan.png differ

Reply via email to