This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c465c4fd chore: Remove all remaining uses of legacy BatchReader from Comet [iceberg] (#3468)
7c465c4fd is described below

commit 7c465c4fdc2d3d125d559e82d5abc6c51d5fedd4
Author: Andy Grove <[email protected]>
AuthorDate: Wed Feb 25 09:41:41 2026 -0700

    chore: Remove all remaining uses of legacy BatchReader from Comet [iceberg] (#3468)
---
 .../java/org/apache/comet/parquet/BatchReader.java | 131 +-----------
 .../comet/parquet/IcebergCometBatchReader.java     |   2 +
 .../main/scala/org/apache/comet/CometConf.scala    |  17 --
 .../comet/parquet/CometReaderThreadPool.scala      |   5 -
 .../comet/parquet/CometParquetFileFormat.scala     | 119 ++++-------
 .../CometParquetPartitionReaderFactory.scala       | 233 ---------------------
 .../apache/comet/parquet/CometParquetScan.scala    |  94 ---------
 .../rules/EliminateRedundantTransitions.scala      |   5 +-
 .../spark/sql/comet/CometNativeScanExec.scala      |   4 +-
 .../org/apache/spark/sql/comet/CometScanExec.scala |  49 +----
 .../apache/comet/parquet/ParquetReadSuite.scala    | 208 +-----------------
 .../apache/comet/rules/CometScanRuleSuite.scala    |   6 -
 .../spark/sql/benchmark/CometReadBenchmark.scala   |  24 ---
 13 files changed, 56 insertions(+), 841 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index d59145459..5a0bc9f6d 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -24,10 +24,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import scala.Option;
 
@@ -36,9 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -87,7 +81,10 @@ import org.apache.comet.vector.CometVector;
  *     reader.close();
  *   }
  * </pre>
+ *
+ * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only.
  */
+@Deprecated
 @IcebergApi
 public class BatchReader extends RecordReader<Void, ColumnarBatch> implements 
Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
@@ -110,8 +107,6 @@ public class BatchReader extends RecordReader<Void, 
ColumnarBatch> implements Cl
   protected AbstractColumnReader[] columnReaders;
   private CometSchemaImporter importer;
   protected ColumnarBatch currentBatch;
-  private Future<Option<Throwable>> prefetchTask;
-  private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
   private FileReader fileReader;
   private boolean[] missingColumns;
   protected boolean isInitialized;
@@ -363,26 +358,7 @@ public class BatchReader extends RecordReader<Void, 
ColumnarBatch> implements Cl
       }
     }
 
-    // Pre-fetching
-    boolean preFetchEnabled =
-        conf.getBoolean(
-            CometConf.COMET_SCAN_PREFETCH_ENABLED().key(),
-            (boolean) 
CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get());
-
-    if (preFetchEnabled) {
-      LOG.info("Prefetch enabled for BatchReader.");
-      this.prefetchQueue = new LinkedBlockingQueue<>();
-    }
-
     isInitialized = true;
-    synchronized (this) {
-      // if prefetch is enabled, `init()` is called in separate thread. When
-      // `BatchReader.nextBatch()` is called asynchronously, it is possibly 
that
-      // `init()` is not called or finished. We need to hold on `nextBatch` 
until
-      // initialization of `BatchReader` is done. Once we are close to finish
-      // initialization, we notify the waiting thread of `nextBatch` to 
continue.
-      notifyAll();
-    }
   }
 
   /**
@@ -436,51 +412,13 @@ public class BatchReader extends RecordReader<Void, 
ColumnarBatch> implements Cl
     return currentBatch;
   }
 
-  // Only for testing
-  public Future<Option<Throwable>> getPrefetchTask() {
-    return this.prefetchTask;
-  }
-
-  // Only for testing
-  public LinkedBlockingQueue<Pair<PageReadStore, Long>> getPrefetchQueue() {
-    return this.prefetchQueue;
-  }
-
   /**
    * Loads the next batch of rows.
    *
    * @return true if there are no more rows to read, false otherwise.
    */
   public boolean nextBatch() throws IOException {
-    if (this.prefetchTask == null) {
-      Preconditions.checkState(isInitialized, "init() should be called 
first!");
-    } else {
-      // If prefetch is enabled, this reader will be initialized 
asynchronously from a
-      // different thread. Wait until it is initialized
-      while (!isInitialized) {
-        synchronized (this) {
-          try {
-            // Wait until initialization of current `BatchReader` is finished 
(i.e., `init()`),
-            // is done. It is possibly that `init()` is done after entering 
this while loop,
-            // so a short timeout is given.
-            wait(100);
-
-            // Checks if prefetch task is finished. If so, tries to get 
exception if any.
-            if (prefetchTask.isDone()) {
-              Option<Throwable> exception = prefetchTask.get();
-              if (exception.isDefined()) {
-                throw exception.get();
-              }
-            }
-          } catch (RuntimeException e) {
-            // Spark will check certain exception e.g. 
`SchemaColumnConvertNotSupportedException`.
-            throw e;
-          } catch (Throwable e) {
-            throw new IOException(e);
-          }
-        }
-      }
-    }
+    Preconditions.checkState(isInitialized, "init() should be called first!");
 
     if (rowsRead >= totalRowCount) return false;
     boolean hasMore;
@@ -547,7 +485,6 @@ public class BatchReader extends RecordReader<Void, 
ColumnarBatch> implements Cl
     }
   }
 
-  @SuppressWarnings("deprecation")
   private boolean loadNextRowGroupIfNecessary() throws Throwable {
     // More rows can be read from loaded row group. No need to load next one.
     if (rowsRead != totalRowsLoaded) return true;
@@ -556,21 +493,7 @@ public class BatchReader extends RecordReader<Void, 
ColumnarBatch> implements Cl
     SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups");
     long startNs = System.nanoTime();
 
-    PageReadStore rowGroupReader = null;
-    if (prefetchTask != null && prefetchQueue != null) {
-      // Wait for pre-fetch task to finish.
-      Pair<PageReadStore, Long> rowGroupReaderPair = prefetchQueue.take();
-      rowGroupReader = rowGroupReaderPair.getLeft();
-
-      // Update incremental byte read metric. Because this metric in Spark is 
maintained
-      // by thread local variable, we need to manually update it.
-      // TODO: We may expose metrics from `FileReader` and get from it 
directly.
-      long incBytesRead = rowGroupReaderPair.getRight();
-      FileSystem.getAllStatistics().stream()
-          .forEach(statistic -> statistic.incrementBytesRead(incBytesRead));
-    } else {
-      rowGroupReader = fileReader.readNextRowGroup();
-    }
+    PageReadStore rowGroupReader = fileReader.readNextRowGroup();
 
     if (rowGroupTimeMetric != null) {
       rowGroupTimeMetric.add(System.nanoTime() - startNs);
@@ -608,48 +531,4 @@ public class BatchReader extends RecordReader<Void, 
ColumnarBatch> implements Cl
     totalRowsLoaded += rowGroupReader.getRowCount();
     return true;
   }
-
-  // Submits a prefetch task for this reader.
-  public void submitPrefetchTask(ExecutorService threadPool) {
-    this.prefetchTask = threadPool.submit(new PrefetchTask());
-  }
-
-  // A task for prefetching parquet row groups.
-  private class PrefetchTask implements Callable<Option<Throwable>> {
-    private long getBytesRead() {
-      return FileSystem.getAllStatistics().stream()
-          .mapToLong(s -> s.getThreadStatistics().getBytesRead())
-          .sum();
-    }
-
-    @Override
-    public Option<Throwable> call() throws Exception {
-      // Gets the bytes read so far.
-      long baseline = getBytesRead();
-
-      try {
-        init();
-
-        while (true) {
-          PageReadStore rowGroupReader = fileReader.readNextRowGroup();
-
-          if (rowGroupReader == null) {
-            // Reaches the end of row groups.
-            return Option.empty();
-          } else {
-            long incBytesRead = getBytesRead() - baseline;
-
-            prefetchQueue.add(Pair.of(rowGroupReader, incBytesRead));
-          }
-        }
-      } catch (Throwable e) {
-        // Returns exception thrown from the reader. The reader will re-throw 
it.
-        return Option.apply(e);
-      } finally {
-        if (fileReader != null) {
-          fileReader.closeStream();
-        }
-      }
-    }
-  }
 }
diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
index d4bfa2b87..bd66f2dea 100644
--- a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
@@ -24,9 +24,11 @@ import java.util.HashMap;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
+import org.apache.comet.IcebergApi;
 import org.apache.comet.vector.CometVector;
 
 /** This class is a public interface used by Apache Iceberg to read batches 
using Comet */
+@IcebergApi
 public class IcebergCometBatchReader extends BatchReader {
   public IcebergCometBatchReader(int numColumns, StructType schema) {
     this.columnReaders = new AbstractColumnReader[numColumns];
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5ee777f3d..41b69952a 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -675,23 +675,6 @@ object CometConf extends ShimCometConf {
       .doubleConf
       .createWithDefault(1.0)
 
-  val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
-    conf("spark.comet.scan.preFetch.enabled")
-      .category(CATEGORY_SCAN)
-      .doc("Whether to enable pre-fetching feature of CometScan.")
-      .booleanConf
-      .createWithDefault(false)
-
-  val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
-    conf("spark.comet.scan.preFetch.threadNum")
-      .category(CATEGORY_SCAN)
-      .doc(
-        "The number of threads running pre-fetching for CometScan. Effective 
if " +
-          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
-          "pre-fetching threads means more memory requirement to store 
pre-fetched row groups.")
-      .intConf
-      .createWithDefault(2)
-
   val COMET_NATIVE_LOAD_REQUIRED: ConfigEntry[Boolean] = 
conf("spark.comet.nativeLoadRequired")
     .category(CATEGORY_EXEC)
     .doc(
diff --git a/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala b/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala
index ca13bba0c..1759ea276 100644
--- a/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala
+++ b/common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala
@@ -54,11 +54,6 @@ abstract class CometReaderThreadPool {
 
 }
 
-// A thread pool used for pre-fetching files.
-object CometPrefetchThreadPool extends CometReaderThreadPool {
-  override def threadNamePrefix: String = "prefetch_thread"
-}
-
 // Thread pool used by the Parquet parallel reader
 object CometFileReaderThreadPool extends CometReaderThreadPool {
   override def threadNamePrefix: String = "file_reader_thread"
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index e07d16d4d..7874f3774 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -57,7 +57,7 @@ import org.apache.comet.vector.CometVector
  *     in [[org.apache.comet.CometSparkSessionExtensions]]
  *   - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader 
to read values.
  */
-class CometParquetFileFormat(session: SparkSession, scanImpl: String)
+class CometParquetFileFormat(session: SparkSession)
     extends ParquetFileFormat
     with MetricsSupport
     with ShimSQLConf {
@@ -110,8 +110,6 @@ class CometParquetFileFormat(session: SparkSession, 
scanImpl: String)
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
 
-    val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value
       val footer = FooterReader.readFooter(sharedConf, file)
@@ -135,85 +133,42 @@ class CometParquetFileFormat(session: SparkSession, 
scanImpl: String)
         isCaseSensitive,
         datetimeRebaseSpec)
 
-      val recordBatchReader =
-        if (nativeIcebergCompat) {
-          // We still need the predicate in the conf to allow us to generate 
row indexes based on
-          // the actual row groups read
-          val pushed = if (parquetFilterPushDown) {
-            filters
-              // Collects all converted Parquet filter predicates. Notice that 
not all predicates
-              // can be converted (`ParquetFilters.createFilter` returns an 
`Option`). That's why
-              // a `flatMap` is used here.
-              .flatMap(parquetFilters.createFilter)
-              .reduceOption(FilterApi.and)
-          } else {
-            None
-          }
-          pushed.foreach(p => 
ParquetInputFormat.setFilterPredicate(sharedConf, p))
-          val pushedNative = if (parquetFilterPushDown) {
-            parquetFilters.createNativeFilters(filters)
-          } else {
-            None
-          }
-          val batchReader = new NativeBatchReader(
-            sharedConf,
-            file,
-            footer,
-            pushedNative.orNull,
-            capacity,
-            requiredSchema,
-            dataSchema,
-            isCaseSensitive,
-            useFieldId,
-            ignoreMissingIds,
-            datetimeRebaseSpec.mode == CORRECTED,
-            partitionSchema,
-            file.partitionValues,
-            metrics.asJava,
-            CometMetricNode(metrics))
-          try {
-            batchReader.init()
-          } catch {
-            case e: Throwable =>
-              batchReader.close()
-              throw e
-          }
-          batchReader
-        } else {
-          val pushed = if (parquetFilterPushDown) {
-            filters
-              // Collects all converted Parquet filter predicates. Notice that 
not all predicates
-              // can be converted (`ParquetFilters.createFilter` returns an 
`Option`). That's why
-              // a `flatMap` is used here.
-              .flatMap(parquetFilters.createFilter)
-              .reduceOption(FilterApi.and)
-          } else {
-            None
-          }
-          pushed.foreach(p => 
ParquetInputFormat.setFilterPredicate(sharedConf, p))
-
-          val batchReader = new BatchReader(
-            sharedConf,
-            file,
-            footer,
-            capacity,
-            requiredSchema,
-            isCaseSensitive,
-            useFieldId,
-            ignoreMissingIds,
-            datetimeRebaseSpec.mode == CORRECTED,
-            partitionSchema,
-            file.partitionValues,
-            metrics.asJava)
-          try {
-            batchReader.init()
-          } catch {
-            case e: Throwable =>
-              batchReader.close()
-              throw e
-          }
-          batchReader
-        }
+      val pushed = if (parquetFilterPushDown) {
+        filters
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+      pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
+      val pushedNative = if (parquetFilterPushDown) {
+        parquetFilters.createNativeFilters(filters)
+      } else {
+        None
+      }
+      val recordBatchReader = new NativeBatchReader(
+        sharedConf,
+        file,
+        footer,
+        pushedNative.orNull,
+        capacity,
+        requiredSchema,
+        dataSchema,
+        isCaseSensitive,
+        useFieldId,
+        ignoreMissingIds,
+        datetimeRebaseSpec.mode == CORRECTED,
+        partitionSchema,
+        file.partitionValues,
+        metrics.asJava,
+        CometMetricNode(metrics))
+      try {
+        recordBatchReader.init()
+      } catch {
+        case e: Throwable =>
+          recordBatchReader.close()
+          throw e
+      }
       val iter = new RecordReaderIterator(recordBatchReader)
       try {
         iter.asInstanceOf[Iterator[InternalRow]]
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
deleted file mode 100644
index 495054fc8..000000000
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
-import org.apache.parquet.hadoop.ParquetInputFormat
-import org.apache.parquet.hadoop.metadata.ParquetMetadata
-import org.apache.spark.TaskContext
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
-import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.connector.read.PartitionReader
-import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
-import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.comet.{CometConf, CometRuntimeException}
-import org.apache.comet.shims.ShimSQLConf
-
-case class CometParquetPartitionReaderFactory(
-    usingDataFusionReader: Boolean,
-    @transient sqlConf: SQLConf,
-    broadcastedConf: Broadcast[SerializableConfiguration],
-    readDataSchema: StructType,
-    partitionSchema: StructType,
-    filters: Array[Filter],
-    options: ParquetOptions,
-    metrics: Map[String, SQLMetric])
-    extends FilePartitionReaderFactory
-    with ShimSQLConf
-    with Logging {
-
-  private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-  private val useFieldId = CometParquetUtils.readFieldId(sqlConf)
-  private val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf)
-  private val pushDownDate = sqlConf.parquetFilterPushDownDate
-  private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
-  private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-  private val pushDownStringPredicate = 
sqlConf.parquetFilterPushDownStringPredicate
-  private val pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
-  private val parquetFilterPushDown = sqlConf.parquetFilterPushDown
-
-  // Comet specific configurations
-  private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
-
-  // This is only called at executor on a Broadcast variable, so we don't want 
it to be
-  // materialized at driver.
-  @transient private lazy val preFetchEnabled = {
-    val conf = broadcastedConf.value.value
-
-    conf.getBoolean(
-      CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
-      CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
-    !usingDataFusionReader // Turn off prefetch if native_iceberg_compat is 
enabled
-  }
-
-  private var cometReaders: Iterator[BatchReader] = _
-  private val cometReaderExceptionMap = new mutable.HashMap[PartitionedFile, 
Throwable]()
-
-  // TODO: we may want to revisit this as we're going to only support flat 
types at the beginning
-  override def supportColumnarReads(partition: InputPartition): Boolean = true
-
-  override def createColumnarReader(partition: InputPartition): 
PartitionReader[ColumnarBatch] = {
-    if (preFetchEnabled) {
-      val filePartition = partition.asInstanceOf[FilePartition]
-      val conf = broadcastedConf.value.value
-
-      val threadNum = conf.getInt(
-        CometConf.COMET_SCAN_PREFETCH_THREAD_NUM.key,
-        CometConf.COMET_SCAN_PREFETCH_THREAD_NUM.defaultValue.get)
-      val prefetchThreadPool = 
CometPrefetchThreadPool.getOrCreateThreadPool(threadNum)
-
-      this.cometReaders = filePartition.files
-        .map { file =>
-          // `init()` call is deferred to when the prefetch task begins.
-          // Otherwise we will hold too many resources for readers which are 
not ready
-          // to prefetch.
-          val cometReader = buildCometReader(file)
-          if (cometReader != null) {
-            cometReader.submitPrefetchTask(prefetchThreadPool)
-          }
-
-          cometReader
-        }
-        .toSeq
-        .toIterator
-    }
-
-    super.createColumnarReader(partition)
-  }
-
-  override def buildReader(partitionedFile: PartitionedFile): 
PartitionReader[InternalRow] =
-    throw new UnsupportedOperationException("Comet doesn't support 
'buildReader'")
-
-  private def buildCometReader(file: PartitionedFile): BatchReader = {
-    val conf = broadcastedConf.value.value
-
-    try {
-      val (datetimeRebaseSpec, footer, filters) = getFilter(file)
-      filters.foreach(pushed => ParquetInputFormat.setFilterPredicate(conf, 
pushed))
-      val cometReader = new BatchReader(
-        conf,
-        file,
-        footer,
-        batchSize,
-        readDataSchema,
-        isCaseSensitive,
-        useFieldId,
-        ignoreMissingIds,
-        datetimeRebaseSpec.mode == CORRECTED,
-        partitionSchema,
-        file.partitionValues,
-        metrics.asJava)
-      val taskContext = Option(TaskContext.get)
-      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => 
cometReader.close()))
-      return cometReader
-    } catch {
-      case e: Throwable if preFetchEnabled =>
-        // Keep original exception
-        cometReaderExceptionMap.put(file, e)
-    }
-    null
-  }
-
-  override def buildColumnarReader(file: PartitionedFile): 
PartitionReader[ColumnarBatch] = {
-    val cometReader = if (!preFetchEnabled) {
-      // Prefetch is not enabled, create comet reader and initiate it.
-      val cometReader = buildCometReader(file)
-      cometReader.init()
-
-      cometReader
-    } else {
-      // If prefetch is enabled, we already tried to access the file when in 
`buildCometReader`.
-      // It is possibly we got an exception like `FileNotFoundException` and 
we need to throw it
-      // now to let Spark handle it.
-      val reader = cometReaders.next()
-      val exception = cometReaderExceptionMap.get(file)
-      exception.foreach(e => throw e)
-
-      if (reader == null) {
-        throw new CometRuntimeException(s"Cannot find comet file reader for 
$file")
-      }
-      reader
-    }
-    CometPartitionReader(cometReader)
-  }
-
-  def getFilter(file: PartitionedFile): (RebaseSpec, ParquetMetadata, 
Option[FilterPredicate]) = {
-    val sharedConf = broadcastedConf.value.value
-    val footer = FooterReader.readFooter(sharedConf, file)
-    val footerFileMetaData = footer.getFileMetaData
-    val datetimeRebaseSpec = CometParquetFileFormat.getDatetimeRebaseSpec(
-      file,
-      readDataSchema,
-      sharedConf,
-      footerFileMetaData,
-      datetimeRebaseModeInRead)
-
-    val pushed = if (parquetFilterPushDown) {
-      val parquetSchema = footerFileMetaData.getSchema
-      val parquetFilters = new ParquetFilters(
-        parquetSchema,
-        readDataSchema,
-        pushDownDate,
-        pushDownTimestamp,
-        pushDownDecimal,
-        pushDownStringPredicate,
-        pushDownInFilterThreshold,
-        isCaseSensitive,
-        datetimeRebaseSpec)
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
-        // is used here.
-        .flatMap(parquetFilters.createFilter)
-        .reduceOption(FilterApi.and)
-    } else {
-      None
-    }
-    (datetimeRebaseSpec, footer, pushed)
-  }
-
-  override def createReader(inputPartition: InputPartition): 
PartitionReader[InternalRow] =
-    throw new UnsupportedOperationException("Only 'createColumnarReader' is 
supported.")
-
-  /**
-   * A simple adapter on Comet's [[BatchReader]].
-   */
-  protected case class CometPartitionReader(reader: BatchReader)
-      extends PartitionReader[ColumnarBatch] {
-
-    override def next(): Boolean = {
-      reader.nextBatch()
-    }
-
-    override def get(): ColumnarBatch = {
-      reader.currentBatch()
-    }
-
-    override def close(): Unit = {
-      reader.close()
-    }
-  }
-}
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
deleted file mode 100644
index 3f5025576..000000000
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet
-
-import scala.jdk.CollectionConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.comet.CometMetricNode
-import org.apache.spark.sql.connector.read.PartitionReaderFactory
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.execution.datasources.v2.FileScan
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.SerializableConfiguration
-
-import org.apache.comet.MetricsSupport
-
-// TODO: Consider creating a case class and patch SQL tests if needed, will 
make life easier.
-// currently hacking around this by setting the metrics within the object's 
apply method.
-trait CometParquetScan extends FileScan with MetricsSupport {
-  def sparkSession: SparkSession
-  def hadoopConf: Configuration
-  def readDataSchema: StructType
-  def readPartitionSchema: StructType
-  def pushedFilters: Array[Filter]
-  def options: CaseInsensitiveStringMap
-
-  override def equals(obj: Any): Boolean = obj match {
-    case other: CometParquetScan =>
-      super.equals(other) && readDataSchema == other.readDataSchema &&
-      readPartitionSchema == other.readPartitionSchema &&
-      equivalentFilters(pushedFilters, other.pushedFilters)
-    case _ => false
-  }
-
-  override def hashCode(): Int = getClass.hashCode()
-
-  override def createReaderFactory(): PartitionReaderFactory = {
-    val sqlConf = sparkSession.sessionState.conf
-    CometParquetFileFormat.populateConf(sqlConf, hadoopConf)
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
-    CometParquetPartitionReaderFactory(
-      usingDataFusionReader = false, // this value is not used since this is 
v2 scan
-      sqlConf,
-      broadcastedConf,
-      readDataSchema,
-      readPartitionSchema,
-      pushedFilters,
-      new ParquetOptions(options.asScala.toMap, sqlConf),
-      metrics)
-  }
-}
-
-object CometParquetScan {
-  def apply(session: SparkSession, scan: ParquetScan): CometParquetScan = {
-    val newScan = new ParquetScan(
-      scan.sparkSession,
-      scan.hadoopConf,
-      scan.fileIndex,
-      scan.dataSchema,
-      scan.readDataSchema,
-      scan.readPartitionSchema,
-      scan.pushedFilters,
-      scan.options,
-      partitionFilters = scan.partitionFilters,
-      dataFilters = scan.dataFilters) with CometParquetScan
-
-    newScan.metrics = CometMetricNode.nativeScanMetrics(session.sparkContext) 
++ CometMetricNode
-      .parquetScanMetrics(session.sparkContext)
-
-    newScan
-  }
-}
diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index ec3336352..ce57624b7 100644
--- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -22,14 +22,13 @@ package org.apache.comet.rules
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
+import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 import org.apache.comet.CometConf
-import org.apache.comet.parquet.CometParquetScan
 
 // This rule is responsible for eliminating redundant transitions between row-based and
 // columnar-based operators for Comet. Currently, three potential redundant transitions are:
@@ -157,7 +156,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
    * This includes:
    *   - CometScanExec with native_iceberg_compat and partition columns - uses
    *     ConstantColumnReader
-   *   - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader
    */
   private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = {
     op match {
@@ -168,7 +166,6 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
           case scan: CometScanExec =>
             scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
             scan.relation.partitionSchema.nonEmpty
-          case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan]
           case _ => false
         }
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 3f2748c3e..4e68a423a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -38,7 +38,6 @@ import org.apache.spark.util.collection._
 
 import com.google.common.base.Objects
 
-import org.apache.comet.CometConf
 import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 
@@ -247,8 +246,7 @@ object CometNativeScanExec {
     // https://github.com/apache/arrow-datafusion-comet/issues/190
     def transform(arg: Any): AnyRef = arg match {
       case _: HadoopFsRelation =>
-        scanExec.relation.copy(fileFormat =
-          new CometParquetFileFormat(session, CometConf.SCAN_NATIVE_DATAFUSION))(session)
+        scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session))(session)
       case other: AnyRef => other
       case null => null
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index e283f6b2c..2707f0c04 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -37,15 +37,13 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.collection._
 
 import org.apache.comet.{CometConf, MetricsSupport}
-import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
+import org.apache.comet.parquet.CometParquetFileFormat
 
 /**
  * Comet physical scan node for DataSource V1. Most of the code here follow Spark's
@@ -476,43 +474,13 @@ case class CometScanExec(
       fsRelation: HadoopFsRelation,
       readFile: (PartitionedFile) => Iterator[InternalRow],
       partitions: Seq[FilePartition]): RDD[InternalRow] = {
-    val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
-    val usingDataFusionReader: Boolean = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
-    val prefetchEnabled = hadoopConf.getBoolean(
-      CometConf.COMET_SCAN_PREFETCH_ENABLED.key,
-      CometConf.COMET_SCAN_PREFETCH_ENABLED.defaultValue.get) &&
-      !usingDataFusionReader
-
     val sqlConf = fsRelation.sparkSession.sessionState.conf
-    if (prefetchEnabled) {
-      CometParquetFileFormat.populateConf(sqlConf, hadoopConf)
-      val broadcastedConf =
-        fsRelation.sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-      val partitionReaderFactory = CometParquetPartitionReaderFactory(
-        scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        sqlConf,
-        broadcastedConf,
-        requiredSchema,
-        relation.partitionSchema,
-        pushedDownFilters.toArray,
-        new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf),
-        metrics)
-
-      new DataSourceRDD(
-        fsRelation.sparkSession.sparkContext,
-        partitions.map(Seq(_)),
-        partitionReaderFactory,
-        true,
-        Map.empty)
-    } else {
-      newFileScanRDD(
-        fsRelation,
-        readFile,
-        partitions,
-        new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
-        new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf))
-    }
+    newFileScanRDD(
+      fsRelation,
+      readFile,
+      partitions,
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields),
+      new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf))
   }
 
   override def doCanonicalize(): CometScanExec = {
@@ -556,8 +524,7 @@ object CometScanExec {
     // https://github.com/apache/arrow-datafusion-comet/issues/190
     def transform(arg: Any): AnyRef = arg match {
       case _: HadoopFsRelation =>
-        scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session, scanImpl))(
-          session)
+        scanExec.relation.copy(fileFormat = new CometParquetFileFormat(session))(session)
       case other: AnyRef => other
       case null => null
     }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 928e66b29..1495eb34e 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -19,7 +19,7 @@
 
 package org.apache.comet.parquet
 
-import java.io.{File, FileFilter}
+import java.io.File
 import java.math.{BigDecimal, BigInteger}
 import java.time.{ZoneId, ZoneOffset}
 
@@ -31,20 +31,17 @@ import scala.util.control.Breaks.breakable
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.example.data.simple.SimpleGroup
 import org.apache.parquet.schema.MessageTypeParser
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
 
 import com.google.common.primitives.UnsignedLong
 
@@ -703,76 +700,6 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
-  test("partition column types") {
-    withTempPath { dir =>
-      Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
-
-      val dataTypes =
-        Seq(
-          StringType,
-          BooleanType,
-          ByteType,
-          BinaryType,
-          ShortType,
-          IntegerType,
-          LongType,
-          FloatType,
-          DoubleType,
-          DecimalType(25, 5),
-          DateType,
-          TimestampType)
-
-      // TODO: support `NullType` here, after we add the support in `ColumnarBatchRow`
-      val constantValues =
-        Seq(
-          UTF8String.fromString("a string"),
-          true,
-          1.toByte,
-          "Spark SQL".getBytes,
-          2.toShort,
-          3,
-          Long.MaxValue,
-          0.25.toFloat,
-          0.75d,
-          Decimal("1234.23456"),
-          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
-          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
-
-      dataTypes.zip(constantValues).foreach { case (dt, v) =>
-        val schema = StructType(StructField("pcol", dt) :: Nil)
-        val conf = SQLConf.get
-        val partitionValues = new GenericInternalRow(Array(v))
-        val file = dir
-          .listFiles(new FileFilter {
-            override def accept(pathname: File): Boolean =
-              pathname.isFile && pathname.toString.endsWith("parquet")
-          })
-          .head
-        val reader = new BatchReader(
-          file.toString,
-          CometConf.COMET_BATCH_SIZE.get(conf),
-          schema,
-          partitionValues)
-        reader.init()
-
-        try {
-          reader.nextBatch()
-          val batch = reader.currentBatch()
-          val actual = batch.getRow(0).get(1, dt)
-          val expected = v
-          if (dt.isInstanceOf[BinaryType]) {
-            assert(
-              actual.asInstanceOf[Array[Byte]] sameElements expected.asInstanceOf[Array[Byte]])
-          } else {
-            assert(actual == expected)
-          }
-        } finally {
-          reader.close()
-        }
-      }
-    }
-  }
-
   test("partition columns - multiple batch") {
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> 7.toString,
@@ -1535,116 +1462,6 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
-  test("test pre-fetching multiple files") {
-    def makeRawParquetFile(
-        path: Path,
-        dictionaryEnabled: Boolean,
-        n: Int,
-        pageSize: Int): Seq[Option[Int]] = {
-      val schemaStr =
-        """
-          |message root {
-          |  optional boolean _1;
-          |  optional int32   _2(INT_8);
-          |  optional int32   _3(INT_16);
-          |  optional int32   _4;
-          |  optional int64   _5;
-          |  optional float   _6;
-          |  optional double  _7;
-          |  optional binary  _8(UTF8);
-          |  optional int32   _9(UINT_8);
-          |  optional int32   _10(UINT_16);
-          |  optional int32   _11(UINT_32);
-          |  optional int64   _12(UINT_64);
-          |  optional binary  _13(ENUM);
-          |}
-        """.stripMargin
-
-      val schema = MessageTypeParser.parseMessageType(schemaStr)
-      val writer = createParquetWriter(
-        schema,
-        path,
-        dictionaryEnabled = dictionaryEnabled,
-        pageSize = pageSize,
-        dictionaryPageSize = pageSize)
-
-      val rand = new scala.util.Random(42)
-      val expected = (0 until n).map { i =>
-        if (rand.nextBoolean()) {
-          None
-        } else {
-          Some(i)
-        }
-      }
-      expected.foreach { opt =>
-        val record = new SimpleGroup(schema)
-        opt match {
-          case Some(i) =>
-            record.add(0, i % 2 == 0)
-            record.add(1, i.toByte)
-            record.add(2, i.toShort)
-            record.add(3, i)
-            record.add(4, i.toLong)
-            record.add(5, i.toFloat)
-            record.add(6, i.toDouble)
-            record.add(7, i.toString * 48)
-            record.add(8, (-i).toByte)
-            record.add(9, (-i).toShort)
-            record.add(10, -i)
-            record.add(11, (-i).toLong)
-            record.add(12, i.toString)
-          case _ =>
-        }
-        writer.write(record)
-      }
-
-      writer.close()
-      expected
-    }
-
-    val conf = new Configuration()
-    conf.set("spark.comet.scan.preFetch.enabled", "true");
-    conf.set("spark.comet.scan.preFetch.threadNum", "4");
-
-    withTempDir { dir =>
-      val threadPool = CometPrefetchThreadPool.getOrCreateThreadPool(2)
-
-      val readers = (0 to 10).map { idx =>
-        val path = new Path(dir.toURI.toString, s"part-r-$idx.parquet")
-        makeRawParquetFile(path, dictionaryEnabled = false, 10000, 500)
-
-        val reader = new BatchReader(conf, path.toString, 1000, null, null)
-        reader.submitPrefetchTask(threadPool)
-
-        reader
-      }
-
-      // Wait for all pre-fetch tasks
-      readers.foreach { reader =>
-        val task = reader.getPrefetchTask()
-        task.get()
-      }
-
-      val totolRows = readers.map { reader =>
-        val queue = reader.getPrefetchQueue()
-        var rowCount = 0L
-
-        while (!queue.isEmpty) {
-          val rowGroup = queue.take().getLeft
-          rowCount += rowGroup.getRowCount
-        }
-
-        reader.close()
-
-        rowCount
-      }.sum
-
-      readParquetFile(dir.toString) { df =>
-        assert(df.count() == totolRows)
-      }
-    }
-  }
-
   test("test merge scan range") {
     def makeRawParquetFile(path: Path, n: Int): Seq[Option[Int]] = {
       val dictionaryPageSize = 1024
@@ -1753,23 +1570,6 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
-  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
-      pos: Position): Unit = {
-    Seq(true, false).foreach { prefetch =>
-      val cometTestName = if (prefetch) {
-        testName + " (prefetch enabled)"
-      } else {
-        testName
-      }
-
-      super.test(cometTestName, testTags: _*) {
-        withSQLConf(CometConf.COMET_SCAN_PREFETCH_ENABLED.key -> prefetch.toString) {
-          testFun
-        }
-      }
-    }
-  }
-
   private def withId(id: Int) =
     new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
 
@@ -2036,11 +1836,7 @@ class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
       val scans = collect(r.filter(f).queryExecution.executedPlan) { case p: CometBatchScanExec =>
         p.scan
       }
-      if (CometConf.COMET_ENABLED.get()) {
-        assert(scans.nonEmpty && scans.forall(_.isInstanceOf[CometParquetScan]))
-      } else {
-        assert(!scans.exists(_.isInstanceOf[CometParquetScan]))
-      }
+      assert(scans.isEmpty)
     }
   }
 
diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
index a349ab2b9..18dec6817 100644
--- a/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
 
 import org.apache.comet.CometConf
-import org.apache.comet.parquet.CometParquetScan
 import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
 
 /**
@@ -127,11 +126,6 @@ class CometScanRuleSuite extends CometTestBase {
               if (cometEnabled) {
                 assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 0)
                 assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 1)
-
-                // CometScanRule should have replaced the underlying scan
-                val scan = transformedPlan.collect { case scan: CometBatchScanExec => scan }.head
-                assert(scan.wrapped.scan.isInstanceOf[CometParquetScan])
-
               } else {
                 assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 1)
                 assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 0)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index a2f196a4f..5b0371b27 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
 
 import org.apache.comet.{CometConf, WithHdfsCluster}
-import org.apache.comet.parquet.BatchReader
 
 /**
  * Benchmark to measure Comet read performance. To run this benchmark:
@@ -179,29 +178,6 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
           }
         }
 
-        sqlBenchmark.addCase("ParquetReader Comet") { _ =>
-          files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new BatchReader(p, vectorizedReaderBatchSize)
-            reader.init()
-            try {
-              var totalNumRows = 0
-              while (reader.nextBatch()) {
-                val batch = reader.currentBatch()
-                val column = batch.column(0)
-                val numRows = batch.numRows()
-                var i = 0
-                while (i < numRows) {
-                  if (!column.isNullAt(i)) aggregateValue(column, i)
-                  i += 1
-                }
-                totalNumRows += batch.numRows()
-              }
-            } finally {
-              reader.close()
-            }
-          }
-        }
-
         sqlBenchmark.run()
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to