This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2daa733602 [CORE] Make LeafTransformSupport's getPartitions return 
Seq[Partition] (#10838)
2daa733602 is described below

commit 2daa733602733e4c38ecac2b40e46b10f80d1db4
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 16 17:13:08 2025 +0800

    [CORE] Make LeafTransformSupport's getPartitions return Seq[Partition] 
(#10838)
---
 .../sql/delta/catalog/ClickHouseTableV2.scala      |   5 +-
 .../sql/delta/catalog/ClickHouseTableV2.scala      |   5 +-
 .../sql/delta/catalog/ClickHouseTableV2.scala      |   4 +-
 .../execution/iceberg/ClickHouseIcebergSuite.scala |   4 +-
 .../backendsapi/clickhouse/CHIteratorApi.scala     |   9 +-
 .../backendsapi/clickhouse/CHTransformerApi.scala  |  13 ++-
 .../gluten/execution/CHRangeExecTransformer.scala  |   4 +-
 .../execution/GlutenMergeTreePartition.scala       |   4 +-
 .../execution/NativeFileScanColumnarRDD.scala      | 116 ---------------------
 ...PartitionsUtil.scala => CHPartitionsUtil.scala} |  14 +--
 .../utils/MergeTreePartsPartitionsUtil.scala       |  22 ++--
 ...lutenClickhouseMergetreeSoftAffinitySuite.scala |   5 +-
 .../backendsapi/velox/VeloxIteratorApi.scala       |  67 ++----------
 .../backendsapi/velox/VeloxTransformerApi.scala    |  13 ++-
 .../gluten/execution/IcebergScanTransformer.scala  |  33 ++----
 .../spark/source/GlutenIcebergSourceUtil.scala     |  64 ++----------
 .../org/apache/gluten/execution/IcebergSuite.scala |   4 +-
 .../execution/MicroBatchScanExecTransformer.scala  |   8 +-
 .../gluten/execution/PaimonScanTransformer.scala   |  51 +++++----
 .../org/apache/gluten/execution/PaimonSuite.scala  |  17 +++
 .../apache/gluten/backendsapi/IteratorApi.scala    |  13 +--
 .../apache/gluten/backendsapi/TransformerApi.scala |   8 +-
 .../execution/BasicScanExecTransformer.scala       |  28 +++--
 .../execution/BatchScanExecTransformer.scala       |  53 ++++------
 .../execution/FileSourceScanExecTransformer.scala  |  27 ++---
 .../execution/GlutenWholeStageColumnarRDD.scala    |   4 +-
 .../execution/SparkDataSourceRDDPartition.scala    |  34 ++++++
 .../gluten/execution/WholeStageTransformer.scala   | 102 ++++++------------
 ...utPartitionsUtil.scala => PartitionsUtil.scala} |  14 +--
 .../apache/spark/softaffinity/SoftAffinity.scala   |  10 +-
 .../sql/hive/HiveTableScanExecTransformer.scala    |   6 +-
 .../TestFileSourceScanExecTransformer.scala        |  23 ++--
 .../TestFileSourceScanExecTransformer.scala        |  23 ++--
 .../TestFileSourceScanExecTransformer.scala        |  23 ++--
 .../TestFileSourceScanExecTransformer.scala        |  23 ++--
 .../org/apache/gluten/sql/shims/SparkShims.scala   |   4 +
 36 files changed, 319 insertions(+), 538 deletions(-)

diff --git 
a/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
 
b/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index d2966eafee..29caf77b63 100644
--- 
a/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ 
b/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 package org.apache.spark.sql.delta.catalog
+
+import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog, 
DeltaTimeTravelSpec, Snapshot}
 import org.apache.spark.sql.delta.actions.Metadata
@@ -153,7 +154,7 @@ object ClickHouseTableV2 extends Logging {
       optionalBucketSet: Option[BitSet],
       optionalNumCoalescedBuckets: Option[Int],
       disableBucketedScan: Boolean,
-      filterExprs: Seq[Expression]): Seq[InputPartition] = {
+      filterExprs: Seq[Expression]): Seq[Partition] = {
     val tableV2 = ClickHouseTableV2.getTable(deltaLog)
 
     MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
diff --git 
a/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
 
b/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index d2966eafee..29caf77b63 100644
--- 
a/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ 
b/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 package org.apache.spark.sql.delta.catalog
+
+import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog, 
DeltaTimeTravelSpec, Snapshot}
 import org.apache.spark.sql.delta.actions.Metadata
@@ -153,7 +154,7 @@ object ClickHouseTableV2 extends Logging {
       optionalBucketSet: Option[BitSet],
       optionalNumCoalescedBuckets: Option[Int],
       disableBucketedScan: Boolean,
-      filterExprs: Seq[Expression]): Seq[InputPartition] = {
+      filterExprs: Seq[Expression]): Seq[Partition] = {
     val tableV2 = ClickHouseTableV2.getTable(deltaLog)
 
     MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
diff --git 
a/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
 
b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 2c7a57625b..f16134471c 100644
--- 
a/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ 
b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.delta.catalog
 
+import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -24,7 +25,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.V1Table
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, 
DeltaTableUtils, DeltaTimeTravelSpec, Snapshot, UnresolvedPathBasedDeltaTable}
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.delta.sources.DeltaDataSource
@@ -147,7 +147,7 @@ object ClickHouseTableV2 extends Logging {
       optionalBucketSet: Option[BitSet],
       optionalNumCoalescedBuckets: Option[Int],
       disableBucketedScan: Boolean,
-      filterExprs: Seq[Expression]): Seq[InputPartition] = {
+      filterExprs: Seq[Expression]): Seq[Partition] = {
     val tableV2 = ClickHouseTableV2.getTable(deltaLog)
 
     MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
diff --git 
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
 
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
index e4aff03290..95de2e0414 100644
--- 
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
+++ 
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
@@ -292,7 +292,7 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
             getExecutedPlan(df).map {
               case plan: IcebergScanTransformer =>
                 assert(plan.getKeyGroupPartitioning.isDefined)
-                assert(plan.getSplitInfosWithIndex.length == 3)
+                assert(plan.getSplitInfos.length == 3)
               case _ => // do nothing
             }
           }
@@ -372,7 +372,7 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
             getExecutedPlan(df).map {
               case plan: IcebergScanTransformer =>
                 assert(plan.getKeyGroupPartitioning.isDefined)
-                assert(plan.getSplitInfosWithIndex.length == 3)
+                assert(plan.getSplitInfos.length == 3)
               case _ => // do nothing
             }
           }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 07c3d14409..5fff386d72 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -27,14 +27,13 @@ import org.apache.gluten.substrait.rel._
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.vectorized.{BatchIterator, 
CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator}
 
-import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
+import org.apache.spark.{InterruptibleIterator, Partition, SparkConf, 
TaskContext}
 import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.CHColumnarShuffleWriter
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.FilePartition
 import 
org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
 import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer
@@ -125,12 +124,16 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
   }
 
   override def genSplitInfo(
-      partition: InputPartition,
+      partitionIndex: Int,
+      partitions: Seq[Partition],
       partitionSchema: StructType,
       dataSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
       properties: Map[String, String]): SplitInfo = {
+    // todo: support multi partitions
+    assert(partitions.size == 1)
+    val partition = partitions.head
     partition match {
       case p: GlutenMergeTreePartition =>
         ExtensionTableBuilder
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index 9da8eed202..ddc857182f 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -22,12 +22,12 @@ import 
org.apache.gluten.execution.{CHHashAggregateExecTransformer, WriteFilesEx
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.expression.{BooleanLiteralNode, 
ExpressionBuilder, ExpressionNode}
-import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}
+import org.apache.gluten.utils.{CHPartitionsUtil, ExpressionDocUtil}
 
+import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.GlutenDriverEndpoint
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.delta.MergeTreeFileFormat
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.files.TahoeFileIndex
@@ -51,8 +51,7 @@ import java.util
 
 class CHTransformerApi extends TransformerApi with Logging {
 
-  /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */
-  def genInputPartitionSeq(
+  def genPartitionSeq(
       relation: HadoopFsRelation,
       requiredSchema: StructType,
       selectedPartitions: Array[PartitionDirectory],
@@ -61,7 +60,7 @@ class CHTransformerApi extends TransformerApi with Logging {
       optionalBucketSet: Option[BitSet],
       optionalNumCoalescedBuckets: Option[Int],
       disableBucketedScan: Boolean,
-      filterExprs: Seq[Expression]): Seq[InputPartition] = {
+      filterExprs: Seq[Expression]): Seq[Partition] = {
     relation.location match {
       case index: TahoeFileIndex
           if relation.fileFormat
@@ -81,7 +80,7 @@ class CHTransformerApi extends TransformerApi with Logging {
           )
       case _ =>
         // Generate FilePartition for Parquet
-        CHInputPartitionsUtil(
+        CHPartitionsUtil(
           relation,
           requiredSchema,
           selectedPartitions,
@@ -89,7 +88,7 @@ class CHTransformerApi extends TransformerApi with Logging {
           bucketedScan,
           optionalBucketSet,
           optionalNumCoalescedBuckets,
-          disableBucketedScan).genInputPartitionSeq()
+          disableBucketedScan).genPartitionSeq()
     }
   }
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
index bdb716c676..fb9fc4b4d8 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
@@ -24,8 +24,8 @@ import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.extensions.ExtensionBuilder
 import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.SparkPlan
 import 
org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -53,7 +53,7 @@ case class CHRangeExecTransformer(
     }
   }
 
-  override def getPartitions: Seq[InputPartition] = {
+  override def getPartitions: Seq[Partition] = {
     (0 until numSlices).map {
       sliceIndex => GlutenRangeExecPartition(start, end, step, numSlices, 
sliceIndex)
     }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
index 46f40fc25b..3cbce8423a 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.types.StructType
 
@@ -95,7 +96,8 @@ case class GlutenMergeTreePartition(
     partList: Array[MergeTreePartSplit],
     tableSchema: StructType,
     clickhouseTableConfigs: Map[String, String])
-  extends InputPartition {
+  extends Partition
+  with InputPartition {
   override def preferredLocations(): Array[String] = {
     Array.empty[String]
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
deleted file mode 100644
index cf0508d6cb..0000000000
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, 
CloseableCHColumnBatchIterator}
-
-import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import java.util
-import java.util.concurrent.TimeUnit.NANOSECONDS
-
-class NativeFileScanColumnarRDD(
-    @transient sc: SparkContext,
-    @transient private val inputPartitions: Seq[InputPartition],
-    numOutputRows: SQLMetric,
-    numOutputBatches: SQLMetric,
-    scanTime: SQLMetric)
-  extends RDD[ColumnarBatch](sc, Nil) {
-
-  override def compute(split: Partition, context: TaskContext): 
Iterator[ColumnarBatch] = {
-    val inputPartition = castNativePartition(split)
-
-    assert(
-      inputPartition.isInstanceOf[GlutenPartition],
-      "NativeFileScanColumnarRDD only accepts GlutenPartition.")
-
-    val splitInfoByteArray = inputPartition
-      .asInstanceOf[GlutenPartition]
-      .splitInfos
-      .map(splitInfo => splitInfo.toProtobuf.toByteArray)
-      .toArray
-
-    val resIter = GlutenTimeMetric.millis(scanTime) {
-      _ =>
-        val inBatchIters = new util.ArrayList[ColumnarNativeIterator]()
-        CHNativeExpressionEvaluator.createKernelWithBatchIterator(
-          inputPartition.plan,
-          splitInfoByteArray,
-          inBatchIters,
-          false,
-          split.index
-        )
-    }
-    TaskContext
-      .get()
-      .addTaskFailureListener(
-        (ctx, _) => {
-          if (ctx.isInterrupted()) {
-            resIter.cancel()
-          }
-        })
-    TaskContext.get().addTaskCompletionListener[Unit](_ => resIter.close())
-    val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
-      var scanTotalTime = 0L
-      var scanTimeAdded = false
-
-      override def hasNext: Boolean = {
-        val res = GlutenTimeMetric.withNanoTime(resIter.hasNext)(t => 
scanTotalTime += t)
-        if (!res && !scanTimeAdded) {
-          scanTime += NANOSECONDS.toMillis(scanTotalTime)
-          scanTimeAdded = true
-        }
-        res
-      }
-
-      override def next(): ColumnarBatch = {
-        GlutenTimeMetric.withNanoTime {
-          val cb = resIter.next()
-          numOutputRows += cb.numRows()
-          numOutputBatches += 1
-          cb
-        }(t => scanTotalTime += t)
-      }
-    }
-    new CloseableCHColumnBatchIterator(iter)
-  }
-
-  private def castNativePartition(split: Partition): BaseGlutenPartition = 
split match {
-    case FirstZippedPartitionsPartition(_, p: BaseGlutenPartition, _) => p
-    case _ => throw new SparkException(s"[BUG] Not a NativeSubstraitPartition: 
$split")
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    castPartition(split).inputPartition.preferredLocations()
-  }
-
-  private def castPartition(split: Partition): FirstZippedPartitionsPartition 
= split match {
-    case p: FirstZippedPartitionsPartition => p
-    case _ => throw new SparkException(s"[BUG] Not a NativeSubstraitPartition: 
$split")
-  }
-
-  override protected def getPartitions: Array[Partition] = {
-    inputPartitions.zipWithIndex.map {
-      case (inputPartition, index) => FirstZippedPartitionsPartition(index, 
inputPartition)
-    }.toArray
-  }
-}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala
similarity index 94%
rename from 
backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala
index 39f86429e2..6eaab68158 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHInputPartitionsUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHPartitionsUtil.scala
@@ -19,9 +19,9 @@ package org.apache.gluten.utils
 import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
 import org.apache.gluten.sql.shims.SparkShimLoader
 
+import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SparkResourceUtil
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path
 
 import scala.collection.mutable.ArrayBuffer
 
-case class CHInputPartitionsUtil(
+case class CHPartitionsUtil(
     relation: HadoopFsRelation,
     requiredSchema: StructType,
     selectedPartitions: Array[PartitionDirectory],
@@ -42,15 +42,15 @@ case class CHInputPartitionsUtil(
     disableBucketedScan: Boolean)
   extends Logging {
 
-  def genInputPartitionSeq(): Seq[InputPartition] = {
+  def genPartitionSeq(): Seq[Partition] = {
     if (bucketedScan) {
-      genBucketedInputPartitionSeq()
+      genBucketedPartitionSeq()
     } else {
-      genNonBuckedInputPartitionSeq()
+      genNonBuckedPartitionSeq()
     }
   }
 
-  private def genNonBuckedInputPartitionSeq(): Seq[InputPartition] = {
+  private def genNonBuckedPartitionSeq(): Seq[Partition] = {
     val maxSplitBytes =
       FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
 
@@ -111,7 +111,7 @@ case class CHInputPartitionsUtil(
     }
   }
 
-  private def genBucketedInputPartitionSeq(): Seq[InputPartition] = {
+  private def genBucketedPartitionSeq(): Seq[Partition] = {
     val bucketSpec = relation.bucketSpec.get
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
index 749de3f9d4..96caf5b790 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
@@ -26,12 +26,12 @@ import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.extensions.ExtensionBuilder
 import org.apache.gluten.substrait.rel.RelBuilder
 
+import org.apache.spark.Partition
 import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.delta.ClickhouseSnapshot
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.files.TahoeFileIndex
@@ -69,7 +69,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
       optionalBucketSet: Option[BitSet],
       optionalNumCoalescedBuckets: Option[Int],
       disableBucketedScan: Boolean,
-      filterExprs: Seq[Expression]): Seq[InputPartition] = {
+      filterExprs: Seq[Expression]): Seq[Partition] = {
     if (
       !relation.location.isInstanceOf[TahoeFileIndex] || !relation.fileFormat
         .isInstanceOf[DeltaMergeTreeFileFormat]
@@ -82,7 +82,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
     val snapshotId =
       
ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(stalenessAcceptable = 
true))
 
-    val partitions = new ArrayBuffer[InputPartition]
+    val partitions = new ArrayBuffer[Partition]
     val (database, tableName) = if (table.catalogTable.isDefined) {
       (table.catalogTable.get.identifier.database.get, 
table.catalogTable.get.identifier.table)
     } else {
@@ -95,7 +95,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
 
     // bucket table
     if (table.bucketOption.isDefined && bucketedScan) {
-      genBucketedInputPartitionSeq(
+      genBucketedPartitionSeq(
         engine,
         database,
         tableName,
@@ -115,7 +115,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
         sparkSession
       )
     } else {
-      genInputPartitionSeq(
+      genPartitionSeq(
         relation,
         engine,
         database,
@@ -137,7 +137,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
     partitions.toSeq
   }
 
-  def genInputPartitionSeq(
+  def genPartitionSeq(
       relation: HadoopFsRelation,
       engine: String,
       database: String,
@@ -148,7 +148,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
       optionalBucketSet: Option[BitSet],
       selectedPartitions: Array[PartitionDirectory],
       tableSchema: StructType,
-      partitions: ArrayBuffer[InputPartition],
+      partitions: ArrayBuffer[Partition],
       table: ClickHouseTableV2,
       clickhouseTableConfigs: Map[String, String],
       output: Seq[Attribute],
@@ -316,7 +316,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
       relativeTablePath: String,
       absoluteTablePath: String,
       tableSchema: StructType,
-      partitions: ArrayBuffer[InputPartition],
+      partitions: ArrayBuffer[Partition],
       table: ClickHouseTableV2,
       clickhouseTableConfigs: Map[String, String],
       splitFiles: Seq[MergeTreePartSplit],
@@ -372,7 +372,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
       relativeTablePath: String,
       absoluteTablePath: String,
       tableSchema: StructType,
-      partitions: ArrayBuffer[InputPartition],
+      partitions: ArrayBuffer[Partition],
       table: ClickHouseTableV2,
       clickhouseTableConfigs: Map[String, String],
       splitFiles: Seq[MergeTreePartSplit],
@@ -441,7 +441,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
   }
 
   /** Generate bucket partition */
-  def genBucketedInputPartitionSeq(
+  def genBucketedPartitionSeq(
       engine: String,
       database: String,
       tableName: String,
@@ -453,7 +453,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
       optionalNumCoalescedBuckets: Option[Int],
       selectedPartitions: Array[PartitionDirectory],
       tableSchema: StructType,
-      partitions: ArrayBuffer[InputPartition],
+      partitions: ArrayBuffer[Partition],
       table: ClickHouseTableV2,
       clickhouseTableConfigs: Map[String, String],
       output: Seq[Attribute],
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
index 8a8c2e321c..fe1ca1e6f6 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickhouseMergetreeSoftAffinitySuite.scala
@@ -19,8 +19,7 @@ package org.apache.gluten.execution.mergetree
 import org.apache.gluten.affinity.{CHUTAffinity, CHUTSoftAffinityManager}
 import 
org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, 
GlutenMergeTreePartition, MergeTreePartSplit}
 
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.{Partition, SparkConf}
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import 
org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
 import org.apache.spark.sql.types.StructType
@@ -61,7 +60,7 @@ class GlutenClickhouseMergetreeSoftAffinitySuite
 
   test("Soft Affinity Scheduler with duplicate reading detection") {
 
-    val partitions: ArrayBuffer[InputPartition] = new 
ArrayBuffer[InputPartition]()
+    val partitions: ArrayBuffer[Partition] = new ArrayBuffer[Partition]()
     var splitFiles: Seq[MergeTreePartSplit] = Seq()
     val relativeTablePath = "tmp/"
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 4afad58c9a..9bb670f10e 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -28,12 +28,11 @@ import org.apache.gluten.substrait.rel.{LocalFilesBuilder, 
LocalFilesNode, Split
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.vectorized._
 
-import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.{Partition, SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.softaffinity.SoftAffinity
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.{FilePartition, 
PartitionedFile}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
@@ -66,67 +65,21 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   }
 
   override def genSplitInfo(
-      partition: InputPartition,
-      partitionSchema: StructType,
-      dataSchema: StructType,
-      fileFormat: ReadFileFormat,
-      metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo = {
-    partition match {
-      case f: FilePartition =>
-        val (
-          paths,
-          starts,
-          lengths,
-          fileSizes,
-          modificationTimes,
-          partitionColumns,
-          metadataColumns,
-          otherMetadataColumns) =
-          constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
-        val preferredLocations =
-          SoftAffinity.getFilePartitionLocations(f)
-        setFileSchemaForLocalFiles(
-          LocalFilesBuilder.makeLocalFiles(
-            f.index,
-            paths,
-            starts,
-            lengths,
-            fileSizes,
-            modificationTimes,
-            partitionColumns,
-            metadataColumns,
-            fileFormat,
-            preferredLocations.toList.asJava,
-            mapAsJavaMap(properties),
-            otherMetadataColumns
-          ),
-          dataSchema,
-          fileFormat
-        )
-      case _ =>
-        throw new UnsupportedOperationException(s"Unsupported input 
partition.")
-    }
-  }
-
-  override def genSplitInfoForPartitions(
       partitionIndex: Int,
-      partitions: Seq[InputPartition],
+      partitions: Seq[Partition],
       partitionSchema: StructType,
       dataSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
       properties: Map[String, String]): SplitInfo = {
-    val partitionFiles = partitions.flatMap {
-      p =>
-        if (!p.isInstanceOf[FilePartition]) {
-          throw new UnsupportedOperationException(
-            s"Unsupported input partition ${p.getClass.getName}.")
-        }
-        p.asInstanceOf[FilePartition].files
-    }.toArray
-    val locations =
-      partitions.flatMap(p => 
SoftAffinity.getFilePartitionLocations(p.asInstanceOf[FilePartition]))
+    val filePartitions: Seq[FilePartition] = partitions.map {
+      case p: FilePartition => p
+      case o =>
+        throw new UnsupportedOperationException(
+          s"Unsupported input partition: ${o.getClass.getName}")
+    }
+    val partitionFiles = filePartitions.flatMap(_.files).toArray
+    val locations = filePartitions.flatMap(p => 
SoftAffinity.getFilePartitionLocations(p))
     val (
       paths,
       starts,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index 76a1a0e3a7..5222265bea 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -25,12 +25,12 @@ import org.apache.gluten.proto.ConfigMap
 import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode}
-import org.apache.gluten.utils.InputPartitionsUtil
+import org.apache.gluten.utils.PartitionsUtil
 import org.apache.gluten.vectorized.PlanEvaluatorJniWrapper
 
+import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.execution.HiveFileFormat
@@ -44,8 +44,7 @@ import java.util.{Map => JMap}
 
 class VeloxTransformerApi extends TransformerApi with Logging {
 
-  /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */
-  def genInputPartitionSeq(
+  def genPartitionSeq(
       relation: HadoopFsRelation,
       requiredSchema: StructType,
       selectedPartitions: Array[PartitionDirectory],
@@ -54,8 +53,8 @@ class VeloxTransformerApi extends TransformerApi with Logging 
{
       optionalBucketSet: Option[BitSet],
       optionalNumCoalescedBuckets: Option[Int],
       disableBucketedScan: Boolean,
-      filterExprs: Seq[Expression] = Seq.empty): Seq[InputPartition] = {
-    InputPartitionsUtil(
+      filterExprs: Seq[Expression] = Seq.empty): Seq[Partition] = {
+    PartitionsUtil(
       relation,
       requiredSchema,
       selectedPartitions,
@@ -64,7 +63,7 @@ class VeloxTransformerApi extends TransformerApi with Logging 
{
       optionalBucketSet,
       optionalNumCoalescedBuckets,
       disableBucketedScan)
-      .genInputPartitionSeq()
+      .genPartitionSeq()
   }
 
   override def postProcessNativeConfig(
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 4cdc51fe2a..7bfa522a48 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -17,16 +17,18 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.exception.GlutenNotSupportException
 import 
org.apache.gluten.execution.IcebergScanTransformer.{containsMetadataColumn, 
containsUuidOrFixedType}
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.{LocalFilesNode, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
DynamicPruningExpression, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
@@ -137,30 +139,11 @@ case class IcebergScanTransformer(
 
   override lazy val fileFormat: ReadFileFormat = 
GlutenIcebergSourceUtil.getFileFormat(scan)
 
-  override def getSplitInfosWithIndex: Seq[SplitInfo] = {
-    val splitInfos = getPartitionsWithIndex.zipWithIndex.map {
-      case (partitions, index) =>
-        GlutenIcebergSourceUtil.genSplitInfo(partitions, index, 
getPartitionSchema)
-    }
-    numSplits.add(splitInfos.map(s => 
s.asInstanceOf[LocalFilesNode].getPaths.size()).sum)
-    splitInfos
-  }
-
-  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
-    val groupedPartitions = SparkShimLoader.getSparkShims
-      .orderPartitions(
-        this,
-        scan,
-        keyGroupedPartitioning,
-        filteredPartitions,
-        outputPartitioning,
-        commonPartitionValues,
-        applyPartialClustering,
-        replicatePartitions)
-      .flatten
-    val splitInfos = groupedPartitions.zipWithIndex.map {
-      case (p, index) =>
-        GlutenIcebergSourceUtil.genSplitInfoForPartition(p, index, 
getPartitionSchema)
+  override def getSplitInfosFromPartitions(partitions: Seq[Partition]): 
Seq[SplitInfo] = {
+    val splitInfos = partitions.map {
+      case p: SparkDataSourceRDDPartition =>
+        GlutenIcebergSourceUtil.genSplitInfo(p, getPartitionSchema)
+      case _ => throw new GlutenNotSupportException()
     }
     numSplits.add(splitInfos.map(s => 
s.asInstanceOf[LocalFilesNode].getPaths.size()).sum)
     splitInfos
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
 
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 3816499a97..2436166e92 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -18,12 +18,13 @@ package org.apache.iceberg.spark.source
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
+import org.apache.gluten.execution.SparkDataSourceRDDPartition
 import org.apache.gluten.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.softaffinity.SoftAffinity
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.types.StructType
 
 import org.apache.iceberg._
@@ -36,67 +37,17 @@ import scala.collection.JavaConverters._
 
 object GlutenIcebergSourceUtil {
 
-  def genSplitInfoForPartition(
-      inputPartition: InputPartition,
-      index: Int,
-      readPartitionSchema: StructType): SplitInfo = inputPartition match {
-    case partition: SparkInputPartition =>
-      val paths = new JArrayList[String]()
-      val starts = new JArrayList[JLong]()
-      val lengths = new JArrayList[JLong]()
-      val partitionColumns = new JArrayList[JMap[String, String]]()
-      val deleteFilesList = new JArrayList[JList[DeleteFile]]()
-      var fileFormat = ReadFileFormat.UnknownFormat
-
-      val tasks = partition.taskGroup[ScanTask]().tasks().asScala
-      asFileScanTask(tasks.toList).foreach {
-        task =>
-          paths.add(
-            BackendsApiManager.getTransformerApiInstance
-              .encodeFilePathIfNeed(task.file().path().toString))
-          starts.add(task.start())
-          lengths.add(task.length())
-          partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
-          deleteFilesList.add(task.deletes())
-          val currentFileFormat = convertFileFormat(task.file().format())
-          if (fileFormat == ReadFileFormat.UnknownFormat) {
-            fileFormat = currentFileFormat
-          } else if (fileFormat != currentFileFormat) {
-            throw new UnsupportedOperationException(
-              s"Only one file format is supported, " +
-                s"find different file format $fileFormat and 
$currentFileFormat")
-          }
-      }
-      val preferredLoc = SoftAffinity.getFilePartitionLocations(
-        paths.asScala.toArray,
-        inputPartition.preferredLocations())
-      IcebergLocalFilesBuilder.makeIcebergLocalFiles(
-        index,
-        paths,
-        starts,
-        lengths,
-        partitionColumns,
-        fileFormat,
-        preferredLoc.toList.asJava,
-        deleteFilesList
-      )
-    case _ =>
-      throw new UnsupportedOperationException("Only support iceberg 
SparkInputPartition.")
-  }
-
   def genSplitInfo(
-      inputPartitions: Seq[InputPartition],
-      index: Int,
+      partition: SparkDataSourceRDDPartition,
       readPartitionSchema: StructType): SplitInfo = {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]()
     val lengths = new JArrayList[JLong]()
     val partitionColumns = new JArrayList[JMap[String, String]]()
     val deleteFilesList = new JArrayList[JList[DeleteFile]]()
-    val preferredLocs = new JArrayList[String]()
     var fileFormat = ReadFileFormat.UnknownFormat
 
-    inputPartitions.foreach {
+    partition.inputPartitions.foreach {
       case partition: SparkInputPartition =>
         val tasks = partition.taskGroup[ScanTask]().tasks().asScala
         asFileScanTask(tasks.toList).foreach {
@@ -117,17 +68,18 @@ object GlutenIcebergSourceUtil {
                   s"find different file format $fileFormat and 
$currentFileFormat")
             }
         }
-        preferredLocs.addAll(partition.preferredLocations().toList.asJava)
+      case o =>
+        throw new GlutenNotSupportException(s"Unsupported input partition 
type: $o")
     }
     IcebergLocalFilesBuilder.makeIcebergLocalFiles(
-      index,
+      partition.index,
       paths,
       starts,
       lengths,
       partitionColumns,
       fileFormat,
       SoftAffinity
-        .getFilePartitionLocations(paths.asScala.toArray, 
preferredLocs.asScala.toArray)
+        .getFilePartitionLocations(paths.asScala.toArray, 
partition.preferredLocations())
         .toList
         .asJava,
       deleteFilesList
diff --git 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
index 296db20829..890c408658 100644
--- 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
+++ 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
@@ -271,7 +271,7 @@ abstract class IcebergSuite extends 
WholeStageTransformerSuite {
               getExecutedPlan(df).map {
                 case plan: IcebergScanTransformer =>
                   assert(plan.getKeyGroupPartitioning.isDefined)
-                  assert(plan.getSplitInfosWithIndex.length == 3)
+                  assert(plan.getSplitInfos.length == 3)
                 case _ => // do nothing
               }
             }
@@ -348,7 +348,7 @@ abstract class IcebergSuite extends 
WholeStageTransformerSuite {
               getExecutedPlan(df).map {
                 case plan: IcebergScanTransformer =>
                   assert(plan.getKeyGroupPartitioning.isDefined)
-                  assert(plan.getSplitInfosWithIndex.length == 3)
+                  assert(plan.getSplitInfos.length == 3)
                 case _ => // do nothing
               }
             }
diff --git 
a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
 
b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
index f644322764..2ab458f6f7 100644
--- 
a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
+++ 
b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.rel.{ReadRelNode, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.connector.catalog.Table
@@ -67,7 +68,10 @@ case class MicroBatchScanExecTransformer(
 
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
 
-  override def getPartitions: Seq[InputPartition] = inputPartitionsShim
+  // todo: consider grouped partitions
+  override def getPartitions: Seq[Partition] = 
inputPartitionsShim.zipWithIndex.map {
+    case (inputPartition, index) => new SparkDataSourceRDDPartition(index, 
Seq(inputPartition))
+  }
 
   /** Returns the actual schema of this data source scan. */
   override def getDataSchema: StructType = scan.readSchema()
@@ -80,7 +84,7 @@ case class MicroBatchScanExecTransformer(
     MicroBatchScanExecTransformer.supportsBatchScan(scan)
   }
 
-  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(partitions: Seq[Partition]): 
Seq[SplitInfo] = {
     val groupedPartitions = filteredPartitions.flatten
     groupedPartitions.zipWithIndex.map {
       case (p, _) => GlutenStreamKafkaSourceUtil.genSplitInfo(p)
diff --git 
a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
 
b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
index 31eb197091..a4911c8263 100644
--- 
a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
+++ 
b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.{PaimonLocalFilesBuilder, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.softaffinity.SoftAffinity
 import org.apache.spark.sql.catalyst.InternalRow
@@ -28,7 +29,7 @@ import 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
DynamicPruningExpression, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -119,16 +120,17 @@ case class PaimonScanTransformer(
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new 
UnsupportedOperationException()
 
-  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(partitions: Seq[Partition]): 
Seq[SplitInfo] = {
     val partitionComputer = 
PaimonScanTransformer.getRowDataPartitionComputer(scan)
-    getPartitions.zipWithIndex.map {
-      case (p, index) =>
-        p match {
+    partitions.map {
+      case p: SparkDataSourceRDDPartition =>
+        val paths = mutable.ListBuffer.empty[String]
+        val starts = mutable.ListBuffer.empty[JLong]
+        val lengths = mutable.ListBuffer.empty[JLong]
+        val partitionColumns = mutable.ListBuffer.empty[JMap[String, String]]
+
+        p.inputPartitions.foreach {
           case partition: PaimonInputPartition =>
-            val paths = mutable.ListBuffer.empty[String]
-            val starts = mutable.ListBuffer.empty[JLong]
-            val lengths = mutable.ListBuffer.empty[JLong]
-            val partitionColumns = mutable.ListBuffer.empty[JMap[String, 
String]]
             partition.splits.foreach {
               split =>
                 val rawFilesOpt = split.convertToRawFiles()
@@ -145,21 +147,24 @@ case class PaimonScanTransformer(
                     "Cannot get raw files from paimon SparkInputPartition.")
                 }
             }
-            val preferredLoc =
-              SoftAffinity.getFilePartitionLocations(paths.toArray, 
partition.preferredLocations())
-            PaimonLocalFilesBuilder.makePaimonLocalFiles(
-              index,
-              paths.asJava,
-              starts.asJava,
-              lengths.asJava,
-              partitionColumns.asJava,
-              fileFormat,
-              preferredLoc.toList.asJava,
-              new JHashMap[String, String]()
-            )
-          case _ =>
-            throw new GlutenNotSupportException("Only support paimon 
SparkInputPartition.")
+          case o =>
+            throw new GlutenNotSupportException(s"Unsupported input partition 
type: $o")
         }
+
+        PaimonLocalFilesBuilder.makePaimonLocalFiles(
+          p.index,
+          paths.asJava,
+          starts.asJava,
+          lengths.asJava,
+          partitionColumns.asJava,
+          fileFormat,
+          SoftAffinity
+            .getFilePartitionLocations(paths.toArray, p.preferredLocations())
+            .toList
+            .asJava,
+          new JHashMap[String, String]()
+        )
+      case _ => throw new GlutenNotSupportException()
     }
   }
 
diff --git 
a/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
 
b/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
index 91493283a0..fddac2123d 100644
--- 
a/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
+++ 
b/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
@@ -94,4 +94,21 @@ abstract class PaimonSuite extends 
WholeStageTransformerSuite {
       }
     }
   }
+
+  test("paimon transformer exists with bucket table") {
+    withTable(s"paimon_tbl") {
+      sql(s"""
+             |CREATE TABLE paimon_tbl (id INT, name STRING)
+             |USING paimon
+             |TBLPROPERTIES (
+             | 'bucket' = '1',
+             | 'bucket-key' = 'id'
+             |)
+             |""".stripMargin)
+      sql(s"INSERT INTO paimon_tbl VALUES (1, 'Bob'), (2, 'Blue'), (3, 
'Mike')")
+      runQueryAndCompare("SELECT * FROM paimon_tbl") {
+        checkGlutenOperatorMatch[PaimonScanTransformer]
+      }
+    }
+  }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 34f7f41b78..52be6c4954 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -23,7 +23,6 @@ import 
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.SplitInfo
 
 import org.apache.spark._
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
@@ -32,21 +31,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 trait IteratorApi {
 
   def genSplitInfo(
-      partition: InputPartition,
-      partitionSchema: StructType,
-      dataSchema: StructType,
-      fileFormat: ReadFileFormat,
-      metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo
-
-  def genSplitInfoForPartitions(
       partitionIndex: Int,
-      partition: Seq[InputPartition],
+      partition: Seq[Partition],
       partitionSchema: StructType,
       dataSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String]): SplitInfo = throw new 
UnsupportedOperationException()
+      properties: Map[String, String]): SplitInfo
 
   /** Generate native row partition. */
   def genPartitions(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
index 92d6ebd325..a855537664 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
@@ -20,8 +20,8 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.expression.ExpressionNode
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
PartitionDirectory}
 import org.apache.spark.sql.types.{DataType, DecimalType, StructType}
@@ -33,8 +33,8 @@ import java.util
 
 trait TransformerApi {
 
-  /** Generate Seq[InputPartition] for FileSourceScanExecTransformer. */
-  def genInputPartitionSeq(
+  /** Generate Seq[Partition] for FileSourceScanExecTransformer. */
+  def genPartitionSeq(
       relation: HadoopFsRelation,
       requiredSchema: StructType,
       selectedPartitions: Array[PartitionDirectory],
@@ -43,7 +43,7 @@ trait TransformerApi {
       optionalBucketSet: Option[BitSet],
       optionalNumCoalescedBuckets: Option[Int],
       disableBucketedScan: Boolean,
-      filterExprs: Seq[Expression] = Seq.empty): Seq[InputPartition]
+      filterExprs: Seq[Expression] = Seq.empty): Seq[Partition]
 
   /**
    * Post-process native config, For example, for ClickHouse backend, sync 
'spark.executor.cores' to
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index a8524d53f2..86b76e7a25 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -25,8 +25,8 @@ import org.apache.gluten.substrait.extensions.ExtensionBuilder
 import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 import com.google.protobuf.StringValue
@@ -56,21 +56,27 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
   /** Returns the file format properties. */
   def getProperties: Map[String, String] = Map.empty
 
-  /** Returns the split infos that will be processed by the underlying native 
engine. */
   override def getSplitInfos: Seq[SplitInfo] = {
     getSplitInfosFromPartitions(getPartitions)
   }
 
-  def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
+  def getSplitInfosFromPartitions(partitions: Seq[Partition]): Seq[SplitInfo] 
= {
     partitions.map(
-      BackendsApiManager.getIteratorApiInstance
-        .genSplitInfo(
-          _,
-          getPartitionSchema,
-          getDataSchema,
-          fileFormat,
-          getMetadataColumns().map(_.name),
-          getProperties))
+      p => {
+        val ps = p match {
+          case sp: SparkDataSourceRDDPartition => 
sp.inputPartitions.map(_.asInstanceOf[Partition])
+          case o => Seq(o)
+        }
+        BackendsApiManager.getIteratorApiInstance
+          .genSplitInfo(
+            p.index,
+            ps,
+            getPartitionSchema,
+            getDataSchema,
+            fileFormat,
+            getMetadataColumns().map(_.name),
+            getProperties)
+      })
   }
 
   override protected def doValidateInternal(): ValidationResult = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 03a840cccd..0b0dc7c52f 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -21,15 +21,15 @@ import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.gluten.substrait.rel.SplitInfo
 import org.apache.gluten.utils.FileIndexUtil
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, 
FileScan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
@@ -128,26 +128,7 @@ abstract class BatchScanExecTransformerBase(
 
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
 
-  // With storage partition join, the return partition type is changed, so as 
SplitInfo
-  def getPartitionsWithIndex: Seq[Seq[InputPartition]] = finalPartitions
-
-  def getSplitInfosWithIndex: Seq[SplitInfo] = {
-    getPartitionsWithIndex.zipWithIndex.map {
-      case (partitions, index) =>
-        BackendsApiManager.getIteratorApiInstance
-          .genSplitInfoForPartitions(
-            index,
-            partitions,
-            getPartitionSchema,
-            getDataSchema,
-            fileFormat,
-            getMetadataColumns().map(_.name),
-            getProperties)
-    }
-  }
-
-  // May cannot call for bucket scan
-  override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions
+  def getPartitions: Seq[Partition] = finalPartitions
 
   override def getPartitionSchema: StructType = scan match {
     case fileScan: FileScan => fileScan.readPartitionSchema
@@ -195,19 +176,21 @@ abstract class BatchScanExecTransformerBase(
   override def metricsUpdater(): MetricsUpdater =
     
BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetricsUpdater(metrics)
 
-  @transient protected lazy val filteredFlattenPartitions: Seq[InputPartition] 
=
-    filteredPartitions.flatten
-
-  @transient protected lazy val finalPartitions: Seq[Seq[InputPartition]] =
-    SparkShimLoader.getSparkShims.orderPartitions(
-      this,
-      scan,
-      keyGroupedPartitioning,
-      filteredPartitions,
-      outputPartitioning,
-      commonPartitionValues,
-      applyPartialClustering,
-      replicatePartitions)
+  @transient protected lazy val finalPartitions: Seq[Partition] =
+    SparkShimLoader.getSparkShims
+      .orderPartitions(
+        this,
+        scan,
+        keyGroupedPartitioning,
+        filteredPartitions,
+        outputPartitioning,
+        commonPartitionValues,
+        applyPartialClustering,
+        replicatePartitions)
+      .zipWithIndex
+      .map {
+        case (inputPartitions, index) => new 
SparkDataSourceRDDPartition(index, inputPartitions)
+      }
 
   @transient override lazy val fileFormat: ReadFileFormat =
     BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 61c94747bc..0584922bca 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -23,11 +23,11 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.utils.FileIndexUtil
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.execution.FileSourceScanExecShim
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
@@ -123,18 +123,19 @@ abstract class FileSourceScanExecTransformerBase(
 
   override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
 
-  override def getPartitions: Seq[InputPartition] = {
-    BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
-      relation,
-      requiredSchema,
-      getPartitionArray(),
-      output,
-      bucketedScan,
-      optionalBucketSet,
-      optionalNumCoalescedBuckets,
-      disableBucketedScan,
-      filterExprs()
-    )
+  override def getPartitions: Seq[Partition] = {
+    BackendsApiManager.getTransformerApiInstance
+      .genPartitionSeq(
+        relation,
+        requiredSchema,
+        getPartitionArray(),
+        output,
+        bucketedScan,
+        optionalBucketSet,
+        optionalNumCoalescedBuckets,
+        disableBucketedScan,
+        filterExprs()
+      )
   }
 
   override def getPartitionSchema: StructType = relation.partitionSchema
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index 263d56720c..4a00dbb587 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -47,13 +47,13 @@ case class GlutenPartition(
 
 case class FirstZippedPartitionsPartition(
     index: Int,
-    inputPartition: InputPartition,
+    inputPartition: Partition,
     inputColumnarRDDPartitions: Seq[Partition] = Seq.empty)
   extends Partition
 
 class GlutenWholeStageColumnarRDD(
     @transient sc: SparkContext,
-    @transient private val inputPartitions: Seq[InputPartition],
+    @transient private val inputPartitions: Seq[Partition],
     var rdds: ColumnarInputRDDsWrapper,
     pipelineTime: SQLMetric,
     updateInputMetrics: InputMetricsWrapper => Unit,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala
new file mode 100644
index 0000000000..e3313c9cd2
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/SparkDataSourceRDDPartition.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.Partition
+import org.apache.spark.sql.connector.read.InputPartition
+
+/**
+ * Copied from Spark's [[org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition]]
+ * to stay compatible with Spark 3.3 and earlier.
+ */
+class SparkDataSourceRDDPartition(val index: Int, val inputPartitions: 
Seq[InputPartition])
+  extends Partition
+  with Serializable
+  with InputPartition {
+
+  override def preferredLocations(): Array[String] = {
+    inputPartitions.flatMap(_.preferredLocations()).toArray
+  }
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index f87b9673cc..264195f93f 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.expression._
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater}
@@ -35,7 +34,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
@@ -136,10 +134,12 @@ trait TransformSupport extends ValidatablePlan {
 
 trait LeafTransformSupport extends TransformSupport with LeafExecNode {
   final override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = Seq.empty
+
+  /** Returns the split infos that will be processed by the underlying native 
engine. */
   def getSplitInfos: Seq[SplitInfo]
 
   /** Returns the partitions generated by this data source scan. */
-  def getPartitions: Seq[InputPartition]
+  def getPartitions: Seq[Partition]
 }
 
 trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {
@@ -293,32 +293,38 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
     allLeafTransformers.toSeq
   }
 
+  /**
+   * If this "whole stage" contains a leaf exec transformer, it generates an RDD which itself
+   * takes care of [[LeafTransformSupport]]; there won't be any other RDD for the leaf operator.
+   * As a result, genFirstStageIterator rather than genFinalStageIterator will be invoked.
+   */
   private def generateWholeStageRDD(
       leafTransformers: Seq[LeafTransformSupport],
       wsCtx: WholeStageTransformContext,
       inputRDDs: ColumnarInputRDDsWrapper,
       pipelineTime: SQLMetric): RDD[ColumnarBatch] = {
-    val isKeyGroupPartition = leafTransformers.exists {
-      // TODO: May can apply to BatchScanExecTransformer without key group 
partitioning
-      case b: BatchScanExecTransformerBase if 
b.keyGroupedPartitioning.isDefined => true
-      case _ => false
-    }
 
-    /**
-     * If containing leaf exec transformer this "whole stage" generates a RDD 
which itself takes
-     * care of [[LeafTransformSupport]] there won't be any other RDD for leaf 
operator. As a result,
-     * genFirstStageIterator rather than genFinalStageIterator will be invoked
-     */
-    val allInputPartitions = leafTransformers.map(
-      leafTransformer => {
-        if (isKeyGroupPartition) {
-          
leafTransformer.asInstanceOf[BatchScanExecTransformerBase].getPartitionsWithIndex
-        } else {
-          Seq(leafTransformer.getPartitions)
-        }
-      })
-
-    val allSplitInfos = getSplitInfosFromPartitions(isKeyGroupPartition, 
leafTransformers)
+    // If there are two leaf transformers, they must have the same partitions,
+    // otherwise, exchange will be inserted. We should combine the two leaf
+    // transformers' partitions with same index, and set them together in
+    // the substraitContext.
+    // We use transpose to do that, You can refer to
+    // the diagram below.
+    // leaf1  p11 p12 p13 p14 ... p1n
+    // leaf2  p21 p22 p23 p24 ... p2n
+    // transpose =>
+    // leaf1 | leaf2
+    //  p11  |  p21    => substraitContext.setSplitInfo([p11, p21])
+    //  p12  |  p22    => substraitContext.setSplitInfo([p12, p22])
+    //  p13  |  p23    ...
+    //  p14  |  p24
+    //      ...
+    //  p1n  |  p2n    => substraitContext.setSplitInfo([p1n, p2n])
+    // The data in partition may be empty, for example,
+    // if there are two batch scan transformers with keyGroupedPartitioning,
+    // they have same partitionValues,
+    // but some partitions may be empty for those partition values that are not
+    // present.
+    val allSplitInfos = leafTransformers.map(_.getSplitInfos).transpose
 
     if (GlutenConfig.get.enableHdfsViewfs) {
       val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
@@ -356,6 +362,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       wsCtx.enableCudf
     )
 
+    val allInputPartitions = leafTransformers.map(_.getPartitions)
     SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
 
     leafTransformers.foreach {
@@ -367,55 +374,6 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
     rdd
   }
 
-  private def getSplitInfosFromPartitions(
-      isKeyGroupPartition: Boolean,
-      leafTransformers: Seq[LeafTransformSupport]): Seq[Seq[SplitInfo]] = {
-    val allSplitInfos = if (isKeyGroupPartition) {
-      // If these are two batch scan transformer with keyGroupPartitioning,
-      // they have same partitionValues,
-      // but some partitions maybe empty for those partition values that are 
not present,
-      // otherwise, exchange will be inserted. We should combine the two leaf
-      // transformers' partitions with same index, and set them together in
-      // the substraitContext. We use transpose to do that, You can refer to
-      // the diagram below.
-      // leaf1  Seq(p11) Seq(p12, p13) Seq(p14) ... Seq(p1n)
-      // leaf2  Seq(p21) Seq(p22)      Seq()    ... Seq(p2n)
-      // transpose =>
-      // leaf1 | leaf2
-      //  Seq(p11)       |  Seq(p21)    => 
substraitContext.setSplitInfo([Seq(p11), Seq(p21)])
-      //  Seq(p12, p13)  |  Seq(p22)    => 
substraitContext.setSplitInfo([Seq(p12, p13), Seq(p22)])
-      //  Seq(p14)       |  Seq()    ...
-      //                ...
-      //  Seq(p1n)       |  Seq(p2n)    => 
substraitContext.setSplitInfo([Seq(p1n), Seq(p2n)])
-      
leafTransformers.map(_.asInstanceOf[BatchScanExecTransformerBase].getSplitInfosWithIndex)
-    } else {
-      // If these are two leaf transformers, they must have same partitions,
-      // otherwise, exchange will be inserted. We should combine the two leaf
-      // transformers' partitions with same index, and set them together in
-      // the substraitContext. We use transpose to do that, You can refer to
-      // the diagram below.
-      // leaf1  p11 p12 p13 p14 ... p1n
-      // leaf2  p21 p22 p23 p24 ... p2n
-      // transpose =>
-      // leaf1 | leaf2
-      //  p11  |  p21    => substraitContext.setSplitInfo([p11, p21])
-      //  p12  |  p22    => substraitContext.setSplitInfo([p12, p22])
-      //  p13  |  p23    ...
-      //  p14  |  p24
-      //      ...
-      //  p1n  |  p2n    => substraitContext.setSplitInfo([p1n, p2n])
-      leafTransformers.map(_.getSplitInfos)
-    }
-
-    val partitionLength = allSplitInfos.head.size
-    if (allSplitInfos.exists(_.size != partitionLength)) {
-      throw new GlutenException(
-        "The partition length of all the leaf transformer are not the same.")
-    }
-
-    allSplitInfos.transpose
-  }
-
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     assert(child.isInstanceOf[TransformSupport])
     val pipelineTime: SQLMetric = longMetric("pipelineTime")
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala
 b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
similarity index 93%
rename from 
gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala
rename to 
gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
index b05992fa30..953f95bf23 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
@@ -18,16 +18,16 @@ package org.apache.gluten.utils
 
 import org.apache.gluten.sql.shims.SparkShimLoader
 
+import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.{BucketingUtils, 
FilePartition, HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.hadoop.fs.Path
 
-case class InputPartitionsUtil(
+case class PartitionsUtil(
     relation: HadoopFsRelation,
     requiredSchema: StructType,
     selectedPartitions: Array[PartitionDirectory],
@@ -38,15 +38,15 @@ case class InputPartitionsUtil(
     disableBucketedScan: Boolean)
   extends Logging {
 
-  def genInputPartitionSeq(): Seq[InputPartition] = {
+  def genPartitionSeq(): Seq[Partition] = {
     if (bucketedScan) {
-      genBucketedInputPartitionSeq()
+      genBucketedPartitionSeq()
     } else {
-      genNonBuckedInputPartitionSeq()
+      genNonBuckedPartitionSeq()
     }
   }
 
-  private def genNonBuckedInputPartitionSeq(): Seq[InputPartition] = {
+  private def genNonBuckedPartitionSeq(): Seq[Partition] = {
     val openCostInBytes = 
relation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val maxSplitBytes =
       FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
@@ -99,7 +99,7 @@ case class InputPartitionsUtil(
     FilePartition.getFilePartitions(relation.sparkSession, splitFiles, 
maxSplitBytes)
   }
 
-  private def genBucketedInputPartitionSeq(): Seq[InputPartition] = {
+  private def genBucketedPartitionSeq(): Seq[Partition] = {
     val bucketSpec = relation.bucketSpec.get
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
index 7ae47e422b..714e632d08 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -20,9 +20,9 @@ import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
 
+import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.FilePartition
 
 abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil 
with Logging {
@@ -80,15 +80,13 @@ abstract class Affinity(val manager: AffinityManager) 
extends LogLevelUtil with
   }
 
   /** Update the RDD id to SoftAffinityManager */
-  def updateFilePartitionLocations(
-      inputPartitions: Seq[Seq[Seq[InputPartition]]],
-      rddId: Int): Unit = {
+  def updateFilePartitionLocations(inputPartitions: Seq[Seq[Partition]], 
rddId: Int): Unit = {
     if (SoftAffinityManager.usingSoftAffinity && 
SoftAffinityManager.detectDuplicateReading) {
-      inputPartitions.foreach(_.foreach(_.foreach {
+      inputPartitions.foreach(_.foreach {
         case f: FilePartition =>
           SoftAffinityManager.updatePartitionMap(f, rddId)
         case _ =>
-      }))
+      })
     }
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index d83edfb22d..39500a50e7 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -21,11 +21,11 @@ import org.apache.gluten.execution.BasicScanExecTransformer
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSeq, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer._
@@ -67,7 +67,7 @@ case class HiveTableScanExecTransformer(
 
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
 
-  override def getPartitions: Seq[InputPartition] = partitions
+  override def getPartitions: Seq[Partition] = partitions
 
   override def getPartitionSchema: StructType = 
relation.tableMeta.partitionSchema
 
@@ -82,7 +82,7 @@ case class HiveTableScanExecTransformer(
   @transient private lazy val hivePartitionConverter =
     new HivePartitionConverter(session.sessionState.newHadoopConf(), session)
 
-  @transient private lazy val partitions: Seq[InputPartition] =
+  @transient private lazy val partitions: Seq[Partition] =
     if (!relation.isPartitioned) {
       val tableLocation: URI = 
relation.tableMeta.storage.locationUri.getOrElse {
         throw new UnsupportedOperationException("Table path not set.")
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 8b5efa9167..339bda2a03 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.extension
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -49,16 +49,17 @@ case class TestFileSourceScanExecTransformer(
     tableIdentifier,
     disableBucketedScan) {
 
-  override def getPartitions: Seq[InputPartition] =
-    BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
-      relation,
-      requiredSchema,
-      selectedPartitions,
-      output,
-      bucketedScan,
-      optionalBucketSet,
-      optionalNumCoalescedBuckets,
-      disableBucketedScan)
+  override def getPartitions: Seq[Partition] =
+    BackendsApiManager.getTransformerApiInstance
+      .genPartitionSeq(
+        relation,
+        requiredSchema,
+        selectedPartitions,
+        output,
+        bucketedScan,
+        optionalBucketSet,
+        optionalNumCoalescedBuckets,
+        disableBucketedScan)
 
   override val nodeNamePrefix: String = "TestFile"
 
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 98ef4eff78..6846f140ed 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.extension
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer(
     dataFilters,
     tableIdentifier,
     disableBucketedScan) {
-  override def getPartitions: Seq[InputPartition] =
-    BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
-      relation,
-      requiredSchema,
-      selectedPartitions,
-      output,
-      bucketedScan,
-      optionalBucketSet,
-      optionalNumCoalescedBuckets,
-      disableBucketedScan)
+  override def getPartitions: Seq[Partition] =
+    BackendsApiManager.getTransformerApiInstance
+      .genPartitionSeq(
+        relation,
+        requiredSchema,
+        selectedPartitions,
+        output,
+        bucketedScan,
+        optionalBucketSet,
+        optionalNumCoalescedBuckets,
+        disableBucketedScan)
 
   override val nodeNamePrefix: String = "TestFile"
 }
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 98ef4eff78..6846f140ed 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.extension
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer(
     dataFilters,
     tableIdentifier,
     disableBucketedScan) {
-  override def getPartitions: Seq[InputPartition] =
-    BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
-      relation,
-      requiredSchema,
-      selectedPartitions,
-      output,
-      bucketedScan,
-      optionalBucketSet,
-      optionalNumCoalescedBuckets,
-      disableBucketedScan)
+  override def getPartitions: Seq[Partition] =
+    BackendsApiManager.getTransformerApiInstance
+      .genPartitionSeq(
+        relation,
+        requiredSchema,
+        selectedPartitions,
+        output,
+        bucketedScan,
+        optionalBucketSet,
+        optionalNumCoalescedBuckets,
+        disableBucketedScan)
 
   override val nodeNamePrefix: String = "TestFile"
 }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 98ef4eff78..6846f140ed 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.extension
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.FileSourceScanExecTransformerBase
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -47,16 +47,17 @@ case class TestFileSourceScanExecTransformer(
     dataFilters,
     tableIdentifier,
     disableBucketedScan) {
-  override def getPartitions: Seq[InputPartition] =
-    BackendsApiManager.getTransformerApiInstance.genInputPartitionSeq(
-      relation,
-      requiredSchema,
-      selectedPartitions,
-      output,
-      bucketedScan,
-      optionalBucketSet,
-      optionalNumCoalescedBuckets,
-      disableBucketedScan)
+  override def getPartitions: Seq[Partition] =
+    BackendsApiManager.getTransformerApiInstance
+      .genPartitionSeq(
+        relation,
+        requiredSchema,
+        selectedPartitions,
+        output,
+        bucketedScan,
+        optionalBucketSet,
+        optionalNumCoalescedBuckets,
+        disableBucketedScan)
 
   override val nodeNamePrefix: String = "TestFile"
 }
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 5f62c272f7..bbbd665cd8 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -243,6 +243,10 @@ trait SparkShims {
   def getCommonPartitionValues(batchScan: BatchScanExec): 
Option[Seq[(InternalRow, Int)]] =
     Option(Seq())
 
+  /**
+   * Most of the code in this method is copied from
+   * [[org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD]].
+   */
   def orderPartitions(
       batchScan: DataSourceV2ScanExecBase,
       scan: Scan,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to