This is an automated email from the ASF dual-hosted git repository.

yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d206c5e20e [MINOR][VL] Remove dead Arrow-CSV / Arrow-Dataset JVM code 
paths (#12130)
d206c5e20e is described below

commit d206c5e20e2709ede7162fc5b8d4602600a92f50
Author: Kent Yao <[email protected]>
AuthorDate: Mon May 25 22:08:11 2026 +0800

    [MINOR][VL] Remove dead Arrow-CSV / Arrow-Dataset JVM code paths (#12130)
    
    * [MINOR][VL] Remove dead Arrow-CSV / Arrow-Dataset JVM code path
    
    The ArrowCSV file format and ArrowBatchScanExec chain are unreachable:
    no injection in VeloxRuleApi, no META-INF/services entry, and all
    ArrowCsvScanSuite cases are @Ignore'd. They were introduced as a
    squash-merge byproduct in #11776 and never wired up.
    
    Verified by compiling:
      * spark-3.5 + scala-2.12 + arrow 15.0.0-gluten (install)
      * spark-4.0 + scala-2.13 + arrow 18.1.0 (compile)
    
    Generated-by: claude-opus-4.7
    
    * [MINOR][VL] Remove dead Arrow dataset reader paths from ArrowUtil
    
    makeArrowDiscovery / readArrowSchema / readArrowFileColumnNames /
    readSchema(FragmentScanOptions) overloads / loadMissingColumns /
    loadPartitionColumns / loadBatch in ArrowUtil have zero callers across
    the repo after the previous removal of the ArrowCSV chain. Drop them
    together with the now-unused imports (arrow.dataset.*, FileStatus,
    URI/URLDecoder, ArrowRecordBatch, Optional, Logging, etc.).
    
    Verified by compiling:
      * spark-3.5 + scala-2.12 (test-compile, patched arrow 15.0.0-gluten)
      * spark-4.0 + scala-2.13 (compile, pure arrow 18.1.0)
    
    Generated-by: claude-opus-4.7
    
    * [MINOR][VL] Remove dead spark.gluten.sql.native.arrow.reader.enabled 
config
    
    Following the removal of ArrowConvertorRule/ArrowScanReplaceRule (already
    
    unwired from VeloxRuleApi by PR #11190 "[GLUTEN-11088][VL] Fall back CSV
    
    reader" merged 2026-01-19), the spark.gluten.sql.native.arrow.reader.enabled
    
    config and its plumbing have no consumers:
    
      * GlutenConfig.enableNativeArrowReader / NATIVE_ARROW_READER_ENABLED
    
      * BackendSettingsApi.enableNativeArrowReadFiles (default)
    
      * VeloxBackend.enableNativeArrowReadFiles (override)
    
    Test suites still set this flag (MiscOperatorSuite, GlutenCSVSuite,
    
    GlutenReadSchemaSuite across spark35/40/41) but it has been a no-op since
    
    PR #11190; CSV continues to be covered by these suites via the Spark
    
    native CSV path. The corresponding entry in docs/Configuration.md is
    
    also removed.
    
    Generated-by: Claude claude-opus-4.7
    
    * [MINOR][UT] Fix spotless violation in GlutenReadSchemaSuite
    
    Generated-by: claude-opus-4.7
    
    * [MINOR][UT] Fix spotless violation in spark40/spark41 
GlutenReadSchemaSuite
    
    Generated-by: claude-opus-4.7
    
    * [MINOR][UT] Remove unused GlutenConfig import from GlutenCSVSuite 
(spark35/40/41)
    
    Generated-by: claude-opus-4.7
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   4 -
 .../gluten/datasource/ArrowCSVFileFormat.scala     | 379 ---------------------
 .../datasource/ArrowCSVOptionConverter.scala       |  62 ----
 .../v2/ArrowCSVPartitionReaderFactory.scala        | 176 ----------
 .../apache/gluten/datasource/v2/ArrowCSVScan.scala |  76 -----
 .../gluten/datasource/v2/ArrowCSVScanBuilder.scala |  44 ---
 .../gluten/datasource/v2/ArrowCSVTable.scala       |  80 -----
 .../datasource/v2/ArrowBatchScanExec.scala         |  46 ---
 .../gluten/extension/ArrowConvertorRule.scala      | 120 -------
 .../gluten/extension/ArrowScanReplaceRule.scala    |  39 ---
 .../sql/execution/ArrowFileSourceScanExec.scala    |  61 ----
 .../spark/sql/execution/BaseArrowScanExec.scala    |  29 --
 .../gluten/execution/ArrowCsvScanSuite.scala       | 238 -------------
 .../gluten/execution/MiscOperatorSuite.scala       |   1 -
 docs/Configuration.md                              |   1 -
 .../scala/org/apache/gluten/utils/ArrowUtil.scala  | 168 +--------
 .../gluten/backendsapi/BackendSettingsApi.scala    |   2 -
 .../org/apache/gluten/config/GlutenConfig.scala    |   8 -
 .../datasources/GlutenReadSchemaSuite.scala        |  19 +-
 .../execution/datasources/csv/GlutenCSVSuite.scala |   8 +-
 .../datasources/GlutenReadSchemaSuite.scala        |  19 +-
 .../execution/datasources/csv/GlutenCSVSuite.scala |   8 +-
 .../datasources/GlutenReadSchemaSuite.scala        |  19 +-
 .../execution/datasources/csv/GlutenCSVSuite.scala |   8 +-
 .../datasources/v2/BatchScanExecShim.scala         |   9 -
 .../datasources/v2/BatchScanExecShim.scala         |  21 --
 .../datasources/v2/BatchScanExecShim.scala         |  19 --
 .../datasources/v2/BatchScanExecShim.scala         |  21 --
 .../datasources/v2/BatchScanExecShim.scala         |  21 --
 29 files changed, 16 insertions(+), 1690 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 9d2d92a2fc..599b91851e 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -550,10 +550,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
     )
   }
 
-  override def enableNativeArrowReadFiles(): Boolean = {
-    GlutenConfig.get.enableNativeArrowReader
-  }
-
   override def shouldRewriteCount(): Boolean = {
     // Velox backend does not support count if it has more that one child,
     // so we should rewrite it.
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
deleted file mode 100644
index 10b7e7801f..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource
-
-import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.config.VeloxConfig
-import org.apache.gluten.exception.SchemaMismatchException
-import org.apache.gluten.execution.RowToVeloxColumnarExec
-import org.apache.gluten.iterator.Iterators
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
-import org.apache.gluten.utils.ArrowUtil
-import org.apache.gluten.vectorized.ArrowWritableColumnVector
-
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, 
CSVHeaderCheckerHelper, CSVOptions, UnivocityParser}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
JoinedRow}
-import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.datasources.{FileFormat, 
HadoopFileLinesReader, OutputWriterFactory, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.dataset.file.FileSystemDatasetFactory
-import org.apache.arrow.dataset.scanner.ScanOptions
-import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.VectorUnloader
-import org.apache.arrow.vector.types.pojo.Schema
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-
-import java.net.URLDecoder
-import java.util.Optional
-
-import scala.collection.JavaConverters.{asJavaIterableConverter, 
asScalaBufferConverter}
-
-class ArrowCSVFileFormat(parsedOptions: CSVOptions)
-  extends FileFormat
-  with DataSourceRegister
-  with Logging
-  with Serializable {
-
-  private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV
-  private lazy val pool = ArrowNativeMemoryPool.arrowPool("FileSystem Read")
-  var fallback = false
-
-  override def isSplitable(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      path: Path): Boolean = {
-    false
-  }
-
-  override def inferSchema(
-      sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = {
-    val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
-    ArrowUtil.readSchema(
-      files,
-      fileFormat,
-      arrowConfig,
-      ArrowBufferAllocators.contextInstance(),
-      ArrowNativeMemoryPool.arrowPool("infer schema"))
-  }
-
-  override def supportBatch(sparkSession: SparkSession, dataSchema: 
StructType): Boolean = true
-
-  override def buildReaderWithPartitionValues(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    val sqlConf = sparkSession.sessionState.conf
-    val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
-    val batchSize = sqlConf.columnBatchSize
-    val columnPruning = sqlConf.csvColumnPruning &&
-      !requiredSchema.exists(_.name == 
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-    val actualFilters =
-      
filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
-    (file: PartitionedFile) => {
-      val actualDataSchema = StructType(
-        dataSchema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
-      val actualRequiredSchema = StructType(
-        requiredSchema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
-      ArrowCSVFileFormat.checkHeader(
-        file,
-        actualDataSchema,
-        actualRequiredSchema,
-        parsedOptions,
-        actualFilters,
-        broadcastedHadoopConf.value.value)
-
-      val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
-      val allocator = ArrowBufferAllocators.contextInstance()
-      // todo predicate validation / pushdown
-      val fileNames = ArrowUtil
-        .readArrowFileColumnNames(
-          URLDecoder.decode(file.filePath.toString, "UTF-8"),
-          fileFormat,
-          arrowConfig,
-          ArrowBufferAllocators.contextInstance(),
-          pool)
-      val tokenIndexArr =
-        actualRequiredSchema
-          .map(f => java.lang.Integer.valueOf(actualDataSchema.indexOf(f)))
-          .toArray
-      val fileIndex = tokenIndexArr.filter(_ < fileNames.length)
-      val requestSchema = new StructType(
-        fileIndex
-          .map(index => StructField(fileNames(index), 
actualDataSchema(index).dataType)))
-      val missingIndex = tokenIndexArr.filter(_ >= fileNames.length)
-      val missingSchema = new StructType(missingIndex.map(actualDataSchema(_)))
-      // TODO: support array/map/struct types in out-of-order schema reading.
-      val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
-      val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
-      try {
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, 
arrowConfig)
-        val factory =
-          ArrowUtil.makeArrowDiscovery(
-            URLDecoder.decode(file.filePath.toString, "UTF-8"),
-            fileFormat,
-            Optional.of(arrowConfig),
-            ArrowBufferAllocators.contextInstance(),
-            pool)
-        val fields = factory.inspect().getFields
-        val actualReadFields = new Schema(
-          fileIndex.map(index => fields.get(index)).toIterable.asJava)
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, 
arrowConfig)
-        ArrowCSVFileFormat
-          .readArrow(
-            ArrowBufferAllocators.contextInstance(),
-            file,
-            actualReadFields,
-            missingSchema,
-            partitionSchema,
-            factory,
-            batchSize,
-            arrowConfig)
-          .asInstanceOf[Iterator[InternalRow]]
-      } catch {
-        case e: SchemaMismatchException =>
-          logWarning(e.getMessage)
-          fallback = true
-          val iter = ArrowCSVFileFormat.fallbackReadVanilla(
-            dataSchema,
-            requiredSchema,
-            broadcastedHadoopConf.value.value,
-            parsedOptions,
-            file,
-            actualFilters,
-            columnPruning)
-          val (schema, rows) =
-            ArrowCSVFileFormat.withPartitionValue(requiredSchema, 
partitionSchema, iter, file)
-          ArrowCSVFileFormat
-            .rowToColumn(schema, batchSize, rows)
-            .asInstanceOf[Iterator[InternalRow]]
-        case d: Exception => throw d
-      } finally {
-        cSchema.close()
-        cSchema2.close()
-      }
-    }
-  }
-
-  override def vectorTypes(
-      requiredSchema: StructType,
-      partitionSchema: StructType,
-      sqlConf: SQLConf): Option[Seq[String]] = {
-    Option(
-      Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
-        classOf[ArrowWritableColumnVector].getName
-      ))
-  }
-
-  override def shortName(): String = "arrowcsv"
-
-  override def hashCode(): Int = getClass.hashCode()
-
-  override def equals(other: Any): Boolean = 
other.isInstanceOf[ArrowCSVFileFormat]
-
-  override def prepareWrite(
-      sparkSession: SparkSession,
-      job: _root_.org.apache.hadoop.mapreduce.Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = {
-    throw new UnsupportedOperationException()
-  }
-}
-
-object ArrowCSVFileFormat {
-
-  def readArrow(
-      allocator: BufferAllocator,
-      file: PartitionedFile,
-      actualReadFields: Schema,
-      missingSchema: StructType,
-      partitionSchema: StructType,
-      factory: FileSystemDatasetFactory,
-      batchSize: Int,
-      arrowConfig: CsvFragmentScanOptions): Iterator[ColumnarBatch] = {
-    val actualReadFieldNames = 
actualReadFields.getFields.asScala.map(_.getName).toArray
-    val dataset = factory.finish(actualReadFields)
-    val scanOptions = new ScanOptions.Builder(batchSize)
-      .columns(Optional.of(actualReadFieldNames))
-      .fragmentScanOptions(arrowConfig)
-      .build()
-    val scanner = dataset.newScan(scanOptions)
-
-    val partitionVectors =
-      ArrowUtil.loadPartitionColumns(batchSize, partitionSchema, 
file.partitionValues)
-
-    val nullVectors = if (missingSchema.nonEmpty) {
-      ArrowUtil.loadMissingColumns(batchSize, missingSchema)
-    } else {
-      Array.empty[ArrowWritableColumnVector]
-    }
-    val reader = scanner.scanBatches()
-    Iterators
-      .wrap(new Iterator[ColumnarBatch] {
-
-        override def hasNext: Boolean = {
-          reader.loadNextBatch()
-        }
-
-        override def next: ColumnarBatch = {
-          val root = reader.getVectorSchemaRoot
-          val unloader = new VectorUnloader(root)
-
-          val batch = ArrowUtil.loadBatch(
-            allocator,
-            unloader.getRecordBatch,
-            actualReadFields,
-            partitionVectors,
-            nullVectors)
-          batch
-        }
-      })
-      .recycleIterator {
-        scanner.close()
-        dataset.close()
-        factory.close()
-        reader.close()
-        partitionVectors.foreach(_.close())
-        nullVectors.foreach(_.close())
-      }
-      .recyclePayload(_.close())
-      .create()
-  }
-
-  def checkHeader(
-      file: PartitionedFile,
-      actualDataSchema: StructType,
-      actualRequiredSchema: StructType,
-      parsedOptions: CSVOptions,
-      actualFilters: Seq[Filter],
-      conf: Configuration): Unit = {
-    val isStartOfFile = file.start == 0
-    if (!isStartOfFile) {
-      return
-    }
-    val parser =
-      new UnivocityParser(actualDataSchema, actualRequiredSchema, 
parsedOptions, actualFilters)
-    val schema = if (parsedOptions.columnPruning) actualRequiredSchema else 
actualDataSchema
-    val headerChecker = new CSVHeaderChecker(
-      schema,
-      parsedOptions,
-      source = s"CSV file: ${file.filePath}",
-      isStartOfFile)
-
-    val lines = {
-      val linesReader =
-        new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, 
conf)
-      Option(TaskContext.get())
-        .foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
-      linesReader.map {
-        line => new String(line.getBytes, 0, line.getLength, 
parser.options.charset)
-      }
-    }
-    CSVHeaderCheckerHelper.checkHeaderColumnNames(headerChecker, lines, 
parser.tokenizer)
-  }
-
-  def rowToColumn(
-      schema: StructType,
-      batchSize: Int,
-      it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
-    val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
-      it,
-      schema,
-      batchSize,
-      VeloxConfig.get.veloxPreferredBatchBytes
-    )
-    veloxBatch
-      .map(v => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), 
v))
-  }
-
-  private def toAttribute(field: StructField): AttributeReference =
-    AttributeReference(field.name, field.dataType, field.nullable, 
field.metadata)()
-
-  private def toAttributes(schema: StructType): Seq[AttributeReference] = {
-    schema.map(toAttribute)
-  }
-
-  def withPartitionValue(
-      requiredSchema: StructType,
-      partitionSchema: StructType,
-      iter: Iterator[InternalRow],
-      file: PartitionedFile): (StructType, Iterator[InternalRow]) = {
-    val fullSchema = toAttributes(requiredSchema) ++ 
toAttributes(partitionSchema)
-
-    // Using lazy val to avoid serialization
-    lazy val appendPartitionColumns =
-      GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-    // Using local val to avoid per-row lazy val check (pre-mature 
optimization?...)
-    val converter = appendPartitionColumns
-
-    // Note that we have to apply the converter even though 
`file.partitionValues` is empty.
-    // This is because the converter is also responsible for converting safe 
`InternalRow`s into
-    // `UnsafeRow`s.
-    if (partitionSchema.isEmpty) {
-      val rows = iter.map(dataRow => converter(dataRow))
-      (StructType(requiredSchema ++ partitionSchema), rows)
-    } else {
-      val joinedRow = new JoinedRow()
-      val rows = iter.map(dataRow => converter(joinedRow(dataRow, 
file.partitionValues)))
-      (StructType(requiredSchema ++ partitionSchema), rows)
-    }
-  }
-
-  def fallbackReadVanilla(
-      dataSchema: StructType,
-      requiredSchema: StructType,
-      conf: Configuration,
-      parsedOptions: CSVOptions,
-      file: PartitionedFile,
-      actualFilters: Seq[Filter],
-      columnPruning: Boolean): Iterator[InternalRow] = {
-    val actualDataSchema = StructType(
-      dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
-    val actualRequiredSchema = StructType(
-      requiredSchema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
-    val parser =
-      new UnivocityParser(actualDataSchema, actualRequiredSchema, 
parsedOptions, actualFilters)
-    val schema = if (columnPruning) actualRequiredSchema else actualDataSchema
-    val isStartOfFile = file.start == 0
-    val headerChecker = new CSVHeaderChecker(
-      schema,
-      parsedOptions,
-      source = s"CSV file: ${file.filePath}",
-      isStartOfFile)
-    CSVDataSource(parsedOptions).readFile(conf, file, parser, headerChecker, 
requiredSchema)
-  }
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala
deleted file mode 100644
index 7d6a54c2ac..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVOptionConverter.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource
-
-import org.apache.gluten.utils.ArrowAbiUtil
-
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.utils.SparkSchemaUtil
-
-import com.google.common.collect.ImmutableMap
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.dataset.scanner.csv.{CsvConvertOptions, 
CsvFragmentScanOptions}
-import org.apache.arrow.memory.BufferAllocator
-
-import java.util
-
-object ArrowCSVOptionConverter {
-  def convert(option: CSVOptions): CsvFragmentScanOptions = {
-    val parseMap = new util.HashMap[String, String]()
-    val default = new CSVOptions(
-      CaseInsensitiveMap(Map()),
-      option.columnPruning,
-      SparkSchemaUtil.getLocalTimezoneID)
-    parseMap.put("strings_can_be_null", "true")
-    if (option.delimiter != default.delimiter) {
-      parseMap.put("delimiter", option.delimiter)
-    }
-    if (option.escapeQuotes != default.escapeQuotes) {
-      parseMap.put("quoting", (!option.escapeQuotes).toString)
-    }
-
-    val convertOptions = new CsvConvertOptions(ImmutableMap.of())
-    new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), parseMap)
-  }
-
-  def schema(
-      requiredSchema: StructType,
-      cSchema: ArrowSchema,
-      allocator: BufferAllocator,
-      option: CsvFragmentScanOptions): Unit = {
-    val schema = SparkSchemaUtil.toArrowSchema(requiredSchema)
-    ArrowAbiUtil.exportSchema(allocator, schema, cSchema)
-    option.getConvertOptions.setArrowSchema(cSchema)
-  }
-
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
deleted file mode 100644
index c930cebebe..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVPartitionReaderFactory.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource.v2
-
-import org.apache.gluten.datasource.{ArrowCSVFileFormat, 
ArrowCSVOptionConverter}
-import org.apache.gluten.exception.SchemaMismatchException
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
-import org.apache.gluten.utils.ArrowUtil
-
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.task.TaskResources
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.vector.types.pojo.Schema
-
-import java.net.URLDecoder
-import java.util.Optional
-
-import scala.collection.JavaConverters.asJavaIterableConverter
-
-case class ArrowCSVPartitionReaderFactory(
-    sqlConf: SQLConf,
-    broadcastedConf: Broadcast[SerializableConfiguration],
-    dataSchema: StructType,
-    readDataSchema: StructType,
-    readPartitionSchema: StructType,
-    options: CSVOptions,
-    filters: Seq[Filter])
-  extends FilePartitionReaderFactory
-  with Logging {
-
-  private val batchSize = sqlConf.parquetVectorizedReaderBatchSize
-  private val csvColumnPruning: Boolean = sqlConf.csvColumnPruning
-  private val fileFormat = org.apache.arrow.dataset.file.FileFormat.CSV
-  var fallback = false
-
-  override def supportColumnarReads(partition: InputPartition): Boolean = true
-
-  override def buildReader(partitionedFile: PartitionedFile): 
PartitionReader[InternalRow] = {
-    // disable row based read
-    throw new UnsupportedOperationException
-  }
-
-  override def buildColumnarReader(
-      partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
-    val actualDataSchema = StructType(
-      dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
-    val actualRequiredSchema = StructType(
-      readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
-    ArrowCSVFileFormat.checkHeader(
-      partitionedFile,
-      actualDataSchema,
-      actualRequiredSchema,
-      options,
-      filters,
-      broadcastedConf.value.value)
-    val (allocator, pool) = if (!TaskResources.inSparkTask()) {
-      TaskResources.runUnsafe(
-        (
-          ArrowBufferAllocators.contextInstance(),
-          ArrowNativeMemoryPool.arrowPool("FileSystemFactory"))
-      )
-    } else {
-      (
-        ArrowBufferAllocators.contextInstance(),
-        ArrowNativeMemoryPool.arrowPool("FileSystemFactory"))
-    }
-    val arrowConfig = ArrowCSVOptionConverter.convert(options)
-    val fileNames = ArrowUtil
-      .readArrowFileColumnNames(
-        URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"),
-        fileFormat,
-        arrowConfig,
-        ArrowBufferAllocators.contextInstance(),
-        pool)
-    val tokenIndexArr =
-      actualRequiredSchema.map(f => 
java.lang.Integer.valueOf(actualDataSchema.indexOf(f))).toArray
-    val fileIndex = tokenIndexArr.filter(_ < fileNames.length)
-    val requestSchema = new StructType(
-      fileIndex
-        .map(index => StructField(fileNames(index), 
actualDataSchema(index).dataType)))
-    val missingIndex = tokenIndexArr.filter(_ >= fileNames.length)
-    val missingSchema = new StructType(missingIndex.map(actualDataSchema(_)))
-    // TODO: support array/map/struct types in out-of-order schema reading.
-    val cSchema: ArrowSchema = ArrowSchema.allocateNew(allocator)
-    val cSchema2: ArrowSchema = ArrowSchema.allocateNew(allocator)
-    // TODO: support array/map/struct types in out-of-order schema reading.
-    val iter =
-      try {
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema, allocator, 
arrowConfig)
-        val factory =
-          ArrowUtil.makeArrowDiscovery(
-            URLDecoder.decode(partitionedFile.filePath.toString, "UTF-8"),
-            fileFormat,
-            Optional.of(arrowConfig),
-            ArrowBufferAllocators.contextInstance(),
-            pool)
-        val fields = factory.inspect().getFields
-        val actualReadFields = new Schema(
-          fileIndex.map(index => fields.get(index)).toIterable.asJava)
-        ArrowCSVOptionConverter.schema(requestSchema, cSchema2, allocator, 
arrowConfig)
-        ArrowCSVFileFormat
-          .readArrow(
-            ArrowBufferAllocators.contextInstance(),
-            partitionedFile,
-            actualReadFields,
-            missingSchema,
-            readPartitionSchema,
-            factory,
-            batchSize,
-            arrowConfig)
-      } catch {
-        case e: SchemaMismatchException =>
-          logWarning(e.getMessage)
-          fallback = true
-          val iter = ArrowCSVFileFormat.fallbackReadVanilla(
-            dataSchema,
-            readDataSchema,
-            broadcastedConf.value.value,
-            options,
-            partitionedFile,
-            filters,
-            csvColumnPruning)
-          val (schema, rows) = ArrowCSVFileFormat.withPartitionValue(
-            readDataSchema,
-            readPartitionSchema,
-            iter,
-            partitionedFile)
-          ArrowCSVFileFormat.rowToColumn(schema, batchSize, rows)
-        case d: Exception => throw d
-      } finally {
-        cSchema.close()
-        cSchema2.close()
-      }
-
-    new PartitionReader[ColumnarBatch] {
-
-      override def next(): Boolean = {
-        iter.hasNext
-      }
-
-      override def get(): ColumnarBatch = {
-        iter.next()
-      }
-
-      override def close(): Unit = {}
-    }
-  }
-
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala
deleted file mode 100644
index ce3f847704..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScan.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource.v2
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.connector.read.PartitionReaderFactory
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.v2.FileScan
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.hadoop.fs.Path
-
-import scala.collection.JavaConverters.mapAsScalaMapConverter
-
-case class ArrowCSVScan(
-    sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
-    dataSchema: StructType,
-    readDataSchema: StructType,
-    readPartitionSchema: StructType,
-    pushedFilters: Array[Filter],
-    options: CaseInsensitiveStringMap,
-    partitionFilters: Seq[Expression] = Seq.empty,
-    dataFilters: Seq[Expression] = Seq.empty)
-  extends FileScan {
-
-  private lazy val parsedOptions: CSVOptions = new CSVOptions(
-    options.asScala.toMap,
-    columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
-    sparkSession.sessionState.conf.sessionLocalTimeZone,
-    sparkSession.sessionState.conf.columnNameOfCorruptRecord
-  )
-
-  override def isSplitable(path: Path): Boolean = {
-    false
-  }
-
-  override def createReaderFactory(): PartitionReaderFactory = {
-    val caseSensitiveMap = options.asCaseSensitiveMap().asScala.toMap
-    val hconf = 
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hconf))
-    val actualFilters =
-      
pushedFilters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))
-    ArrowCSVPartitionReaderFactory(
-      sparkSession.sessionState.conf,
-      broadcastedConf,
-      dataSchema,
-      readDataSchema,
-      readPartitionSchema,
-      parsedOptions,
-      actualFilters)
-  }
-
-  def withFilters(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): FileScan =
-    this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
deleted file mode 100644
index 2b3991fe29..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVScanBuilder.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource.v2
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-case class ArrowCSVScanBuilder(
-    sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
-    schema: StructType,
-    dataSchema: StructType,
-    options: CaseInsensitiveStringMap)
-  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
-
-  override def build(): Scan = {
-    ArrowCSVScan(
-      sparkSession,
-      fileIndex,
-      dataSchema,
-      readDataSchema(),
-      readPartitionSchema(),
-      Array.empty,
-      options)
-  }
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
deleted file mode 100644
index 3eaf4e35fd..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/v2/ArrowCSVTable.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.datasource.v2
-
-import org.apache.gluten.datasource.ArrowCSVOptionConverter
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-import org.apache.gluten.memory.arrow.pool.ArrowNativeMemoryPool
-import org.apache.gluten.utils.ArrowUtil
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
-import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.task.TaskResources
-
-import org.apache.hadoop.fs.FileStatus
-
-import scala.collection.JavaConverters.mapAsScalaMapConverter
-
-case class ArrowCSVTable(
-    name: String,
-    sparkSession: SparkSession,
-    options: CaseInsensitiveStringMap,
-    paths: Seq[String],
-    userSpecifiedSchema: Option[StructType],
-    fallbackFileFormat: Class[_ <: FileFormat])
-  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
-
-  override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
-    val (allocator, pool) = if (!TaskResources.inSparkTask()) {
-      TaskResources.runUnsafe(
-        (ArrowBufferAllocators.contextInstance(), 
ArrowNativeMemoryPool.arrowPool("inferSchema"))
-      )
-    } else {
-      (ArrowBufferAllocators.contextInstance(), 
ArrowNativeMemoryPool.arrowPool("inferSchema"))
-    }
-    val parsedOptions: CSVOptions = new CSVOptions(
-      options.asScala.toMap,
-      columnPruning = sparkSession.sessionState.conf.csvColumnPruning,
-      sparkSession.sessionState.conf.sessionLocalTimeZone,
-      sparkSession.sessionState.conf.columnNameOfCorruptRecord
-    )
-    val arrowConfig = ArrowCSVOptionConverter.convert(parsedOptions)
-    ArrowUtil.readSchema(
-      files.head,
-      org.apache.arrow.dataset.file.FileFormat.CSV,
-      arrowConfig,
-      allocator,
-      pool
-    )
-  }
-
-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
-    ArrowCSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
-  }
-
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    throw new UnsupportedOperationException
-  }
-
-  override def formatName: String = "arrowcsv"
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
deleted file mode 100644
index 4591192b2b..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution.datasource.v2
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.BaseArrowScanExec
-import org.apache.spark.sql.execution.datasources.v2.{ArrowBatchScanExecShim, 
BatchScanExec}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-case class ArrowBatchScanExec(original: BatchScanExec)
-  extends ArrowBatchScanExecShim(original)
-  with BaseArrowScanExec {
-  override def doCanonicalize(): ArrowBatchScanExec =
-    this.copy(original = original.doCanonicalize())
-
-  override def nodeName: String = "Arrow" + original.nodeName
-
-  override lazy val metrics = {
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows")) ++
-      customMetrics
-  }
-
-  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val numOutputRows = longMetric("numOutputRows")
-    inputRDD.asInstanceOf[RDD[ColumnarBatch]].map {
-      b =>
-        numOutputRows += b.numRows()
-        b
-    }
-  }
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
deleted file mode 100644
index 009674e810..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.datasource.ArrowCSVFileFormat
-import org.apache.gluten.datasource.v2.ArrowCSVTable
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, PermissiveMode}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
-import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.utils.SparkArrowUtil
-
-import java.nio.charset.StandardCharsets
-
-import scala.collection.convert.ImplicitConversions.`map AsScala`
-
-/**
- * Extracts a CSVTable from a DataSourceV2Relation.
- *
- * Only the table variable of DataSourceV2Relation is accessed to improve 
compatibility across
- * different Spark versions.
- * @since Spark
- *   4.1
- */
-private object CSVTableExtractor {
-  def unapply(relation: DataSourceV2Relation): Option[(DataSourceV2Relation, 
CSVTable)] = {
-    relation.table match {
-      case t: CSVTable =>
-        Some((relation, t))
-      case _ => None
-    }
-  }
-}
-
-@Experimental
-case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] 
{
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (!BackendsApiManager.getSettings.enableNativeArrowReadFiles()) {
-      return plan
-    }
-    plan.resolveOperators {
-      case l: LogicalRelation =>
-        l.relation match {
-          case r @ HadoopFsRelation(_, _, dataSchema, _, _: CSVFileFormat, 
options)
-              if validate(session, dataSchema, options) =>
-            val csvOptions = new CSVOptions(
-              options,
-              columnPruning = session.sessionState.conf.csvColumnPruning,
-              session.sessionState.conf.sessionLocalTimeZone)
-            l.copy(relation = r.copy(fileFormat = new 
ArrowCSVFileFormat(csvOptions))(session))
-          case _ => l
-        }
-      case CSVTableExtractor(d, t)
-          if validate(session, t.dataSchema, 
t.options.asCaseSensitiveMap().toMap) =>
-        d.copy(table = ArrowCSVTable(
-          "arrow" + t.name,
-          t.sparkSession,
-          t.options,
-          t.paths,
-          t.userSpecifiedSchema,
-          t.fallbackFileFormat))
-      case r =>
-        r
-    }
-  }
-
-  private def validate(
-      session: SparkSession,
-      dataSchema: StructType,
-      options: Map[String, String]): Boolean = {
-    val csvOptions = new CSVOptions(
-      options,
-      columnPruning = session.sessionState.conf.csvColumnPruning,
-      session.sessionState.conf.sessionLocalTimeZone)
-    SparkArrowUtil.checkSchema(dataSchema) &&
-    checkCsvOptions(csvOptions, 
session.sessionState.conf.sessionLocalTimeZone) &&
-    dataSchema.nonEmpty
-  }
-
-  private def checkCsvOptions(csvOptions: CSVOptions, timeZone: String): 
Boolean = {
-    val default = new CSVOptions(CaseInsensitiveMap(Map()), 
csvOptions.columnPruning, timeZone)
-    csvOptions.headerFlag && !csvOptions.multiLine &&
-    csvOptions.delimiter.length == 1 &&
-    csvOptions.quote == '\"' &&
-    csvOptions.escape == '\\' &&
-    csvOptions.lineSeparator.isEmpty &&
-    csvOptions.charset == StandardCharsets.UTF_8.name() &&
-    csvOptions.parseMode == PermissiveMode && !csvOptions.inferSchemaFlag &&
-    csvOptions.nullValue == "" &&
-    csvOptions.emptyValueInRead == "" && csvOptions.comment == '\u0000' &&
-    csvOptions.columnPruning &&
-    csvOptions.dateFormatInRead == default.dateFormatInRead &&
-    csvOptions.timestampFormatInRead == default.timestampFormatInRead &&
-    csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
-  }
-
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
deleted file mode 100644
index adfc6ca742..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension
-
-import org.apache.gluten.datasource.ArrowCSVFileFormat
-import org.apache.gluten.datasource.v2.ArrowCSVScan
-import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, 
FileSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
-  override def apply(plan: SparkPlan): SparkPlan = {
-    plan.transformUp {
-      case plan: FileSourceScanExec if 
plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
-        ArrowFileSourceScanExec(plan)
-      case plan: BatchScanExec if plan.scan.isInstanceOf[ArrowCSVScan] =>
-        ArrowBatchScanExec(plan)
-      case plan: BatchScanExec => plan
-      case p => p
-    }
-  }
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
deleted file mode 100644
index 16b8fb0e9f..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import scala.concurrent.duration.NANOSECONDS
-
-case class ArrowFileSourceScanExec(original: FileSourceScanExec)
-  extends ArrowFileSourceScanLikeShim(original)
-  with BaseArrowScanExec {
-
-  lazy val inputRDD: RDD[InternalRow] = original.inputRDD
-
-  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
-
-  override def output: Seq[Attribute] = original.output
-
-  override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
-
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val scanTime = longMetric("scanTime")
-    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal {
-      batches =>
-        new Iterator[ColumnarBatch] {
-
-          override def hasNext: Boolean = {
-            // The `FileScanRDD` returns an iterator which scans the file 
during the `hasNext` call.
-            val startNs = System.nanoTime()
-            val res = batches.hasNext
-            scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
-            res
-          }
-
-          override def next(): ColumnarBatch = {
-            val batch = batches.next()
-            numOutputRows += batch.numRows()
-            batch
-          }
-        }
-    }
-  }
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
deleted file mode 100644
index 13d0f5699b..0000000000
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes
-import org.apache.gluten.execution.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
-
-trait BaseArrowScanExec extends GlutenPlan {
-  final override def batchType(): Convention.BatchType = {
-    ArrowBatchTypes.ArrowJavaBatchType
-  }
-
-  final override def rowType(): Convention.RowType = Convention.RowType.None
-}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
deleted file mode 100644
index 8e66c3a58f..0000000000
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/ArrowCsvScanSuite.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.datasource.ArrowCSVFileFormat
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, 
BaseArrowScanExec, ColumnarToRowExec}
-import org.apache.spark.sql.execution.columnar.SparkCacheUtil
-import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
-
-import org.scalatest.Ignore
-
-@Ignore
-class ArrowCsvScanSuiteV1 extends ArrowCsvScanSuiteBase {
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.sql.sources.useV1SourceList", "csv")
-  }
-
-  test("csv scan v1") {
-    val df = runAndCompare("select * from student")
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
-    val scan = plan.find(_.isInstanceOf[BaseArrowScanExec]).toList.head
-    assert(
-      scan
-        .asInstanceOf[ArrowFileSourceScanExec]
-        .relation
-        .fileFormat
-        .isInstanceOf[ArrowCSVFileFormat])
-  }
-
-  test("csv scan with schema v1") {
-    val df = runAndCompare("select * from student_option_schema")
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    val scan = plan.find(_.isInstanceOf[BaseArrowScanExec])
-    assert(scan.isDefined)
-    assert(
-      !scan.get
-        .asInstanceOf[ArrowFileSourceScanExec]
-        .original
-        .relation
-        .fileFormat
-        .asInstanceOf[ArrowCSVFileFormat]
-        .fallback)
-  }
-}
-
-@Ignore
-class ArrowCsvScanSuiteV2 extends ArrowCsvScanSuite {
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.sql.sources.useV1SourceList", "")
-  }
-
-  test("csv scan") {
-    runAndCompare("select * from student")
-  }
-}
-
-@Ignore
-class ArrowCsvScanWithTableCacheSuite extends ArrowCsvScanSuiteBase {
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.sql.sources.useV1SourceList", "csv")
-      .set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
-  }
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    // A common practice as well as in Spark tests, to clear the cache 
serializer
-    // in case it was already set as the default row-based serializer.
-    SparkCacheUtil.clearCacheSerializer()
-  }
-
-  override def afterAll(): Unit = {
-    SparkCacheUtil.clearCacheSerializer()
-    super.afterAll()
-  }
-
-  /**
-   * Test for GLUTEN-8453: https://github.com/apache/gluten/issues/8453. To 
make sure no error is
-   * thrown when caching an Arrow Java query plan.
-   */
-  test("csv scan v1 with table cache") {
-    val df = spark.sql("select * from student")
-    df.cache()
-    assert(df.collect().length == 3)
-  }
-}
-
-/** Since https://github.com/apache/gluten/pull/5850. */
-@Ignore
-abstract class ArrowCsvScanSuite extends ArrowCsvScanSuiteBase {
-
-  test("csv scan with option string as null") {
-    val df = runAndCompare("select * from student_option_str")
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(_.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
-  }
-
-  test("csv scan with option delimiter") {
-    val df = runAndCompare("select * from student_option")
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
-  }
-
-  test("csv scan with missing columns") {
-    val df =
-      runAndCompare("select languagemissing, language, id_new_col from 
student_option_schema_lm")
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[VeloxColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
-  }
-
-  test("csv scan with different name") {
-    val df = runAndCompare("select * from student_option_schema")
-    val plan = df.queryExecution.executedPlan
-    assert(plan.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
-
-    val df2 = runAndCompare("select * from student_option_schema")
-    val plan2 = df2.queryExecution.executedPlan
-    assert(plan2.find(s => s.isInstanceOf[ColumnarToRowExec]).isDefined)
-    assert(plan2.find(_.isInstanceOf[BaseArrowScanExec]).isDefined)
-  }
-
-  test("csv scan with filter") {
-    val df = runAndCompare("select * from student where Name = 'Peter'")
-    assert(df.queryExecution.executedPlan.find(s => 
s.isInstanceOf[ColumnarToRowExec]).isEmpty)
-    assert(
-      df.queryExecution.executedPlan
-        .find(s => s.isInstanceOf[BaseArrowScanExec])
-        .isDefined)
-  }
-
-  test("insert into select from csv") {
-    withTable("insert_csv_t") {
-      spark.sql("create table insert_csv_t(Name string, Language string) using 
parquet;")
-      runQueryAndCompare("""
-                           |insert into insert_csv_t select * from student;
-                           |""".stripMargin) {
-        checkGlutenPlan[BaseArrowScanExec]
-      }
-    }
-  }
-}
-
-abstract class ArrowCsvScanSuiteBase extends VeloxWholeStageTransformerSuite {
-  override protected val resourcePath: String = "N/A"
-  override protected val fileFormat: String = "N/A"
-
-  protected val rootPath: String = getClass.getResource("/").getPath
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    createCsvTables()
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-  }
-
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
-      .set("spark.sql.files.maxPartitionBytes", "1g")
-      .set("spark.sql.shuffle.partitions", "1")
-      .set("spark.memory.offHeap.size", "2g")
-      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
-      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-  }
-
-  private def createCsvTables(): Unit = {
-    spark.read
-      .format("csv")
-      .option("header", "true")
-      .load(rootPath + "/datasource/csv/student.csv")
-      .createOrReplaceTempView("student")
-
-    spark.read
-      .format("csv")
-      .option("header", "true")
-      .load(rootPath + "/datasource/csv/student_option_str.csv")
-      .createOrReplaceTempView("student_option_str")
-
-    spark.read
-      .format("csv")
-      .option("header", "true")
-      .option("delimiter", ";")
-      .load(rootPath + "/datasource/csv/student_option.csv")
-      .createOrReplaceTempView("student_option")
-
-    spark.read
-      .schema(
-        new StructType()
-          .add("id", StringType)
-          .add("name", StringType)
-          .add("language", StringType))
-      .format("csv")
-      .option("header", "true")
-      .load(rootPath + "/datasource/csv/student_option_schema.csv")
-      .createOrReplaceTempView("student_option_schema")
-
-    spark.read
-      .schema(
-        new StructType()
-          .add("id_new_col", IntegerType)
-          .add("name", StringType)
-          .add("language", StringType)
-          .add("languagemissing", StringType))
-      .format("csv")
-      .option("header", "true")
-      .load(rootPath + "/datasource/csv/student_option_schema.csv")
-      .createOrReplaceTempView("student_option_schema_lm")
-  }
-}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index fec55eff09..581d62aa8c 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -62,7 +62,6 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set("spark.sql.autoBroadcastJoinThreshold", "-1")
       .set("spark.sql.sources.useV1SourceList", "avro,parquet,csv")
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
   }
 
   test("select_part_column") {
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 4ed5cd4b27..6aaf25578b 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -125,7 +125,6 @@ nav_order: 15
 | spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 🔄 
Dynamic    | 0.1               | The percentage of root paths to sample for 
metadata validation when the number of root paths is large. Value range is (0, 
1.0]. 1.0 means check all paths (no sampling). A smaller value reduces 
validation cost for tables with many partitions.                                
                                                                                
                                 [...]
 | spark.gluten.sql.injectNativePlanStringToExplain                    | 🔄 
Dynamic    | false             | When true, Gluten will inject native plan tree 
to Spark's explain output.                                                      
                                                                                
                                                                                
                                                                                
                   [...]
 | spark.gluten.sql.mergeTwoPhasesAggregate.enabled                    | 🔄 
Dynamic    | true              | Whether to merge two phases aggregate if there 
are no other operators between them.                                            
                                                                                
                                                                                
                                                                                
                   [...]
-| spark.gluten.sql.native.arrow.reader.enabled                        | 🔄 
Dynamic    | false             | This is config to specify whether to enable 
the native columnar csv reader                                                  
                                                                                
                                                                                
                                                                                
                      [...]
 | spark.gluten.sql.native.bloomFilter                                 | 🔄 
Dynamic    | true              |
 | spark.gluten.sql.native.hive.writer.enabled                         | 🔄 
Dynamic    | true              | This is config to specify whether to enable 
the native columnar writer for HiveFileFormat. Currently only supports 
HiveFileFormat with Parquet as the output file type.                            
                                                                                
                                                                                
                               [...]
 | spark.gluten.sql.native.hyperLogLog.Aggregate                       | 🔄 
Dynamic    | true              |
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala 
b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
index a94f8f2e3d..1354024879 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/ArrowUtil.scala
@@ -18,31 +18,20 @@ package org.apache.gluten.utils
 
 import org.apache.gluten.vectorized.ArrowWritableColumnVector
 
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.vectorized.ArrowColumnVectorUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.{SparkArrowUtil, SparkSchemaUtil}
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 import org.apache.arrow.c.{ArrowSchema, CDataDictionaryProvider, Data}
-import org.apache.arrow.dataset.file.{FileFormat, FileSystemDatasetFactory}
-import org.apache.arrow.dataset.jni.NativeMemoryPool
-import org.apache.arrow.dataset.scanner.FragmentScanOptions
 import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
 import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
-import org.apache.hadoop.fs.FileStatus
 
-import java.net.{URI, URLDecoder}
 import java.util
-import java.util.Optional
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
-object ArrowUtil extends Logging {
+object ArrowUtil {
 
   private val defaultTimeZoneId = SparkSchemaUtil.getLocalTimezoneID
 
@@ -97,159 +86,4 @@ object ArrowUtil extends Logging {
     new Schema(fields)
   }
 
-  private def rewriteUri(encodeUri: String): String = {
-    val decodedUri = encodeUri
-    val uri = URI.create(decodedUri)
-    if (uri.getScheme == "s3" || uri.getScheme == "s3a") {
-      val s3Rewritten =
-        new URI("s3", uri.getAuthority, uri.getPath, uri.getQuery, 
uri.getFragment).toString
-      return s3Rewritten
-    }
-    val sch = uri.getScheme match {
-      case "hdfs" => "hdfs"
-      case "file" => "file"
-    }
-    val ssp = uri.getScheme match {
-      case "hdfs" => uri.getSchemeSpecificPart
-      case "file" => "//" + uri.getSchemeSpecificPart
-    }
-    val rewritten = new URI(sch, ssp, uri.getFragment)
-    rewritten.toString
-  }
-
-  def makeArrowDiscovery(
-      encodedUri: String,
-      format: FileFormat,
-      option: Optional[FragmentScanOptions],
-      allocator: BufferAllocator,
-      pool: NativeMemoryPool
-  ): FileSystemDatasetFactory = {
-    val factory =
-      new FileSystemDatasetFactory(allocator, pool, format, 
rewriteUri(encodedUri), option)
-    factory
-  }
-
-  def readArrowSchema(
-      file: String,
-      format: FileFormat,
-      option: FragmentScanOptions,
-      allocator: BufferAllocator,
-      pool: NativeMemoryPool): Schema = {
-    val factory: FileSystemDatasetFactory =
-      makeArrowDiscovery(file, format, Optional.of(option), allocator, pool)
-    val schema = factory.inspect()
-    factory.close()
-    schema
-  }
-
-  def readArrowFileColumnNames(
-      file: String,
-      format: FileFormat,
-      option: FragmentScanOptions,
-      allocator: BufferAllocator,
-      pool: NativeMemoryPool): Array[String] = {
-    val fileFields = ArrowUtil
-      .readArrowSchema(URLDecoder.decode(file, "UTF-8"), format, option, 
allocator, pool)
-      .getFields
-      .asScala
-    fileFields.map(_.getName).toArray
-  }
-
-  def readSchema(
-      file: FileStatus,
-      format: FileFormat,
-      option: FragmentScanOptions,
-      allocator: BufferAllocator,
-      pool: NativeMemoryPool): Option[StructType] = {
-    val factory: FileSystemDatasetFactory =
-      makeArrowDiscovery(file.getPath.toString, format, Optional.of(option), 
allocator, pool)
-    val schema = factory.inspect()
-    try {
-      Option(SparkSchemaUtil.fromArrowSchema(schema))
-    } finally {
-      factory.close()
-    }
-  }
-
-  def readSchema(
-      files: Seq[FileStatus],
-      format: FileFormat,
-      option: FragmentScanOptions,
-      allocator: BufferAllocator,
-      pool: NativeMemoryPool): Option[StructType] = {
-    if (files.isEmpty) {
-      throw new IllegalArgumentException("No input file specified")
-    }
-
-    readSchema(files.head, format, option, allocator, pool)
-  }
-
-  def loadMissingColumns(
-      rowCount: Int,
-      missingSchema: StructType): Array[ArrowWritableColumnVector] = {
-
-    val vectors =
-      ArrowWritableColumnVector.allocateColumns(rowCount, missingSchema)
-    vectors.foreach {
-      vector =>
-        vector.putNulls(0, rowCount)
-        vector.setValueCount(rowCount)
-    }
-
-    vectors
-  }
-
-  def loadPartitionColumns(
-      rowCount: Int,
-      partitionSchema: StructType,
-      partitionValues: InternalRow): Array[ArrowWritableColumnVector] = {
-    val partitionColumns = ArrowWritableColumnVector.allocateColumns(rowCount, 
partitionSchema)
-    (0 until partitionColumns.length).foreach(
-      i => {
-        ArrowColumnVectorUtils.populate(partitionColumns(i), partitionValues, 
i)
-        partitionColumns(i).setValueCount(rowCount)
-        partitionColumns(i).setIsConstant()
-      })
-
-    partitionColumns
-  }
-
-  def loadBatch(
-      allocator: BufferAllocator,
-      input: ArrowRecordBatch,
-      dataSchema: Schema,
-      partitionVectors: Array[ArrowWritableColumnVector] = Array.empty,
-      nullVectors: Array[ArrowWritableColumnVector] = Array.empty): 
ColumnarBatch = {
-    val rowCount: Int = input.getLength
-
-    val vectors =
-      try {
-        ArrowWritableColumnVector.loadColumns(rowCount, dataSchema, input, 
allocator)
-      } finally {
-        input.close()
-      }
-
-    val totalVectors = if (nullVectors.nonEmpty) {
-      val finalVectors =
-        mutable.ArrayBuffer[ArrowWritableColumnVector]()
-      finalVectors.appendAll(vectors)
-      finalVectors.appendAll(nullVectors)
-      finalVectors.toArray
-    } else {
-      vectors
-    }
-
-    val batch = new ColumnarBatch(
-      totalVectors.map(_.asInstanceOf[ColumnVector]) ++
-        partitionVectors
-          .map {
-            vector =>
-              vector.setValueCount(rowCount)
-              vector.asInstanceOf[ColumnVector]
-          },
-      rowCount
-    )
-    batch
-  }
-
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 79274b4c60..7f5d5a9c21 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -139,8 +139,6 @@ trait BackendSettingsApi {
 
   def enableNativeWriteFiles(): Boolean
 
-  def enableNativeArrowReadFiles(): Boolean = false
-
   def shouldRewriteCount(): Boolean = false
 
   def supportCartesianProductExec(): Boolean = false
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index acfb84cf96..414bf113db 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -344,8 +344,6 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
   // Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()` 
instead
   def enableNativeWriter: Option[Boolean] = getConf(NATIVE_WRITER_ENABLED)
 
-  def enableNativeArrowReader: Boolean = getConf(NATIVE_ARROW_READER_ENABLED)
-
   def enableColumnarProjectCollapse: Boolean = 
getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE)
 
   def enableColumnarPartialProject: Boolean = 
getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT)
@@ -1353,12 +1351,6 @@ object GlutenConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(true)
 
-  val NATIVE_ARROW_READER_ENABLED =
-    buildConf("spark.gluten.sql.native.arrow.reader.enabled")
-      .doc("This is config to specify whether to enable the native columnar 
csv reader")
-      .booleanConf
-      .createWithDefault(false)
-
   val NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST =
     buildConf("spark.gluten.sql.native.writeColumnMetadataExclusionList")
       .doc(
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
index 6ea7dd9101..1ca70a2cb9 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
@@ -16,27 +16,16 @@
  */
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 import org.apache.spark.sql.internal.SQLConf
 
 import java.io.File
 
-class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {
-
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
-
-class GlutenHeaderCSVReadSchemaSuite extends HeaderCSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {
+class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {}
 
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
+class GlutenHeaderCSVReadSchemaSuite
+  extends HeaderCSVReadSchemaSuite
+  with GlutenSQLTestsBaseTrait {}
 
 class GlutenJsonReadSchemaSuite extends JsonReadSchemaSuite with 
GlutenSQLTestsBaseTrait {}
 
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index 6cfa9f2028..14c6711ef3 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.csv
 
-import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.exception.GlutenException
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -30,10 +29,6 @@ import java.sql.{Date, Timestamp}
 
 class GlutenCSVSuite extends CSVSuite with GlutenSQLTestsBaseTrait {
 
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
   /** Returns full path to the given file in the resource folder */
   override protected def testFile(fileName: String): String = {
     getWorkspaceFilePath("sql", "core", "src", "test", "resources").toString + 
"/" + fileName
@@ -49,11 +44,10 @@ class GlutenCSVv1Suite extends GlutenCSVSuite {
 class GlutenCSVv2Suite extends GlutenCSVSuite {
 
   import testImplicits._
+
   override def sparkConf: SparkConf =
     super.sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "")
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
   override def testNameBlackList: Seq[String] = Seq(
     // overwritten with different test
     "test for FAILFAST parsing mode",
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
index 6ea7dd9101..1ca70a2cb9 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
@@ -16,27 +16,16 @@
  */
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 import org.apache.spark.sql.internal.SQLConf
 
 import java.io.File
 
-class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {
-
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
-
-class GlutenHeaderCSVReadSchemaSuite extends HeaderCSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {
+class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {}
 
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
+class GlutenHeaderCSVReadSchemaSuite
+  extends HeaderCSVReadSchemaSuite
+  with GlutenSQLTestsBaseTrait {}
 
 class GlutenJsonReadSchemaSuite extends JsonReadSchemaSuite with 
GlutenSQLTestsBaseTrait {}
 
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index af0f59b9bc..3ae0abfd74 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.csv
 
-import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.exception.GlutenException
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -30,10 +29,6 @@ import java.sql.{Date, Timestamp}
 
 class GlutenCSVSuite extends CSVSuite with GlutenSQLTestsBaseTrait {
 
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
   /** Returns full path to the given file in the resource folder */
   override protected def testFile(fileName: String): String = {
     getWorkspaceFilePath("sql", "core", "src", "test", "resources").toString + 
"/" + fileName
@@ -49,11 +44,10 @@ class GlutenCSVv1Suite extends GlutenCSVSuite {
 class GlutenCSVv2Suite extends GlutenCSVSuite {
 
   import testImplicits._
+
   override def sparkConf: SparkConf =
     super.sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "")
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
   override def testNameBlackList: Seq[String] = Seq(
     // overwritten with different test
     "test for FAILFAST parsing mode",
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
index 6ea7dd9101..1ca70a2cb9 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenReadSchemaSuite.scala
@@ -16,27 +16,16 @@
  */
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 import org.apache.spark.sql.internal.SQLConf
 
 import java.io.File
 
-class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {
-
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
-
-class GlutenHeaderCSVReadSchemaSuite extends HeaderCSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {
+class GlutenCSVReadSchemaSuite extends CSVReadSchemaSuite with 
GlutenSQLTestsBaseTrait {}
 
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-}
+class GlutenHeaderCSVReadSchemaSuite
+  extends HeaderCSVReadSchemaSuite
+  with GlutenSQLTestsBaseTrait {}
 
 class GlutenJsonReadSchemaSuite extends JsonReadSchemaSuite with 
GlutenSQLTestsBaseTrait {}
 
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index b137a1a77f..a554194ae2 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.csv
 
-import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.exception.GlutenException
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -30,10 +29,6 @@ import java.sql.{Date, Timestamp}
 
 class GlutenCSVSuite extends CSVSuite with GlutenSQLTestsBaseTrait {
 
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
   /** Returns full path to the given file in the resource folder */
   override protected def testFile(fileName: String): String = {
     "file://" + getWorkspaceFilePath(
@@ -54,11 +49,10 @@ class GlutenCSVv1Suite extends GlutenCSVSuite {
 class GlutenCSVv2Suite extends GlutenCSVSuite {
 
   import testImplicits._
+
   override def sparkConf: SparkConf =
     super.sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "")
-      .set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
-
   override def testNameBlackList: Seq[String] = Seq(
     // overwritten with different test
     "test for FAILFAST parsing mode",
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index ffa6759c74..05e1b88f76 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -138,12 +138,3 @@ abstract class BatchScanExecShim(
       Boolean.box(replicatePartitions))
   }
 }
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
-  extends BatchScanExecShim(
-    original.output,
-    original.scan,
-    original.runtimeFilters,
-    original.keyGroupedPartitioning,
-    table = null
-  )
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 9a54e01cd6..81446cf23a 100644
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -143,24 +143,3 @@ abstract class BatchScanExecShim(
     }
   }
 }
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
-  extends BatchScanExecShim(
-    original.output,
-    original.scan,
-    original.runtimeFilters,
-    original.keyGroupedPartitioning,
-    original.ordering,
-    original.table,
-    original.commonPartitionValues,
-    original.applyPartialClustering,
-    original.replicatePartitions
-  ) {
-  override def scan: Scan = original.scan
-
-  override def ordering: Option[Seq[SortOrder]] = original.ordering
-
-  override def output: Seq[Attribute] = original.output
-
-  override def keyGroupedPartitioning: Option[Seq[Expression]] = 
original.keyGroupedPartitioning
-}
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 59bf431683..0767f5e9f4 100644
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -145,22 +145,3 @@ abstract class BatchScanExecShim(
     }
   }
 }
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
-  extends BatchScanExecShim(
-    original.output,
-    original.scan,
-    original.runtimeFilters,
-    original.spjParams.keyGroupedPartitioning,
-    original.ordering,
-    original.table,
-    original.spjParams.commonPartitionValues,
-    original.spjParams.applyPartialClustering,
-    original.spjParams.replicatePartitions
-  ) {
-  override def scan: Scan = original.scan
-
-  override def ordering: Option[Seq[SortOrder]] = original.ordering
-
-  override def output: Seq[Attribute] = original.output
-}
diff --git 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 242241254e..c00ebe11c0 100644
--- 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -153,24 +153,3 @@ abstract class BatchScanExecShim(
     }
   }
 }
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
-  extends BatchScanExecShim(
-    original.output,
-    original.scan,
-    original.runtimeFilters,
-    original.spjParams.keyGroupedPartitioning,
-    original.ordering,
-    original.table,
-    original.spjParams.joinKeyPositions,
-    original.spjParams.commonPartitionValues,
-    original.spjParams.reducers,
-    original.spjParams.applyPartialClustering,
-    original.spjParams.replicatePartitions
-  ) {
-  override def scan: Scan = original.scan
-
-  override def ordering: Option[Seq[SortOrder]] = original.ordering
-
-  override def output: Seq[Attribute] = original.output
-}
diff --git 
a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 046c42ea7b..604a8c0ee2 100644
--- 
a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -154,24 +154,3 @@ abstract class BatchScanExecShim(
     }
   }
 }
-
-abstract class ArrowBatchScanExecShim(original: BatchScanExec)
-  extends BatchScanExecShim(
-    original.output,
-    original.scan,
-    original.runtimeFilters,
-    original.spjParams.keyGroupedPartitioning,
-    original.ordering,
-    original.table,
-    original.spjParams.joinKeyPositions,
-    original.spjParams.commonPartitionValues,
-    original.spjParams.reducers,
-    original.spjParams.applyPartialClustering,
-    original.spjParams.replicatePartitions
-  ) {
-  override def scan: Scan = original.scan
-
-  override def ordering: Option[Seq[SortOrder]] = original.ordering
-
-  override def output: Seq[Attribute] = original.output
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to