This is an automated email from the ASF dual-hosted git repository.
yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d206c5e20e [MINOR][VL] Remove dead Arrow-CSV / Arrow-Dataset JVM code
paths (#12130)
d206c5e20e is described below
commit d206c5e20e2709ede7162fc5b8d4602600a92f50
Author: Kent Yao <[email protected]>
AuthorDate: Mon May 25 22:08:11 2026 +0800
[MINOR][VL] Remove dead Arrow-CSV / Arrow-Dataset JVM code paths (#12130)
* [MINOR][VL] Remove dead Arrow-CSV / Arrow-Dataset JVM code paths
The ArrowCSV file format and ArrowBatchScanExec chain are unreachable:
no injection in VeloxRuleApi, no META-INF/services entry, and all
ArrowCsvScanSuite cases are @Ignore'd. They were introduced as a
squash-merge byproduct in #11776 and never wired up.
Verified by compiling:
* spark-3.5 + scala-2.12 + arrow 15.0.0-gluten (install)
* spark-4.0 + scala-2.13 + arrow 18.1.0 (compile)
Generated-by: claude-opus-4.7
* [MINOR][VL] Remove dead Arrow dataset reader paths from ArrowUtil
makeArrowDiscovery / readArrowSchema / readArrowFileColumnNames /
readSchema(FragmentScanOptions) overloads / loadMissingColumns /
loadPartitionColumns / loadBatch in ArrowUtil have zero callers across
the repo after the previous removal of the ArrowCSV chain. Drop them
together with the now-unused imports (arrow.dataset.*, FileStatus,
URI/URLDecoder, ArrowRecordBatch, Optional, Logging, etc.).
Verified by compiling:
* spark-3.5 + scala-2.12 (test-compile, patched arrow 15.0.0-gluten)
* spark-4.0 + scala-2.13 (compile, pure arrow 18.1.0)
Generated-by: claude-opus-4.7
* [MINOR][VL] Remove dead spark.gluten.sql.native.arrow.reader.enabled
config
Following the removal of ArrowConvertorRule/ArrowScanReplaceRule (already
unwired from VeloxRuleApi by PR #11190 "[GLUTEN-11088][VL] Fall back CSV
reader" merged 2026-01-19), the spark.gluten.sql.native.arrow.reader.enabled
config and its plumbing have no consumers:
* GlutenConfig.enableNativeArrowReader / NATIVE_ARROW_READER_ENABLED
* BackendSettingsApi.enableNativeArrowReadFiles (default)
* VeloxBackend.enableNativeArrowReadFiles (override)
Test suites still set this flag (MiscOperatorSuite, GlutenCSVSuite,
GlutenReadSchemaSuite across spark35/40/41) but it has been a no-op since
PR #11190; CSV continues to be covered by these suites via the Spark
native CSV path. The corresponding entry in docs/Configuration.md is
also removed.
Generated-by: claude-opus-4.7
* [MINOR][UT] Fix spotless violation in GlutenReadSchemaSuite
Generated-by: claude-opus-4.7
* [MINOR][UT] Fix spotless violation in spark40/spark41
GlutenReadSchemaSuite
Generated-by: claude-opus-4.7
* [MINOR][UT] Remove unused GlutenConfig import from GlutenCSVSuite
(spark35/40/41)
Generated-by: claude-opus-4.7
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 4 -
.../gluten/datasource/ArrowCSVFileFormat.scala | 379 ---------------------
.../datasource/ArrowCSVOptionConverter.scala | 62 ----
.../v2/ArrowCSVPartitionReaderFactory.scala | 176 ----------
.../apache/gluten/datasource/v2/ArrowCSVScan.scala | 76 -----
.../gluten/datasource/v2/ArrowCSVScanBuilder.scala | 44 ---
.../gluten/datasource/v2/ArrowCSVTable.scala | 80 -----
.../datasource/v2/ArrowBatchScanExec.scala | 46 ---
.../gluten/extension/ArrowConvertorRule.scala | 120 -------
.../gluten/extension/ArrowScanReplaceRule.scala | 39 ---
.../sql/execution/ArrowFileSourceScanExec.scala | 61 ----
.../spark/sql/execution/BaseArrowScanExec.scala | 29 --
.../gluten/execution/ArrowCsvScanSuite.scala | 238 -------------
.../gluten/execution/MiscOperatorSuite.scala | 1 -
docs/Configuration.md | 1 -
.../scala/org/apache/gluten/utils/ArrowUtil.scala | 168 +--------
.../gluten/backendsapi/BackendSettingsApi.scala | 2 -
.../org/apache/gluten/config/GlutenConfig.scala | 8 -
.../datasources/GlutenReadSchemaSuite.scala | 19 +-
.../execution/datasources/csv/GlutenCSVSuite.scala | 8 +-
.../datasources/GlutenReadSchemaSuite.scala | 19 +-
.../execution/datasources/csv/GlutenCSVSuite.scala | 8 +-
.../datasources/GlutenReadSchemaSuite.scala | 19 +-
.../execution/datasources/csv/GlutenCSVSuite.scala | 8 +-
.../datasources/v2/BatchScanExecShim.scala | 9 -
.../datasources/v2/BatchScanExecShim.scala | 21 --
.../datasources/v2/BatchScanExecShim.scala | 19 --
.../datasources/v2/BatchScanExecShim.scala | 21 --
.../datasources/v2/BatchScanExecShim.scala | 21 --
29 files changed, 16 insertions(+), 1690 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 9d2d92a2fc..599b91851e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -550,10 +550,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
)
}
- override def enableNativeArrowReadFiles(): Boolean = {
- GlutenConfig.get.enableNativeArrowReader
- }
-
override def shouldRewriteCount(): Boolean = {
// Velox backend does not support count if it has more that one child,
// so we should rewrite it.
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
deleted file mode 100644
index 10b7e7801f..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource
-
-import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.config.VeloxConfig
-import org.apache.gluten.exception.SchemaMismatchException
-import org.apache.gluten.execution.RowToVeloxColumnarExec
-import org.apache.gluten.iterator.Iterators
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
-import org.apache.gluten.utils.ArrowUtil
-import org.apache.gluten.vectorized.ArrowWritableColumnVector
-
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker,
CSVHeaderCheckerHelper, CSVOptions, UnivocityParser}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
JoinedRow}
-import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.datasources.{FileFormat,
HadoopFileLinesReader, OutputWriterFactory, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.dataset.file.FileSystemDatasetFactory
-import org.apache.arrow.dataset.scanner.ScanOptions
-import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.VectorUnloader
-import org.apache.arrow.vector.types.pojo.Schema
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-
-import java.net.URLDecoder
-import java.util.Optional
-
-import scala.collection.JavaConverters.{asJavaIterableConverter,
asScalaBufferConverter}
-
-class ArrowCSVFileFormat(parsedOptions: CSVOptions)
- extends FileFormat
- with DataSourceRegister
- with Logging
- with Serializable {
-
- private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV
- private lazy val pool = ArrowNativeMemoryPool.arrowPool("FileSystem Read")
- var fallback = false
-
- override def isSplitable(
- sparkSession: SparkSession,
- options: Map[String, String],
- path: Path): Boolean = {
- false
- }
-
- override def inferSchema(
- sparkSession: SparkSession,
- options: Map[String, String],
- files: Seq[FileStatus]): Option[StructType] = {
- val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
- ArrowUtil.readSchema(
- files,
- fileFormat,
- arrowConfig,
- ArrowBufferAllocators.contextInstance(),
- ArrowNativeMemoryPool.arrowPool("infer schema"))
- }
-
- override def supportBatch(sparkSession: SparkSession, dataSchema:
StructType): Boolean = true
-
- override def buildReaderWithPartitionValues(
- sparkSession: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
- val sqlConf = sparkSession.sessionState.conf
- val broadcastedHadoopConf =
- sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
- val batchSize = sqlConf.columnBatchSize
- val columnPruning = sqlConf.csvColumnPruning &&
- !requiredSchema.exists(_.name ==
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
- val actualFilters =
-
filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
- (file: PartitionedFile) => {
- val actualDataSchema = StructType(
- dataSchema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
- val actualRequiredSchema = StructType(
- requiredSchema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
- ArrowCSVFileFormat.checkHeader(
- file,
- actualDataSchema,
- actualRequiredSchema,
- parsedOptions,
- actualFilters,
- broadcastedHadoopConf.value.value)
-
- val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
- val allocator = ArrowBufferAllocators.contextInstance()
- // todo predicate validation / pushdown
- val fileNames = ArrowUtil
- .readArrowFileColumnNames(
- URLDecoder.decode(file.filePath.toString, "UTF-8"),
- fileFormat,
- arrowConfig,
- ArrowBufferAllocators.contextInstance(),
- pool)
- val tokenIndexArr =
- actualRequiredSchema
- .map(f => java.lang.Integer.valueOf(actualDataSchema.indexOf(f)))
- .toArray
- val fileIndex = tokenIndexArr.filter(_ < fileNames.length)
- val requestSchema = new StructType(
- fileIndex
- .map(index => StructField(fileNames(index),
actualDataSchema(index).dataType)))
- val missingIndex = tokenIndexArr.filter(_ >= fileNames.length)
- val missingSchema = new StructType(missingIndex.map(actualDataSchema(_)))
- // TODO: support array/map/struct types in out-of-order schema reading.
- val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
- val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
- try {
- ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator,
arrowConfig)
- val factory =
- ArrowUtil.makeArrowDiscovery(
- URLDecoder.decode(file.filePath.toString, "UTF-8"),
- fileFormat,
- Optional.of(arrowConfig),
- ArrowBufferAllocators.contextInstance(),
- pool)
- val fields = factory.inspect().getFields
- val actualReadFields = new Schema(
- fileIndex.map(index => fields.get(index)).toIterable.asJava)
- ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator,
arrowConfig)
- ArrowCSVFileFormat
- .readArrow(
- ArrowBufferAllocators.contextInstance(),
- file,
- actualReadFields,
- missingSchema,
- partitionSchema,
- factory,
- batchSize,
- arrowConfig)
- .asInstanceOf[Iterator[InternalRow]]
- } catch {
- case e: SchemaMismatchException =>
- logWarning(e.getMessage)
- fallback = true
- val iter = ArrowCSVFileFormat.fallbackReadVanilla(
- dataSchema,
- requiredSchema,
- broadcastedHadoopConf.value.value,
- parsedOptions,
- file,
- actualFilters,
- columnPruning)
- val (schema, rows) =
- ArrowCSVFileFormat.withPartitionValue(requiredSchema,
partitionSchema, iter, file)
- ArrowCSVFileFormat
- .rowToColumn(schema, batchSize, rows)
- .asInstanceOf[Iterator[InternalRow]]
- case d: Exception => throw d
- } finally {
- cSchema.close()
- cSchema2.close()
- }
- }
- }
-
- override def vectorTypes(
- requiredSchema: StructType,
- partitionSchema: StructType,
- sqlConf: SQLConf): Option[Seq[String]] = {
- Option(
- Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
- classOf[ArrowWritableColumnVector].getName
- ))
- }
-
- override def shortName(): String = "arrowcsv"
-
- override def hashCode(): Int = getClass.hashCode()
-
- override def equals(other: Any): Boolean =
other.isInstanceOf[ArrowCSVFileFormat]
-
- override def prepareWrite(
- sparkSession: SparkSession,
- job: _root_.org.apache.hadoop.mapreduce.Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = {
- throw new UnsupportedOperationException()
- }
-}
-
-object ArrowCSVFileFormat {
-
- def readArrow(
- allocator: BufferAllocator,
- file: PartitionedFile,
- actualReadFields: Schema,
- missingSchema: StructType,
- partitionSchema: StructType,
- factory: FileSystemDatasetFactory,
- batchSize: Int,
- arrowConfig: CsvFragmentScanOptions): Iterator[ColumnarBatch] = {
- val actualReadFieldNames =
actualReadFields.getFields.asScala.map(_.getName).toArray
- val dataset = factory.finish(actualReadFields)
- val scanOptions = new ScanOptions.Builder(batchSize)
- .columns(Optional.of(actualReadFieldNames))
- .fragmentScanOptions(arrowConfig)
- .build()
- val scanner = dataset.newScan(scanOptions)
-
- val partitionVectors =
- ArrowUtil.loadPartitionColumns(batchSize, partitionSchema,
file.partitionValues)
-
- val nullVectors = if (missingSchema.nonEmpty) {
- ArrowUtil.loadMissingColumns(batchSize, missingSchema)
- } else {
- Array.empty[ArrowWritableColumnVector]
- }
- val reader = scanner.scanBatches()
- Iterators
- .wrap(new Iterator[ColumnarBatch] {
-
- override def hasNext: Boolean = {
- reader.loadNextBatch()
- }
-
- override def next: ColumnarBatch = {
- val root = reader.getVectorSchemaRoot
- val unloader = new VectorUnloader(root)
-
- val batch = ArrowUtil.loadBatch(
- allocator,
- unloader.getRecordBatch,
- actualReadFields,
- partitionVectors,
- nullVectors)
- batch
- }
- })
- .recycleIterator {
- scanner.close()
- dataset.close()
- factory.close()
- reader.close()
- partitionVectors.foreach(_.close())
- nullVectors.foreach(_.close())
- }
- .recyclePayload(_.close())
- .create()
- }
-
- def checkHeader(
- file: PartitionedFile,
- actualDataSchema: StructType,
- actualRequiredSchema: StructType,
- parsedOptions: CSVOptions,
- actualFilters: Seq[Filter],
- conf: Configuration): Unit = {
- val isStartOfFile = file.start == 0
- if (!isStartOfFile) {
- return
- }
- val parser =
- new UnivocityParser(actualDataSchema, actualRequiredSchema,
parsedOptions, actualFilters)
- val schema = if (parsedOptions.columnPruning) actualRequiredSchema else
actualDataSchema
- val headerChecker = new CSVHeaderChecker(
- schema,
- parsedOptions,
- source = s"CSV file: ${file.filePath}",
- isStartOfFile)
-
- val lines = {
- val linesReader =
- new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead,
conf)
- Option(TaskContext.get())
- .foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
- linesReader.map {
- line => new String(line.getBytes, 0, line.getLength,
parser.options.charset)
- }
- }
- CSVHeaderCheckerHelper.checkHeaderColumnNames(headerChecker, lines,
parser.tokenizer)
- }
-
- def rowToColumn(
- schema: StructType,
- batchSize: Int,
- it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
- val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
- it,
- schema,
- batchSize,
- VeloxConfig.get.veloxPreferredBatchBytes
- )
- veloxBatch
- .map(v => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(),
v))
- }
-
- private def toAttribute(field: StructField): AttributeReference =
- AttributeReference(field.name, field.dataType, field.nullable,
field.metadata)()
-
- private def toAttributes(schema: StructType): Seq[AttributeReference] = {
- schema.map(toAttribute)
- }
-
- def withPartitionValue(
- requiredSchema: StructType,
- partitionSchema: StructType,
- iter: Iterator[InternalRow],
- file: PartitionedFile): (StructType, Iterator[InternalRow]) = {
- val fullSchema = toAttributes(requiredSchema) ++
toAttributes(partitionSchema)
-
- // Using lazy val to avoid serialization
- lazy val appendPartitionColumns =
- GenerateUnsafeProjection.generate(fullSchema, fullSchema)
- // Using local val to avoid per-row lazy val check (pre-mature
optimization?...)
- val converter = appendPartitionColumns
-
- // Note that we have to apply the converter even though
`file.partitionValues` is empty.
- // This is because the converter is also responsible for converting safe
`InternalRow`s into
- // `UnsafeRow`s.
- if (partitionSchema.isEmpty) {
- val rows = iter.map(dataRow => converter(dataRow))
- (StructType(requiredSchema ++ partitionSchema), rows)
- } else {
- val joinedRow = new JoinedRow()
- val rows = iter.map(dataRow => converter(joinedRow(dataRow,
file.partitionValues)))
- (StructType(requiredSchema ++ partitionSchema), rows)
- }
- }
-
- def fallbackReadVanilla(
- dataSchema: StructType,
- requiredSchema: StructType,
- conf: Configuration,
- parsedOptions: CSVOptions,
- file: PartitionedFile,
- actualFilters: Seq[Filter],
- columnPruning: Boolean): Iterator[InternalRow] = {
- val actualDataSchema = StructType(
- dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
- val actualRequiredSchema = StructType(
- requiredSchema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
- val parser =
- new UnivocityParser(actualDataSchema, actualRequiredSchema,
parsedOptions, actualFilters)
- val schema = if (columnPruning) actualRequiredSchema else actualDataSchema
- val isStartOfFile = file.start == 0
- val headerChecker = new CSVHeaderChecker(
- schema,
- parsedOptions,
- source = s"CSV file: ${file.filePath}",
- isStartOfFile)
- CSVDataSource(parsedOptions).readFile(conf, file, parser, headerChecker,
requiredSchema)
- }
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala
deleted file mode 100644
index 7d6a54c2ac..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource
-
-import org.apache.gluten.utils.ArrowAbiUtil
-
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.utils.SparkSchemaUtil
-
-import com.google.common.collect.ImmutableMap
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.dataset.scanner.csv.{CsvConvertOptions,
CsvFragmentScanOptions}
-import org.apache.arrow.memory.BufferAllocator
-
-import java.util
-
-object ArrowCSVOptionConverter {
- def convert(option: CSVOptions): CsvFragmentScanOptions = {
- val parseMap = new util.HashMap[String, String]()
- val default = new CSVOptions(
- CaseInsensitiveMap(Map()),
- option.columnPruning,
- SparkSchemaUtil.getLocalTimezoneID)
- parseMap.put("strings_can_be_null", "true")
- if (option.delimiter != default.delimiter) {
- parseMap.put("delimiter", option.delimiter)
- }
- if (option.escapeQuotes != default.escapeQuotes) {
- parseMap.put("quoting", (!option.escapeQuotes).toString)
- }
-
- val convertOptions = new CsvConvertOptions(ImmutableMap.of())
- new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), parseMap)
- }
-
- def schema(
- requiredSchema: StructType,
- cSchema: ArrowSchema,
- allocator: BufferAllocator,
- option: CsvFragmentScanOptions): Unit = {
- val schema = SparkSchemaUtil.toArrowSchema(requiredSchema)
- ArrowAbiUtil.exportSchema(allocator, schema, cSchema)
- option.getConvertOptions.setArrowSchema(cSchema)
- }
-
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
deleted file mode 100644
index c930cebebe..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource.v2
-
-import org.apache.gluten.datasource.{ArrowCSVFileFormat,
ArrowCSVOptionConverter}
-import org.apache.gluten.exception.SchemaMismatchException
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
-import org.apache.gluten.utils.ArrowUtil
-
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.task.TaskResources
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.vector.types.pojo.Schema
-
-import java.net.URLDecoder
-import java.util.Optional
-
-import scala.collection.JavaConverters.asJavaIterableConverter
-
-case class ArrowCSVPartitionReaderFactory(
- sqlConf: SQLConf,
- broadcastedConf: Broadcast[SerializableConfiguration],
- dataSchema: StructType,
- readDataSchema: StructType,
- readPartitionSchema: StructType,
- options: CSVOptions,
- filters: Seq[Filter])
- extends FilePartitionReaderFactory
- with Logging {
-
- private val batchSize = sqlConf.parquetVectorizedReaderBatchSize
- private val csvColumnPruning: Boolean = sqlConf.csvColumnPruning
- private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV
- var fallback = false
-
- override def supportColumnarReads(partition: InputPartition): Boolean = true
-
- override def buildReader(partitionedFile: PartitionedFile):
PartitionReader[InternalRow] = {
- // disable row based read
- throw new UnsupportedOperationException
- }
-
- override def buildColumnarReader(
- partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
- val actualDataSchema = StructType(
- dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
- val actualRequiredSchema = StructType(
- readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
- ArrowCSVFileFormat.checkHeader(
- partitionedFile,
- actualDataSchema,
- actualRequiredSchema,
- options,
- filters,
- broadcastedConf.value.value)
- val (allocator, pool) = if (!TaskResources.inSparkTask()) {
- TaskResources.runUnsafe(
- (
- ArrowBufferAllocators.contextInstance(),
- ArrowNativeMemoryPool.arrowPool("FileSystemFactory"))
- )
- } else {
- (
- ArrowBufferAllocators.contextInstance(),
- ArrowNativeMemoryPool.arrowPool("FileSystemFactory"))
- }
- val arrowConfig = ArrowCSVOptionConverter.convert(options)
- val fileNames = ArrowUtil
- .readArrowFileColumnNames(
- URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"),
- fileFormat,
- arrowConfig,
- ArrowBufferAllocators.contextInstance(),
- pool)
- val tokenIndexArr =
- actualRequiredSchema.map(f =>
java.lang.Integer.valueOf(actualDataSchema.indexOf(f))).toArray
- val fileIndex = tokenIndexArr.filter(_ < fileNames.length)
- val requestSchema = new StructType(
- fileIndex
- .map(index => StructField(fileNames(index),
actualDataSchema(index).dataType)))
- val missingIndex = tokenIndexArr.filter(_ >= fileNames.length)
- val missingSchema = new StructType(missingIndex.map(actualDataSchema(_)))
- // TODO: support array/map/struct types in out-of-order schema reading.
- val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
- val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
- // TODO: support array/map/struct types in out-of-order schema reading.
- val iter =
- try {
- ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator,
arrowConfig)
- val factory =
- ArrowUtil.makeArrowDiscovery(
- URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"),
- fileFormat,
- Optional.of(arrowConfig),
- ArrowBufferAllocators.contextInstance(),
- pool)
- val fields = factory.inspect().getFields
- val actualReadFields = new Schema(
- fileIndex.map(index => fields.get(index)).toIterable.asJava)
- ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator,
arrowConfig)
- ArrowCSVFileFormat
- .readArrow(
- ArrowBufferAllocators.contextInstance(),
- partitionedFile,
- actualReadFields,
- missingSchema,
- readPartitionSchema,
- factory,
- batchSize,
- arrowConfig)
- } catch {
- case e: SchemaMismatchException =>
- logWarning(e.getMessage)
- fallback = true
- val iter = ArrowCSVFileFormat.fallbackReadVanilla(
- dataSchema,
- readDataSchema,
- broadcastedConf.value.value,
- options,
- partitionedFile,
- filters,
- csvColumnPruning)
- val (schema, rows) = ArrowCSVFileFormat.withPartitionValue(
- readDataSchema,
- readPartitionSchema,
- iter,
- partitionedFile)
- ArrowCSVFileFormat.rowToColumn(schema, batchSize, rows)
- case d: Exception => throw d
- } finally {
- cSchema.close()
- cSchema2.close()
- }
-
- new PartitionReader[ColumnarBatch] {
-
- override def next(): Boolean = {
- iter.hasNext
- }
-
- override def get(): ColumnarBatch = {
- iter.next()
- }
-
- override def close(): Unit = {}
- }
- }
-
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala
deleted file mode 100644
index ce3f847704..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource.v2
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.connector.read.PartitionReaderFactory
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.v2.FileScan
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.hadoop.fs.Path
-
-import scala.collection.JavaConverters.mapAsScalaMapConverter
-
-case class ArrowCSVScan(
- sparkSession: SparkSession,
- fileIndex: PartitioningAwareFileIndex,
- dataSchema: StructType,
- readDataSchema: StructType,
- readPartitionSchema: StructType,
- pushedFilters: Array[Filter],
- options: CaseInsensitiveStringMap,
- partitionFilters: Seq[Expression] = Seq.empty,
- dataFilters: Seq[Expression] = Seq.empty)
- extends FileScan {
-
- private lazy val parsedOptions: CSVOptions = new CSVOptions(
- options.asScala.toMap,
- columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
- sparkSession.sessionState.conf.sessionLocalTimeZone,
- sparkSession.sessionState.conf.columnNameOfCorruptRecord
- )
-
- override def isSplitable(path: Path): Boolean = {
- false
- }
-
- override def createReaderFactory(): PartitionReaderFactory = {
- val caseSensitiveMap = options.asCaseSensitiveMap().asScala.toMap
- val hconf =
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(hconf))
- val actualFilters =
-
pushedFilters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
- ArrowCSVPartitionReaderFactory(
- sparkSession.sessionState.conf,
- broadcastedConf,
- dataSchema,
- readDataSchema,
- readPartitionSchema,
- parsedOptions,
- actualFilters)
- }
-
- def withFilters(partitionFilters: Seq[Expression], dataFilters:
Seq[Expression]): FileScan =
- this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
deleted file mode 100644
index 2b3991fe29..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource.v2
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-case class ArrowCSVScanBuilder(
- sparkSession: SparkSession,
- fileIndex: PartitioningAwareFileIndex,
- schema: StructType,
- dataSchema: StructType,
- options: CaseInsensitiveStringMap)
- extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
-
- override def build(): Scan = {
- ArrowCSVScan(
- sparkSession,
- fileIndex,
- dataSchema,
- readDataSchema(),
- readPartitionSchema(),
- Array.empty,
- options)
- }
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
deleted file mode 100644
index 3eaf4e35fd..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource.v2
-
-import org.apache.gluten.datasource.ArrowCSVOptionConverter
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
-import org.apache.gluten.utils.ArrowUtil
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
-import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.task.TaskResources
-
-import org.apache.hadoop.fs.FileStatus
-
-import scala.collection.JavaConverters.mapAsScalaMapConverter
-
-case class ArrowCSVTable(
- name: String,
- sparkSession: SparkSession,
- options: CaseInsensitiveStringMap,
- paths: Seq[String],
- userSpecifiedSchema: Option[StructType],
- fallbackFileFormat: Class[_ <: FileFormat])
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
-
- override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
- val (allocator, pool) = if (!TaskResources.inSparkTask()) {
- TaskResources.runUnsafe(
- (ArrowBufferAllocators.contextInstance(),
ArrowNativeMemoryPool.arrowPool("inferSchema"))
- )
- } else {
- (ArrowBufferAllocators.contextInstance(),
ArrowNativeMemoryPool.arrowPool("inferSchema"))
- }
- val parsedOptions: CSVOptions = new CSVOptions(
- options.asScala.toMap,
- columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
- sparkSession.sessionState.conf.sessionLocalTimeZone,
- sparkSession.sessionState.conf.columnNameOfCorruptRecord
- )
- val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
- ArrowUtil.readSchema(
- files.head,
- org.apache.arrow.dataset.file.FileFormat.CSV,
- arrowConfig,
- allocator,
- pool
- )
- }
-
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
- ArrowCSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
- }
-
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- throw new UnsupportedOperationException
- }
-
- override def formatName: String = "arrowcsv"
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
deleted file mode 100644
index 4591192b2b..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution.datasource.v2
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.BaseArrowScanExec
-import org.apache.spark.sql.execution.datasources.v2.{ArrowBatchScanExecShim,
BatchScanExec}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-case class ArrowBatchScanExec(original: BatchScanExec)
- extends ArrowBatchScanExecShim(original)
- with BaseArrowScanExec {
- override def doCanonicalize(): ArrowBatchScanExec =
- this.copy(original = original.doCanonicalize())
-
- override def nodeName: String = "Arrow" + original.nodeName
-
- override lazy val metrics = {
- Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows")) ++
- customMetrics
- }
-
- override def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val numOutputRows = longMetric("numOutputRows")
- inputRDD.asInstanceOf[RDD[ColumnarBatch]].map {
- b =>
- numOutputRows += b.numRows()
- b
- }
- }
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
deleted file mode 100644
index 009674e810..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.datasource.ArrowCSVFileFormat
-import org.apache.gluten.datasource.v2.ArrowCSVTable
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, PermissiveMode}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
-import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.utils.SparkArrowUtil
-
-import java.nio.charset.StandardCharsets
-
-import scala.collection.convert.ImplicitConversions.`map AsScala`
-
-/**
- * Extracts a CSVTable from a DataSourceV2Relation.
- *
- * Only the table variable of DataSourceV2Relation is accessed to improve
compatibility across
- * different Spark versions.
- * @since Spark
- * 4.1
- */
-private object CSVTableExtractor {
- def unapply(relation: DataSourceV2Relation): Option[(DataSourceV2Relation,
CSVTable)] = {
- relation.table match {
- case t: CSVTable =>
- Some((relation, t))
- case _ => None
- }
- }
-}
-
-@Experimental
-case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan]
{
- override def apply(plan: LogicalPlan): LogicalPlan = {
- if (!BackendsApiManager.getSettings.enableNativeArrowReadFiles()) {
- return plan
- }
- plan.resolveOperators {
- case l: LogicalRelation =>
- l.relation match {
- case r @ HadoopFsRelation(_, _, dataSchema, _, _: CSVFileFormat,
options)
- if validate(session, dataSchema, options) =>
- val csvOptions = new CSVOptions(
- options,
- columnPruning = session.sessionState.conf.csvColumnPruning,
- session.sessionState.conf.sessionLocalTimeZone)
- l.copy(relation = r.copy(fileFormat = new
ArrowCSVFileFormat(csvOptions))(session))
- case _ => l
- }
- case CSVTableExtractor(d, t)
- if validate(session, t.dataSchema,
t.options.asCaseSensitiveMap().toMap) =>
- d.copy(table = ArrowCSVTable(
- "arrow" + t.name,
- t.sparkSession,
- t.options,
- t.paths,
- t.userSpecifiedSchema,
- t.fallbackFileFormat))
- case r =>
- r
- }
- }
-
- private def validate(
- session: SparkSession,
- dataSchema: StructType,
- options: Map[String, String]): Boolean = {
- val csvOptions = new CSVOptions(
- options,
- columnPruning = session.sessionState.conf.csvColumnPruning,
- session.sessionState.conf.sessionLocalTimeZone)
- SparkArrowUtil.checkSchema(dataSchema) &&
- checkCsvOptions(csvOptions,
session.sessionState.conf.sessionLocalTimeZone) &&
- dataSchema.nonEmpty
- }
-
- private def checkCsvOptions(csvOptions: CSVOptions, timeZone: String):
Boolean = {
- val default = new CSVOptions(CaseInsensitiveMap(Map()),
csvOptions.columnPruning, timeZone)
- csvOptions.headerFlag && !csvOptions.multiLine &&
- csvOptions.delimiter.length == 1 &&
- csvOptions.quote == '\"' &&
- csvOptions.escape == '\\' &&
- csvOptions.lineSeparator.isEmpty &&
- csvOptions.charset == StandardCharsets.UTF_8.name() &&
- csvOptions.parseMode == PermissiveMode && !csvOptions.inferSchemaFlag &&
- csvOptions.nullValue == "" &&
- csvOptions.emptyValueInRead == "" && csvOptions.comment == '\u0000' &&
- csvOptions.columnPruning &&
- csvOptions.dateFormatInRead == default.dateFormatInRead &&
- csvOptions.timestampFormatInRead == default.timestampFormatInRead &&
- csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
- }
-
-}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
deleted file mode 100644
index adfc6ca742..0000000000
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.datasource.ArrowCSVFileFormat
-import org.apache.gluten.datasource.v2.ArrowCSVScan
-import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{ArrowFileSourceScanExec,
FileSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan = {
- plan.transformUp {
- case plan: FileSourceScanExec if
plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
- ArrowFileSourceScanExec(plan)
- case plan: BatchScanExec if plan.scan.isInstanceOf[ArrowCSVScan] =>
- ArrowBatchScanExec(plan)
- case plan: BatchScanExec => plan
- case p => p
- }
- }
-}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
deleted file mode 100644
index 16b8fb0e9f..0000000000
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import scala.concurrent.duration.NANOSECONDS
-
-case class ArrowFileSourceScanExec(original: FileSourceScanExec)
- extends ArrowFileSourceScanLikeShim(original)
- with BaseArrowScanExec {
-
- lazy val inputRDD: RDD[InternalRow] = original.inputRDD
-
- override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
-
- override def output: Seq[Attribute] = original.output
-
- override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
-
- override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val numOutputRows = longMetric("numOutputRows")
- val scanTime = longMetric("scanTime")
- inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal {
- batches =>
- new Iterator[ColumnarBatch] {
-
- override def hasNext: Boolean = {
- // The `FileScanRDD` returns an iterator which scans the file
during the `hasNext` call.
- val startNs = System.nanoTime()
- val res = batches.hasNext
- scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
- res
- }
-
- override def next(): ColumnarBatch = {
- val batch = batches.next()
- numOutputRows += batch.numRows()
- batch
- }
- }
- }
- }
-}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
deleted file mode 100644
index 13d0f5699b..0000000000
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes
-import org.apache.gluten.execution.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
-
-trait BaseArrowScanExec extends GlutenPlan {
- final override def batchType(): Convention.BatchType = {
- ArrowBatchTypes.ArrowJavaBatchType
- }
-
- final override def rowType(): Convention.RowType = Convention.RowType.None
-}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
deleted file mode 100644
index 8e66c3a58f..0000000000
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.datasource.ArrowCSVFileFormat
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.{ArrowFileSourceScanExec,
BaseArrowScanExec, ColumnarToRowExec}
-import org.apache.spark.sql.execution.columnar.SparkCacheUtil
-import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
-
-import org.scalatest.Ignore
-
-@Ignore
-class ArrowCsvScanSuiteV1 extends ArrowCsvScanSuiteBase {
- override protected def sparkConf: SparkConf = {
- super.sparkConf
- .set("spark.sql.sources.useV1SourceList", "csv")
- }
-
- test("csv scan v1") {
- val df = runAndCompare("select * from student")
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
- val scan = plan.find(_.isInstanceOf[BaseArrowScanExec]).toList.head
- assert(
- scan
- .asInstanceOf[ArrowFileSourceScanExec]
- .relation
- .fileFormat
- .isInstanceOf[ArrowCSVFileFormat])
- }
-
- test("csv scan with schema v1") {
- val df = runAndCompare("select * from student_option_schema")
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- val scan = plan.find(_.isInstanceOf[BaseArrowScanExec])
- assert(scan.isDefined)
- assert(
- !scan.get
- .asInstanceOf[ArrowFileSourceScanExec]
- .original
- .relation
- .fileFormat
- .asInstanceOf[ArrowCSVFileFormat]
- .fallback)
- }
-}
-
-@Ignore
-class ArrowCsvScanSuiteV2 extends ArrowCsvScanSuite {
- override protected def sparkConf: SparkConf = {
- super.sparkConf
- .set("spark.sql.sources.useV1SourceList", "")
- }
-
- test("csv scan") {
- runAndCompare("select * from student")
- }
-}
-
-@Ignore
-class ArrowCsvScanWithTableCacheSuite extends ArrowCsvScanSuiteBase {
- override protected def sparkConf: SparkConf = {
- super.sparkConf
- .set("spark.sql.sources.useV1SourceList", "csv")
- .set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
- }
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- // A common practice as well as in Spark tests, to clear the cache
serializer
- // in case it was already set as the default row-based serializer.
- SparkCacheUtil.clearCacheSerializer()
- }
-
- override def afterAll(): Unit = {
- SparkCacheUtil.clearCacheSerializer()
- super.afterAll()
- }
-
- /**
- * Test for GLUTEN-8453: https://github.com/apache/gluten/issues/8453. To
make sure no error is
- * thrown when caching an Arrow Java query plan.
- */
- test("csv scan v1 with table cache") {
- val df = spark.sql("select * from student")
- df.cache()
- assert(df.collect().length == 3)
- }
-}
-
-/** Since https://github.com/apache/gluten/pull/5850. */
-@Ignore
-abstract class ArrowCsvScanSuite extends ArrowCsvScanSuiteBase {
-
- test("csv scan with option string as null") {
- val df = runAndCompare("select * from student_option_str")
- val plan = df.queryExecution.executedPlan
- assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
- }
-
- test("csv scan with option delimiter") {
- val df = runAndCompare("select * from student_option")
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
- }
-
- test("csv scan with missing columns") {
- val df =
- runAndCompare("select languagemissing, language, id_new_col from
student_option_schema_lm")
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[VeloxColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
- }
-
- test("csv scan with different name") {
- val df = runAndCompare("select * from student_option_schema")
- val plan = df.queryExecution.executedPlan
- assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
-
- val df2 = runAndCompare("select * from student_option_schema")
- val plan2 = df2.queryExecution.executedPlan
- assert(plan2.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
- assert(plan2.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
- }
-
- test("csv scan with filter") {
- val df = runAndCompare("select * from student where Name = 'Peter'")
- assert(df.queryExecution.executedPlan.find(s =>
s.isInstanceOf[ColumnarToRowExec]).isEmpty)
- assert(
- df.queryExecution.executedPlan
- .find(s => s.isInstanceOf[BaseArrowScanExec])
- .isDefined)
- }
-
- test("insert into select from csv") {
- withTable("insert_csv_t") {
- spark.sql("create table insert_csv_t(Name string, Language string) using
parquet;")
- runQueryAndCompare("""
- |insert into insert_csv_t select * from student;
- |""".stripMargin) {
- checkGlutenPlan[BaseArrowScanExec]
- }
- }
- }
-}
-
-abstract class ArrowCsvScanSuiteBase extends VeloxWholeStageTransformerSuite {
- override protected val resourcePath: String = "N/A"
- override protected val fileFormat: String = "N/A"
-
- protected val rootPath: String = getClass.getResource("/").getPath
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- createCsvTables()
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- }
-
- override protected def sparkConf: SparkConf = {
- super.sparkConf
- .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
- .set("spark.sql.files.maxPartitionBytes", "1g")
- .set("spark.sql.shuffle.partitions", "1")
- .set("spark.memory.offHeap.size", "2g")
- .set("spark.unsafe.exceptionOnMemoryLeak", "true")
- .set("spark.sql.autoBroadcastJoinThreshold", "-1")
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
- }
-
- private def createCsvTables(): Unit = {
- spark.read
- .format("csv")
- .option("header", "true")
- .load(rootPath + "/datasource/csv/student.csv")
- .createOrReplaceTempView("student")
-
- spark.read
- .format("csv")
- .option("header", "true")
- .load(rootPath + "/datasource/csv/student_option_str.csv")
- .createOrReplaceTempView("student_option_str")
-
- spark.read
- .format("csv")
- .option("header", "true")
- .option("delimiter", ";")
- .load(rootPath + "/datasource/csv/student_option.csv")
- .createOrReplaceTempView("student_option")
-
- spark.read
- .schema(
- new StructType()
- .add("id", StringType)
- .add("name", StringType)
- .add("language", StringType))
- .format("csv")
- .option("header", "true")
- .load(rootPath + "/datasource/csv/student_option_schema.csv")
- .createOrReplaceTempView("student_option_schema")
-
- spark.read
- .schema(
- new StructType()
- .add("id_new_col", IntegerType)
- .add("name", StringType)
- .add("language", StringType)
- .add("languagemissing", StringType))
- .format("csv")
- .option("header", "true")
- .load(rootPath + "/datasource/csv/student_option_schema.csv")
- .createOrReplaceTempView("student_option_schema_lm")
- }
-}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index fec55eff09..581d62aa8c 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -62,7 +62,6 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set("spark.sql.sources.useV1SourceList", "avro,parquet,csv")
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
}
test("select_part_column") {
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 4ed5cd4b27..6aaf25578b 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -125,7 +125,6 @@ nav_order: 15
| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 🔄
Dynamic | 0.1 | The percentage of root paths to sample for
metadata validation when the number of root paths is large. Value range is (0,
1.0]. 1.0 means check all paths (no sampling). A smaller value reduces
validation cost for tables with many partitions.
[...]
| spark.gluten.sql.injectNativePlanStringToExplain | 🔄
Dynamic | false | When true, Gluten will inject native plan tree
to Spark's explain output.
[...]
| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | 🔄
Dynamic | true | Whether to merge two phases aggregate if there
are no other operators between them.
[...]
-| spark.gluten.sql.native.arrow.reader.enabled | 🔄
Dynamic | false | This is config to specify whether to enable
the native columnar csv reader
[...]
| spark.gluten.sql.native.bloomFilter | 🔄
Dynamic | true |
| spark.gluten.sql.native.hive.writer.enabled | 🔄
Dynamic | true | This is config to specify whether to enable
the native columnar writer for HiveFileFormat. Currently only supports
HiveFileFormat with Parquet as the output file type.
[...]
| spark.gluten.sql.native.hyperLogLog.Aggregate | 🔄
Dynamic | true |
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
index a94f8f2e3d..1354024879 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
@@ -18,31 +18,20 @@ package org.apache.gluten.utils
import org.apache.gluten.vectorized.ArrowWritableColumnVector
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.vectorized.ArrowColumnVectorUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.{SparkArrowUtil, SparkSchemaUtil}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.arrow.c.{ArrowSchema, CDataDictionaryProvider, Data}
-import org.apache.arrow.dataset.file.{FileFormat, FileSystemDatasetFactory}
-import org.apache.arrow.dataset.jni.NativeMemoryPool
-import org.apache.arrow.dataset.scanner.FragmentScanOptions
import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
-import org.apache.hadoop.fs.FileStatus
-import java.net.{URI, URLDecoder}
import java.util
-import java.util.Optional
import scala.collection.JavaConverters._
-import scala.collection.mutable
-object ArrowUtil extends Logging {
+object ArrowUtil {
private val defaultTimeZoneId = SparkSchemaUtil.getLocalTimezoneID
@@ -97,159 +86,4 @@ object ArrowUtil extends Logging {
new Schema(fields)
}
- private def rewriteUri(encodeUri: String): String = {
- val decodedUri = encodeUri
- val uri = URI.create(decodedUri)
- if (uri.getScheme == "s3" || uri.getScheme == "s3a") {
- val s3Rewritten =
- new URI("s3", uri.getAuthority, uri.getPath, uri.getQuery, uri.getFragment).toString
- return s3Rewritten
- }
- val sch = uri.getScheme match {
- case "hdfs" => "hdfs"
- case "file" => "file"
- }
- val ssp = uri.getScheme match {
- case "hdfs" => uri.getSchemeSpecificPart
- case "file" => "//" + uri.getSchemeSpecificPart
- }
- val rewritten = new URI(sch, ssp, uri.getFragment)
- rewritten.toString
- }
-
- def makeArrowDiscovery(
- encodedUri: String,
- format: FileFormat,
- option: Optional[FragmentScanOptions],
- allocator: BufferAllocator,
- pool: NativeMemoryPool
- ): FileSystemDatasetFactory = {
- val factory =
- new FileSystemDatasetFactory(allocator, pool, format, rewriteUri(encodedUri), option)
- factory
- }
-
- def readArrowSchema(
- file: String,
- format: FileFormat,
- option: FragmentScanOptions,
- allocator: BufferAllocator,
- pool: NativeMemoryPool): Schema = {
- val factory: FileSystemDatasetFactory =
- makeArrowDiscovery(file, format, Optional.of(option), allocator, pool)
- val schema = factory.inspect()
- factory.close()
- schema
- }
-
- def readArrowFileColumnNames(
- file: String,
- format: FileFormat,
- option: FragmentScanOptions,
- allocator: BufferAllocator,
- pool: NativeMemoryPool): Array[String] = {
- val fileFields = ArrowUtil
- .readArrowSchema(URLDecoder.decode(file, "UTF-8"), format, option, allocator, pool)
- .getFields
- .asScala
- fileFields.map(_.getName).toArray
- }
-
- def readSchema(
- file: FileStatus,
- format: FileFormat,
- option: FragmentScanOptions,
- allocator: BufferAllocator,
- pool: NativeMemoryPool): Option[StructType] = {
- val factory: FileSystemDatasetFactory =
- makeArrowDiscovery(file.getPath.toString, format, Optional.of(option), allocator, pool)
- val schema = factory.inspect()
- try {
- Option(SparkSchemaUtil.fromArrowSchema(schema))
- } finally {
- factory.close()
- }
- }
-
- def readSchema(
- files: Seq[FileStatus],
- format: FileFormat,
- option: FragmentScanOptions,
- allocator: BufferAllocator,
- pool: NativeMemoryPool): Option[StructType] = {
- if (files.isEmpty) {
- throw new IllegalArgumentException("No input file specified")
- }
-
- readSchema(files.head, format, option, allocator, pool)
- }
-
- def loadMissingColumns(
- rowCount: Int,
- missingSchema: StructType): Array[ArrowWritableColumnVector] = {
-
- val vectors =
- ArrowWritableColumnVector.allocateColumns(rowCount, missingSchema)
- vectors.foreach {
- vector =>
- vector.putNulls(0, rowCount)
- vector.setValueCount(rowCount)
- }
-
- vectors
- }
-
- def loadPartitionColumns(
- rowCount: Int,
- partitionSchema: StructType,
- partitionValues: InternalRow): Array[ArrowWritableColumnVector] = {
- val partitionColumns = ArrowWritableColumnVector.allocateColumns(rowCount, partitionSchema)
- (0 until partitionColumns.length).foreach(
- i => {
- ArrowColumnVectorUtils.populate(partitionColumns(i), partitionValues, i)
- partitionColumns(i).setValueCount(rowCount)
- partitionColumns(i).setIsConstant()
- })
-
- partitionColumns
- }
-
- def loadBatch(
- allocator: BufferAllocator,
- input: ArrowRecordBatch,
- dataSchema: Schema,
- partitionVectors: Array[ArrowWritableColumnVector] = Array.empty,
- nullVectors: Array[ArrowWritableColumnVector] = Array.empty): ColumnarBatch = {
- val rowCount: Int = input.getLength
-
- val vectors =
- try {
- ArrowWritableColumnVector.loadColumns(rowCount, dataSchema, input, allocator)
- } finally {
- input.close()
- }
-
- val totalVectors = if (nullVectors.nonEmpty) {
- val finalVectors =
- mutable.ArrayBuffer[ArrowWritableColumnVector]()
- finalVectors.appendAll(vectors)
- finalVectors.appendAll(nullVectors)
- finalVectors.toArray
- } else {
- vectors
- }
-
- val batch = new ColumnarBatch(
- totalVectors.map(_.asInstanceOf[ColumnVector]) ++
- partitionVectors
- .map {
- vector =>
- vector.setValueCount(rowCount)
- vector.asInstanceOf[ColumnVector]
- },
- rowCount
- )
- batch
- }
-
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 79274b4c60..7f5d5a9c21 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -139,8 +139,6 @@ trait BackendSettingsApi {
def enableNativeWriteFiles(): Boolean
- def enableNativeArrowReadFiles(): Boolean = false
-
def shouldRewriteCount(): Boolean = false
def supportCartesianProductExec(): Boolean = false
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index acfb84cf96..414bf113db 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -344,8 +344,6 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
// Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()`
instead
def enableNativeWriter: Option[Boolean] = getConf(NATIVE_WRITER_ENABLED)
- def enableNativeArrowReader: Boolean = getConf(NATIVE_ARROW_READER_ENABLED)
-
def enableColumnarProjectCollapse: Boolean =
getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
def enableColumnarPartialProject: Boolean =
getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT)
@@ -1353,12 +1351,6 @@ object GlutenConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(true)
- val NATIVE_ARROW_READER_ENABLED =
- buildConf("spark.gluten.sql.native.arrow.reader.enabled")
- .doc("This is config to specify whether to enable the native columnar csv reader")
- .booleanConf
- .createWithDefault(false)
-
val NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST =
buildConf("spark.gluten.sql.native.writeColumnMetadataExclusionList")
.doc(
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
index 6ea7dd9101..1ca70a2cb9 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
@@ -16,27 +16,16 @@
*/
package org.apache.spark.sql.execution.datasources
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
import org.apache.spark.sql.internal.SQLConf
import java.io.File
-class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with GlutenSQLTestsBaseTrait {
-
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
-
-class GlutenHeaderCSVReadSchemaSuite extends HeaderCSVReadSchemaSuite with GlutenSQLTestsBaseTrait {
+class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with GlutenSQLTestsBaseTrait {}
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
+class GlutenHeaderCSVReadSchemaSuite
+ extends HeaderCSVReadSchemaSuite
+ with GlutenSQLTestsBaseTrait {}
class GlutenJsonReadSchemaSuite extends JsonReadSchemaSuite with
GlutenSQLTestsBaseTrait {}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index 6cfa9f2028..14c6711ef3 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.csv
-import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.exception.GlutenException
import org.apache.spark.{SparkConf, SparkException}
@@ -30,10 +29,6 @@ import java.sql.{Date, Timestamp}
class GlutenCSVSuite extends CSVSuite with GlutenSQLTestsBaseTrait {
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
/** Returns full path to the given file in the resource folder */
override protected def testFile(fileName: String): String = {
getWorkspaceFilePath("sql", "core", "src", "test", "resources").toString +
"/" + fileName
@@ -49,11 +44,10 @@ class GlutenCSVv1Suite extends GlutenCSVSuite {
class GlutenCSVv2Suite extends GlutenCSVSuite {
import testImplicits._
+
override def sparkConf: SparkConf =
super.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
override def testNameBlackList: Seq[String] = Seq(
// overwritten with different test
"test for FAILFAST parsing mode",
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
index 6ea7dd9101..1ca70a2cb9 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
@@ -16,27 +16,16 @@
*/
package org.apache.spark.sql.execution.datasources
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
import org.apache.spark.sql.internal.SQLConf
import java.io.File
-class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with GlutenSQLTestsBaseTrait {
-
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
-
-class GlutenHeaderCSVReadSchemaSuite extends HeaderCSVReadSchemaSuite with GlutenSQLTestsBaseTrait {
+class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with GlutenSQLTestsBaseTrait {}
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
+class GlutenHeaderCSVReadSchemaSuite
+ extends HeaderCSVReadSchemaSuite
+ with GlutenSQLTestsBaseTrait {}
class GlutenJsonReadSchemaSuite extends JsonReadSchemaSuite with
GlutenSQLTestsBaseTrait {}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index af0f59b9bc..3ae0abfd74 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.csv
-import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.exception.GlutenException
import org.apache.spark.{SparkConf, SparkException}
@@ -30,10 +29,6 @@ import java.sql.{Date, Timestamp}
class GlutenCSVSuite extends CSVSuite with GlutenSQLTestsBaseTrait {
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
/** Returns full path to the given file in the resource folder */
override protected def testFile(fileName: String): String = {
getWorkspaceFilePath("sql", "core", "src", "test", "resources").toString +
"/" + fileName
@@ -49,11 +44,10 @@ class GlutenCSVv1Suite extends GlutenCSVSuite {
class GlutenCSVv2Suite extends GlutenCSVSuite {
import testImplicits._
+
override def sparkConf: SparkConf =
super.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
override def testNameBlackList: Seq[String] = Seq(
// overwritten with different test
"test for FAILFAST parsing mode",
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
index 6ea7dd9101..1ca70a2cb9 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
@@ -16,27 +16,16 @@
*/
package org.apache.spark.sql.execution.datasources
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
import org.apache.spark.sql.internal.SQLConf
import java.io.File
-class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with GlutenSQLTestsBaseTrait {
-
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
-
-class GlutenHeaderCSVReadSchemaSuite extends HeaderCSVReadSchemaSuite with GlutenSQLTestsBaseTrait {
+class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with GlutenSQLTestsBaseTrait {}
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
+class GlutenHeaderCSVReadSchemaSuite
+ extends HeaderCSVReadSchemaSuite
+ with GlutenSQLTestsBaseTrait {}
class GlutenJsonReadSchemaSuite extends JsonReadSchemaSuite with
GlutenSQLTestsBaseTrait {}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index b137a1a77f..a554194ae2 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.csv
-import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.exception.GlutenException
import org.apache.spark.{SparkConf, SparkException}
@@ -30,10 +29,6 @@ import java.sql.{Date, Timestamp}
class GlutenCSVSuite extends CSVSuite with GlutenSQLTestsBaseTrait {
- override def sparkConf: SparkConf =
- super.sparkConf
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
/** Returns full path to the given file in the resource folder */
override protected def testFile(fileName: String): String = {
"file://" + getWorkspaceFilePath(
@@ -54,11 +49,10 @@ class GlutenCSVv1Suite extends GlutenCSVSuite {
class GlutenCSVv2Suite extends GlutenCSVSuite {
import testImplicits._
+
override def sparkConf: SparkConf =
super.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")
- .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
override def testNameBlackList: Seq[String] = Seq(
// overwritten with different test
"test for FAILFAST parsing mode",
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index ffa6759c74..05e1b88f76 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -138,12 +138,3 @@ abstract class BatchScanExecShim(
Boolean.box(replicatePartitions))
}
}
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
- extends BatchScanExecShim(
- original.output,
- original.scan,
- original.runtimeFilters,
- original.keyGroupedPartitioning,
- table = null
- )
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 9a54e01cd6..81446cf23a 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -143,24 +143,3 @@ abstract class BatchScanExecShim(
}
}
}
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
- extends BatchScanExecShim(
- original.output,
- original.scan,
- original.runtimeFilters,
- original.keyGroupedPartitioning,
- original.ordering,
- original.table,
- original.commonPartitionValues,
- original.applyPartialClustering,
- original.replicatePartitions
- ) {
- override def scan: Scan = original.scan
-
- override def ordering: Option[Seq[SortOrder]] = original.ordering
-
- override def output: Seq[Attribute] = original.output
-
- override def keyGroupedPartitioning: Option[Seq[Expression]] = original.keyGroupedPartitioning
-}
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 59bf431683..0767f5e9f4 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -145,22 +145,3 @@ abstract class BatchScanExecShim(
}
}
}
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
- extends BatchScanExecShim(
- original.output,
- original.scan,
- original.runtimeFilters,
- original.spjParams.keyGroupedPartitioning,
- original.ordering,
- original.table,
- original.spjParams.commonPartitionValues,
- original.spjParams.applyPartialClustering,
- original.spjParams.replicatePartitions
- ) {
- override def scan: Scan = original.scan
-
- override def ordering: Option[Seq[SortOrder]] = original.ordering
-
- override def output: Seq[Attribute] = original.output
-}
diff --git
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 242241254e..c00ebe11c0 100644
---
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -153,24 +153,3 @@ abstract class BatchScanExecShim(
}
}
}
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
- extends BatchScanExecShim(
- original.output,
- original.scan,
- original.runtimeFilters,
- original.spjParams.keyGroupedPartitioning,
- original.ordering,
- original.table,
- original.spjParams.joinKeyPositions,
- original.spjParams.commonPartitionValues,
- original.spjParams.reducers,
- original.spjParams.applyPartialClustering,
- original.spjParams.replicatePartitions
- ) {
- override def scan: Scan = original.scan
-
- override def ordering: Option[Seq[SortOrder]] = original.ordering
-
- override def output: Seq[Attribute] = original.output
-}
diff --git
a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 046c42ea7b..604a8c0ee2 100644
---
a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -154,24 +154,3 @@ abstract class BatchScanExecShim(
}
}
}
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
- extends BatchScanExecShim(
- original.output,
- original.scan,
- original.runtimeFilters,
- original.spjParams.keyGroupedPartitioning,
- original.ordering,
- original.table,
- original.spjParams.joinKeyPositions,
- original.spjParams.commonPartitionValues,
- original.spjParams.reducers,
- original.spjParams.applyPartialClustering,
- original.spjParams.replicatePartitions
- ) {
- override def scan: Scan = original.scan
-
- override def ordering: Option[Seq[SortOrder]] = original.ordering
-
- override def output: Seq[Attribute] = original.output
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]