This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d3f67d59a8 [MINOR] Code cleanup: Remove deprecated files and fix typos (#11370)
d3f67d59a8 is described below

commit d3f67d59a894bb2f91e1d0095d75fb835aaf6b15
Author: Chang chen <[email protected]>
AuthorDate: Wed Jan 7 21:37:05 2026 +0800

    [MINOR] Code cleanup: Remove deprecated files and fix typos (#11370)
    
    * Remove deprecated `BatchScanExec` shims for Spark versions 3.2 to 4.0.
    
    * Rename `InternalRowUtl` to `InternalRowUtil` across shims for Spark versions 3.2 to 4.0.
    
    * Remove the SLF4J and Log4J version properties from the Spark UT module and rely on the Spark profile instead.
    
    * Fix the build after rebase.
---
 .../gluten/columnarbatch/ColumnarBatches.java      |   4 +-
 gluten-ut/spark34/pom.xml                          |   4 -
 gluten-ut/spark35/pom.xml                          |   4 -
 gluten-ut/spark40/pom.xml                          |   4 -
 ...{InternalRowUtl.scala => InternalRowUtil.scala} |   2 +-
 .../datasources/v2/BatchScanExec.scala.deprecated  | 124 -----------
 ...{InternalRowUtl.scala => InternalRowUtil.scala} |   2 +-
 .../datasources/v2/BatchScanExec.scala.deprecated  | 142 ------------
 ...{InternalRowUtl.scala => InternalRowUtil.scala} |   2 +-
 .../datasources/v2/BatchScanExec.scala.deprecated  | 243 ---------------------
 .../org/apache/gluten/utils/InternalRowUtil.scala} |   2 +-
 .../datasources/v2/BatchScanExec.scala.deprecated  | 243 ---------------------
 .../org/apache/gluten/utils/InternalRowUtil.scala} |   2 +-
 .../datasources/v2/BatchScanExec.scala.deprecated  | 243 ---------------------
 .../org/apache/gluten/utils/InternalRowUtil.scala} |   2 +-
 .../datasources/v2/BatchScanExec.scala.deprecated  | 243 ---------------------
 16 files changed, 8 insertions(+), 1258 deletions(-)

diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 78cbc98ec4..40b931d92a 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -21,7 +21,7 @@ import org.apache.gluten.runtime.Runtime;
 import org.apache.gluten.runtime.Runtimes;
 import org.apache.gluten.utils.ArrowAbiUtil;
 import org.apache.gluten.utils.ArrowUtil;
-import org.apache.gluten.utils.InternalRowUtl;
+import org.apache.gluten.utils.InternalRowUtil;
 import org.apache.gluten.vectorized.ArrowColumnarBatch;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
@@ -412,7 +412,7 @@ public final class ColumnarBatches {
   public static String toString(ColumnarBatch batch, int start, int length) {
     ColumnarBatch loadedBatch = 
ensureLoaded(ArrowBufferAllocators.contextInstance(), batch);
     StructType type = 
SparkArrowUtil.fromArrowSchema(ArrowUtil.toSchema(loadedBatch));
-    return InternalRowUtl.toString(
+    return InternalRowUtil.toString(
         type,
         JavaConverters.<InternalRow>asScalaIterator(loadedBatch.rowIterator()),
         start,
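
For illustration, a minimal Scala sketch (not part of this patch) of how the renamed helper is called after the InternalRowUtl -> InternalRowUtil rename. Only the two-argument toString(struct, rows) overload is visible in the shim hunks below; the schema and the wrapper object here are hypothetical, and the slice-taking overload used by ColumnarBatches is assumed to mirror it.

    import org.apache.gluten.utils.InternalRowUtil
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object InternalRowUtilExample {
      // Hypothetical schema used only for this sketch.
      val schema: StructType = StructType(Seq(
        StructField("id", IntegerType),
        StructField("name", StringType)))

      // Render an iterator of InternalRow as a human-readable string,
      // mirroring how ColumnarBatches.toString delegates to the helper.
      def dump(rows: Iterator[InternalRow]): String =
        InternalRowUtil.toString(schema, rows)
    }
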
diff --git a/gluten-ut/spark34/pom.xml b/gluten-ut/spark34/pom.xml
index 6d34eb6384..32ca19e99f 100644
--- a/gluten-ut/spark34/pom.xml
+++ b/gluten-ut/spark34/pom.xml
@@ -130,10 +130,6 @@
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
-      <properties>
-        <slf4j.version>2.0.6</slf4j.version>
-        <log4j.version>2.19.0</log4j.version>
-      </properties>
       <dependencies>
         <dependency>
           <groupId>org.apache.gluten</groupId>
diff --git a/gluten-ut/spark35/pom.xml b/gluten-ut/spark35/pom.xml
index 0fbd50d1ec..e82d8d9086 100644
--- a/gluten-ut/spark35/pom.xml
+++ b/gluten-ut/spark35/pom.xml
@@ -191,10 +191,6 @@
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
-      <properties>
-        <slf4j.version>2.0.6</slf4j.version>
-        <log4j.version>2.19.0</log4j.version>
-      </properties>
       <dependencies>
         <dependency>
           <groupId>org.apache.gluten</groupId>
diff --git a/gluten-ut/spark40/pom.xml b/gluten-ut/spark40/pom.xml
index 9d5633274a..d169abb888 100644
--- a/gluten-ut/spark40/pom.xml
+++ b/gluten-ut/spark40/pom.xml
@@ -236,10 +236,6 @@
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
-      <properties>
-        <slf4j.version>2.0.16</slf4j.version>
-        <log4j.version>2.24.3</log4j.version>
-      </properties>
       <dependencies>
         <dependency>
           <groupId>org.junit.jupiter</groupId>
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from 
shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to 
shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 32d6943713..7c9ffc70ca 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types.StructType
 
-object InternalRowUtl {
+object InternalRowUtil {
   def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
     val encoder = RowEncoder(struct).resolveAndBind()
     val deserializer = encoder.createDeserializer()
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index 4d4dba7b44..0000000000
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-
-import com.google.common.base.Objects
-
-// This file is copied from Spark with a little change to solve below issue:
-// The BatchScanExec can support columnar output, which is incompatible with
-// Arrow's columnar format. But there is no config to disable the columnar 
output.
-// In this file, the supportsColumnar was set as false to prevent Spark's 
columnar
-// output.
-
-/** Physical plan node for scanning a batch of data from a data source v2. */
-case class BatchScanExec(
-    output: Seq[AttributeReference],
-    @transient scan: Scan,
-    runtimeFilters: Seq[Expression])
-  extends DataSourceV2ScanExecBase {
-
-  @transient lazy val batch = scan.toBatch
-
-  // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
-  override def equals(other: Any): Boolean = other match {
-    case other: BatchScanExec =>
-      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
-    case _ =>
-      false
-  }
-
-  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
-  @transient override lazy val partitions: Seq[InputPartition] = 
batch.planInputPartitions()
-
-  @transient private lazy val filteredPartitions: Seq[InputPartition] = {
-    val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => 
DataSourceStrategy.translateRuntimeFilter(e)
-      case _ => None
-    }
-
-    if (dataSourceFilters.nonEmpty) {
-      val originalPartitioning = outputPartitioning
-
-      // the cast is safe as runtime filters are only assigned if the scan can 
be filtered
-      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
-      filterableScan.filter(dataSourceFilters.toArray)
-
-      // call toBatch again to get filtered partitions
-      val newPartitions = scan.toBatch.planInputPartitions()
-
-      originalPartitioning match {
-        case p: DataSourcePartitioning if p.numPartitions != 
newPartitions.size =>
-          throw new SparkException(
-            "Data source must have preserved the original partitioning during 
runtime filtering; " +
-              s"reported num partitions: ${p.numPartitions}, " +
-              s"num partitions after runtime filtering: ${newPartitions.size}")
-        case _ =>
-        // no validation is needed as the data source did not report any 
specific partitioning
-      }
-
-      newPartitions
-    } else {
-      partitions
-    }
-  }
-
-  override lazy val readerFactory: PartitionReaderFactory = 
batch.createReaderFactory()
-
-  override lazy val inputRDD: RDD[InternalRow] = {
-    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
-      // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
-      sparkContext.parallelize(Array.empty[InternalRow], 1)
-    } else {
-      new DataSourceRDD(
-        sparkContext,
-        filteredPartitions,
-        readerFactory,
-        supportsColumnar,
-        customMetrics)
-    }
-  }
-
-  override def doCanonicalize(): BatchScanExec = {
-    this.copy(
-      output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlan.normalizePredicates(
-        runtimeFilters.filterNot(_ == 
DynamicPruningExpression(Literal.TrueLiteral)),
-        output)
-    )
-  }
-
-  override def simpleString(maxFields: Int): String = {
-    val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
-    val runtimeFiltersString = s"RuntimeFilters: 
${runtimeFilters.mkString("[", ",", "]")}"
-    val result = s"$nodeName$truncatedOutputString ${scan.description()} 
$runtimeFiltersString"
-    redact(result)
-  }
-
-  // Set to false to disable BatchScan's columnar output.
-  override def supportsColumnar: Boolean = false
-}
diff --git 
a/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from 
shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to 
shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 32d6943713..7c9ffc70ca 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types.StructType
 
-object InternalRowUtl {
+object InternalRowUtil {
   def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
     val encoder = RowEncoder(struct).resolveAndBind()
     val deserializer = encoder.createDeserializer()
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index aaae179971..0000000000
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, 
SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowSet}
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-
-import java.util.Objects
-
-/** Physical plan node for scanning a batch of data from a data source v2. */
-case class BatchScanExec(
-    output: Seq[AttributeReference],
-    @transient scan: Scan,
-    runtimeFilters: Seq[Expression],
-    keyGroupedPartitioning: Option[Seq[Expression]] = None)
-  extends DataSourceV2ScanExecBase {
-
-  @transient lazy val batch = scan.toBatch
-
-  // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
-  override def equals(other: Any): Boolean = other match {
-    case other: BatchScanExec =>
-      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
-    case _ =>
-      false
-  }
-
-  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
-  @transient override lazy val inputPartitions: Seq[InputPartition] = 
batch.planInputPartitions()
-
-  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
-    val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => 
DataSourceStrategy.translateRuntimeFilter(e)
-      case _ => None
-    }
-
-    if (dataSourceFilters.nonEmpty) {
-      val originalPartitioning = outputPartitioning
-
-      // the cast is safe as runtime filters are only assigned if the scan can 
be filtered
-      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
-      filterableScan.filter(dataSourceFilters.toArray)
-
-      // call toBatch again to get filtered partitions
-      val newPartitions = scan.toBatch.planInputPartitions()
-
-      originalPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
-            throw new SparkException(
-              "Data source must have preserved the original partitioning " +
-                "during runtime filtering: not all partitions implement 
HasPartitionKey after " +
-                "filtering")
-          }
-
-          val newRows = new InternalRowSet(p.expressions.map(_.dataType))
-          newRows ++= 
newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
-          val oldRows = p.partitionValuesOpt.get
-
-          if (oldRows.size != newRows.size) {
-            throw new SparkException(
-              "Data source must have preserved the original partitioning " +
-                "during runtime filtering: the number of unique partition 
values obtained " +
-                s"through HasPartitionKey changed: before ${oldRows.size}, 
after ${newRows.size}")
-          }
-
-          if (!oldRows.forall(newRows.contains)) {
-            throw new SparkException(
-              "Data source must have preserved the original partitioning " +
-                "during runtime filtering: the number of unique partition 
values obtained " +
-                s"through HasPartitionKey remain the same but do not exactly 
match")
-          }
-
-          groupPartitions(newPartitions).get.map(_._2)
-
-        case _ =>
-          // no validation is needed as the data source did not report any 
specific partitioning
-          newPartitions.map(Seq(_))
-      }
-
-    } else {
-      partitions
-    }
-  }
-
-  override lazy val readerFactory: PartitionReaderFactory = 
batch.createReaderFactory()
-
-  override lazy val inputRDD: RDD[InternalRow] = {
-    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
-      // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
-      sparkContext.parallelize(Array.empty[InternalRow], 1)
-    } else {
-      new DataSourceRDD(
-        sparkContext,
-        filteredPartitions,
-        readerFactory,
-        supportsColumnar,
-        customMetrics)
-    }
-  }
-
-  override def doCanonicalize(): BatchScanExec = {
-    this.copy(
-      output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlan.normalizePredicates(
-        runtimeFilters.filterNot(_ == 
DynamicPruningExpression(Literal.TrueLiteral)),
-        output)
-    )
-  }
-
-  override def simpleString(maxFields: Int): String = {
-    val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
-    val runtimeFiltersString = s"RuntimeFilters: 
${runtimeFilters.mkString("[", ",", "]")}"
-    val result = s"$nodeName$truncatedOutputString ${scan.description()} 
$runtimeFiltersString"
-    redact(result)
-  }
-
-  // Set to false to disable BatchScan's columnar output.
-  override def supportsColumnar: Boolean = false
-}
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from 
shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to 
shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 32d6943713..7c9ffc70ca 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types.StructType
 
-object InternalRowUtl {
+object InternalRowUtil {
   def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
     val encoder = RowEncoder(struct).resolveAndBind()
     val deserializer = encoder.createDeserializer()
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index d43331d57c..0000000000
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import com.google.common.base.Objects
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, 
Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString, 
InternalRowComparableWrapper}
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Physical plan node for scanning a batch of data from a data source v2.
- */
-case class BatchScanExec(
-    output: Seq[AttributeReference],
-    @transient scan: Scan,
-    runtimeFilters: Seq[Expression],
-    keyGroupedPartitioning: Option[Seq[Expression]] = None,
-    ordering: Option[Seq[SortOrder]] = None,
-    @transient table: Table,
-    commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
-    applyPartialClustering: Boolean = false,
-    replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
-
-  @transient lazy val batch = if (scan == null) null else scan.toBatch
-
-  // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
-  override def equals(other: Any): Boolean = other match {
-    case other: BatchScanExec =>
-      this.batch != null && this.batch == other.batch &&
-          this.runtimeFilters == other.runtimeFilters &&
-          this.commonPartitionValues == other.commonPartitionValues &&
-          this.replicatePartitions == other.replicatePartitions &&
-          this.applyPartialClustering == other.applyPartialClustering
-    case _ =>
-      false
-  }
-
-  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
-  @transient override lazy val inputPartitions: Seq[InputPartition] = 
batch.planInputPartitions()
-
-  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
-    val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => 
DataSourceV2Strategy.translateRuntimeFilterV2(e)
-      case _ => None
-    }
-
-    if (dataSourceFilters.nonEmpty) {
-      val originalPartitioning = outputPartitioning
-
-      // the cast is safe as runtime filters are only assigned if the scan can 
be filtered
-      val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
-      filterableScan.filter(dataSourceFilters.toArray)
-
-      // call toBatch again to get filtered partitions
-      val newPartitions = scan.toBatch.planInputPartitions()
-
-      originalPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
-            throw new SparkException("Data source must have preserved the 
original partitioning " +
-                "during runtime filtering: not all partitions implement 
HasPartitionKey after " +
-                "filtering")
-          }
-          val newPartitionValues = newPartitions.map(partition =>
-              
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], 
p.expressions))
-            .toSet
-          val oldPartitionValues = p.partitionValues
-            .map(partition => InternalRowComparableWrapper(partition, 
p.expressions)).toSet
-          // We require the new number of partition values to be equal or less 
than the old number
-          // of partition values here. In the case of less than, empty 
partitions will be added for
-          // those missing values that are not present in the new input 
partitions.
-          if (oldPartitionValues.size < newPartitionValues.size) {
-            throw new SparkException("During runtime filtering, data source 
must either report " +
-                "the same number of partition values, or a subset of partition 
values from the " +
-                s"original. Before: ${oldPartitionValues.size} partition 
values. " +
-                s"After: ${newPartitionValues.size} partition values")
-          }
-
-          if (!newPartitionValues.forall(oldPartitionValues.contains)) {
-            throw new SparkException("During runtime filtering, data source 
must not report new " +
-                "partition values that are not present in the original 
partitioning.")
-          }
-
-          groupPartitions(newPartitions).get.map(_._2)
-
-        case _ =>
-          // no validation is needed as the data source did not report any 
specific partitioning
-          newPartitions.map(Seq(_))
-      }
-
-    } else {
-      partitions
-    }
-  }
-
-  override def outputPartitioning: Partitioning = {
-    super.outputPartitioning match {
-      case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
-        // We allow duplicated partition values if
-        // 
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
-        val newPartValues = commonPartitionValues.get.flatMap { case 
(partValue, numSplits) =>
-          Seq.fill(numSplits)(partValue)
-        }
-        k.copy(numPartitions = newPartValues.length, partitionValues = 
newPartValues)
-      case p => p
-    }
-  }
-
-  override lazy val readerFactory: PartitionReaderFactory = 
batch.createReaderFactory()
-
-  override lazy val inputRDD: RDD[InternalRow] = {
-    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == 
SinglePartition) {
-      // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
-      sparkContext.parallelize(Array.empty[InternalRow], 1)
-    } else {
-      var finalPartitions = filteredPartitions
-
-      outputPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (conf.v2BucketingPushPartValuesEnabled &&
-              conf.v2BucketingPartiallyClusteredDistributionEnabled) {
-            assert(filteredPartitions.forall(_.size == 1),
-              "Expect partitions to be not grouped when " +
-                  
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
-                  "is enabled")
-
-            val groupedPartitions = 
groupPartitions(finalPartitions.map(_.head), true).get
-
-            // This means the input partitions are not grouped by partition 
values. We'll need to
-            // check `groupByPartitionValues` and decide whether to group and 
replicate splits
-            // within a partition.
-            if (commonPartitionValues.isDefined && applyPartialClustering) {
-              // A mapping from the common partition values to how many splits 
the partition
-              // should contain. Note this no longer maintain the partition 
key ordering.
-              val commonPartValuesMap = commonPartitionValues
-                .get
-                .map(t => (InternalRowComparableWrapper(t._1, p.expressions), 
t._2))
-                .toMap
-              val nestGroupedPartitions = groupedPartitions.map {
-                case (partValue, splits) =>
-                  // `commonPartValuesMap` should contain the part value since 
it's the super set.
-                  val numSplits = commonPartValuesMap
-                    .get(InternalRowComparableWrapper(partValue, 
p.expressions))
-                  assert(numSplits.isDefined, s"Partition value $partValue 
does not exist in " +
-                      "common partition values from Spark plan")
-
-                  val newSplits = if (replicatePartitions) {
-                    // We need to also replicate partitions according to the 
other side of join
-                    Seq.fill(numSplits.get)(splits)
-                  } else {
-                    // Not grouping by partition values: this could be the 
side with partially
-                    // clustered distribution. Because of dynamic filtering, 
we'll need to check if
-                    // the final number of splits of a partition is smaller 
than the original
-                    // number, and fill with empty splits if so. This is 
necessary so that both
-                    // sides of a join will have the same number of partitions 
& splits.
-                    splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
-                  }
-                  (InternalRowComparableWrapper(partValue, p.expressions), 
newSplits)
-              }
-
-              // Now fill missing partition keys with empty partitions
-              val partitionMapping = nestGroupedPartitions.toMap
-              finalPartitions = commonPartitionValues.get.flatMap { case 
(partValue, numSplits) =>
-                // Use empty partition for those partition values that are not 
present.
-                partitionMapping.getOrElse(
-                  InternalRowComparableWrapper(partValue, p.expressions),
-                  Seq.fill(numSplits)(Seq.empty))
-              }
-            } else {
-              val partitionMapping = groupedPartitions.map { case (row, parts) 
=>
-                InternalRowComparableWrapper(row, p.expressions) -> parts
-              }.toMap
-              finalPartitions = p.partitionValues.map { partValue =>
-                // Use empty partition for those partition values that are not 
present
-                partitionMapping.getOrElse(
-                  InternalRowComparableWrapper(partValue, p.expressions), 
Seq.empty)
-              }
-            }
-          } else {
-            val partitionMapping = finalPartitions.map { parts =>
-              val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
-              InternalRowComparableWrapper(row, p.expressions) -> parts
-            }.toMap
-            finalPartitions = p.partitionValues.map { partValue =>
-              // Use empty partition for those partition values that are not 
present
-              partitionMapping.getOrElse(
-                InternalRowComparableWrapper(partValue, p.expressions), 
Seq.empty)
-            }
-          }
-
-        case _ =>
-      }
-
-      new DataSourceRDD(
-        sparkContext, finalPartitions, readerFactory, supportsColumnar, 
customMetrics)
-    }
-    postDriverMetrics()
-    rdd
-  }
-
-  override def doCanonicalize(): BatchScanExec = {
-    this.copy(
-      output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlan.normalizePredicates(
-        runtimeFilters.filterNot(_ == 
DynamicPruningExpression(Literal.TrueLiteral)),
-        output))
-  }
-
-  override def simpleString(maxFields: Int): String = {
-    val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
-    val runtimeFiltersString = s"RuntimeFilters: 
${runtimeFilters.mkString("[", ",", "]")}"
-    val result = s"$nodeName$truncatedOutputString ${scan.description()} 
$runtimeFiltersString"
-    redact(result)
-  }
-
-  override def nodeName: String = {
-    s"BatchScan ${table.name()}".trim
-  }
-}
diff --git 
a/shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from 
shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to 
shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 654e43cbd0..0f7c13448f 100644
--- a/shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 
-object InternalRowUtl {
+object InternalRowUtil {
   def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
     val encoder = ExpressionEncoder(struct).resolveAndBind()
     val deserializer = encoder.createDeserializer()
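
As the hunks above and below show, the Spark 3.2-3.4 shims build the encoder via RowEncoder(struct), while the Spark 3.5/4.x shims use ExpressionEncoder(struct); each shim compiles only against its own Spark version. Below is a minimal sketch (not part of this patch) of the 3.5-style body, with the final rendering step assumed since the hunk is truncated here.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.types.StructType

    object InternalRowUtilSketch {
      def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
        // Resolve an encoder for the schema and turn each InternalRow back into an external Row.
        val encoder = ExpressionEncoder(struct).resolveAndBind()
        val deserializer = encoder.createDeserializer()
        // Assumed rendering: one external row per line.
        rows.map(row => deserializer(row).toString).mkString("\n")
      }
    }
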
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index d43331d57c..0000000000
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import com.google.common.base.Objects
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, 
Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString, 
InternalRowComparableWrapper}
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Physical plan node for scanning a batch of data from a data source v2.
- */
-case class BatchScanExec(
-    output: Seq[AttributeReference],
-    @transient scan: Scan,
-    runtimeFilters: Seq[Expression],
-    keyGroupedPartitioning: Option[Seq[Expression]] = None,
-    ordering: Option[Seq[SortOrder]] = None,
-    @transient table: Table,
-    commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
-    applyPartialClustering: Boolean = false,
-    replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
-
-  @transient lazy val batch = if (scan == null) null else scan.toBatch
-
-  // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
-  override def equals(other: Any): Boolean = other match {
-    case other: BatchScanExec =>
-      this.batch != null && this.batch == other.batch &&
-          this.runtimeFilters == other.runtimeFilters &&
-          this.commonPartitionValues == other.commonPartitionValues &&
-          this.replicatePartitions == other.replicatePartitions &&
-          this.applyPartialClustering == other.applyPartialClustering
-    case _ =>
-      false
-  }
-
-  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
-  @transient override lazy val inputPartitions: Seq[InputPartition] = 
batch.planInputPartitions()
-
-  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
-    val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => 
DataSourceV2Strategy.translateRuntimeFilterV2(e)
-      case _ => None
-    }
-
-    if (dataSourceFilters.nonEmpty) {
-      val originalPartitioning = outputPartitioning
-
-      // the cast is safe as runtime filters are only assigned if the scan can 
be filtered
-      val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
-      filterableScan.filter(dataSourceFilters.toArray)
-
-      // call toBatch again to get filtered partitions
-      val newPartitions = scan.toBatch.planInputPartitions()
-
-      originalPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
-            throw new SparkException("Data source must have preserved the 
original partitioning " +
-                "during runtime filtering: not all partitions implement 
HasPartitionKey after " +
-                "filtering")
-          }
-          val newPartitionValues = newPartitions.map(partition =>
-              
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], 
p.expressions))
-            .toSet
-          val oldPartitionValues = p.partitionValues
-            .map(partition => InternalRowComparableWrapper(partition, 
p.expressions)).toSet
-          // We require the new number of partition values to be equal or less 
than the old number
-          // of partition values here. In the case of less than, empty 
partitions will be added for
-          // those missing values that are not present in the new input 
partitions.
-          if (oldPartitionValues.size < newPartitionValues.size) {
-            throw new SparkException("During runtime filtering, data source 
must either report " +
-                "the same number of partition values, or a subset of partition 
values from the " +
-                s"original. Before: ${oldPartitionValues.size} partition 
values. " +
-                s"After: ${newPartitionValues.size} partition values")
-          }
-
-          if (!newPartitionValues.forall(oldPartitionValues.contains)) {
-            throw new SparkException("During runtime filtering, data source 
must not report new " +
-                "partition values that are not present in the original 
partitioning.")
-          }
-
-          groupPartitions(newPartitions).get.map(_._2)
-
-        case _ =>
-          // no validation is needed as the data source did not report any 
specific partitioning
-          newPartitions.map(Seq(_))
-      }
-
-    } else {
-      partitions
-    }
-  }
-
-  override def outputPartitioning: Partitioning = {
-    super.outputPartitioning match {
-      case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
-        // We allow duplicated partition values if
-        // 
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
-        val newPartValues = commonPartitionValues.get.flatMap { case 
(partValue, numSplits) =>
-          Seq.fill(numSplits)(partValue)
-        }
-        k.copy(numPartitions = newPartValues.length, partitionValues = 
newPartValues)
-      case p => p
-    }
-  }
-
-  override lazy val readerFactory: PartitionReaderFactory = 
batch.createReaderFactory()
-
-  override lazy val inputRDD: RDD[InternalRow] = {
-    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == 
SinglePartition) {
-      // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
-      sparkContext.parallelize(Array.empty[InternalRow], 1)
-    } else {
-      var finalPartitions = filteredPartitions
-
-      outputPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (conf.v2BucketingPushPartValuesEnabled &&
-              conf.v2BucketingPartiallyClusteredDistributionEnabled) {
-            assert(filteredPartitions.forall(_.size == 1),
-              "Expect partitions to be not grouped when " +
-                  
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
-                  "is enabled")
-
-            val groupedPartitions = 
groupPartitions(finalPartitions.map(_.head), true).get
-
-            // This means the input partitions are not grouped by partition 
values. We'll need to
-            // check `groupByPartitionValues` and decide whether to group and 
replicate splits
-            // within a partition.
-            if (commonPartitionValues.isDefined && applyPartialClustering) {
-              // A mapping from the common partition values to how many splits 
the partition
-              // should contain. Note this no longer maintain the partition 
key ordering.
-              val commonPartValuesMap = commonPartitionValues
-                .get
-                .map(t => (InternalRowComparableWrapper(t._1, p.expressions), 
t._2))
-                .toMap
-              val nestGroupedPartitions = groupedPartitions.map {
-                case (partValue, splits) =>
-                  // `commonPartValuesMap` should contain the part value since 
it's the super set.
-                  val numSplits = commonPartValuesMap
-                    .get(InternalRowComparableWrapper(partValue, 
p.expressions))
-                  assert(numSplits.isDefined, s"Partition value $partValue 
does not exist in " +
-                      "common partition values from Spark plan")
-
-                  val newSplits = if (replicatePartitions) {
-                    // We need to also replicate partitions according to the 
other side of join
-                    Seq.fill(numSplits.get)(splits)
-                  } else {
-                    // Not grouping by partition values: this could be the 
side with partially
-                    // clustered distribution. Because of dynamic filtering, 
we'll need to check if
-                    // the final number of splits of a partition is smaller 
than the original
-                    // number, and fill with empty splits if so. This is 
necessary so that both
-                    // sides of a join will have the same number of partitions 
& splits.
-                    splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
-                  }
-                  (InternalRowComparableWrapper(partValue, p.expressions), 
newSplits)
-              }
-
-              // Now fill missing partition keys with empty partitions
-              val partitionMapping = nestGroupedPartitions.toMap
-              finalPartitions = commonPartitionValues.get.flatMap { case 
(partValue, numSplits) =>
-                // Use empty partition for those partition values that are not 
present.
-                partitionMapping.getOrElse(
-                  InternalRowComparableWrapper(partValue, p.expressions),
-                  Seq.fill(numSplits)(Seq.empty))
-              }
-            } else {
-              val partitionMapping = groupedPartitions.map { case (row, parts) 
=>
-                InternalRowComparableWrapper(row, p.expressions) -> parts
-              }.toMap
-              finalPartitions = p.partitionValues.map { partValue =>
-                // Use empty partition for those partition values that are not 
present
-                partitionMapping.getOrElse(
-                  InternalRowComparableWrapper(partValue, p.expressions), 
Seq.empty)
-              }
-            }
-          } else {
-            val partitionMapping = finalPartitions.map { parts =>
-              val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
-              InternalRowComparableWrapper(row, p.expressions) -> parts
-            }.toMap
-            finalPartitions = p.partitionValues.map { partValue =>
-              // Use empty partition for those partition values that are not 
present
-              partitionMapping.getOrElse(
-                InternalRowComparableWrapper(partValue, p.expressions), 
Seq.empty)
-            }
-          }
-
-        case _ =>
-      }
-
-      new DataSourceRDD(
-        sparkContext, finalPartitions, readerFactory, supportsColumnar, 
customMetrics)
-    }
-    postDriverMetrics()
-    rdd
-  }
-
-  override def doCanonicalize(): BatchScanExec = {
-    this.copy(
-      output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlan.normalizePredicates(
-        runtimeFilters.filterNot(_ == 
DynamicPruningExpression(Literal.TrueLiteral)),
-        output))
-  }
-
-  override def simpleString(maxFields: Int): String = {
-    val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
-    val runtimeFiltersString = s"RuntimeFilters: 
${runtimeFilters.mkString("[", ",", "]")}"
-    val result = s"$nodeName$truncatedOutputString ${scan.description()} 
$runtimeFiltersString"
-    redact(result)
-  }
-
-  override def nodeName: String = {
-    s"BatchScan ${table.name()}".trim
-  }
-}
diff --git 
a/shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from 
shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to 
shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 654e43cbd0..0f7c13448f 100644
--- a/shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark40/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 
-object InternalRowUtl {
+object InternalRowUtil {
   def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
     val encoder = ExpressionEncoder(struct).resolveAndBind()
     val deserializer = encoder.createDeserializer()
diff --git 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index d43331d57c..0000000000
--- 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import com.google.common.base.Objects
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, 
Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString, 
InternalRowComparableWrapper}
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Physical plan node for scanning a batch of data from a data source v2.
- */
-case class BatchScanExec(
-    output: Seq[AttributeReference],
-    @transient scan: Scan,
-    runtimeFilters: Seq[Expression],
-    keyGroupedPartitioning: Option[Seq[Expression]] = None,
-    ordering: Option[Seq[SortOrder]] = None,
-    @transient table: Table,
-    commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
-    applyPartialClustering: Boolean = false,
-    replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
-
-  @transient lazy val batch = if (scan == null) null else scan.toBatch
-
-  // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
-  override def equals(other: Any): Boolean = other match {
-    case other: BatchScanExec =>
-      this.batch != null && this.batch == other.batch &&
-          this.runtimeFilters == other.runtimeFilters &&
-          this.commonPartitionValues == other.commonPartitionValues &&
-          this.replicatePartitions == other.replicatePartitions &&
-          this.applyPartialClustering == other.applyPartialClustering
-    case _ =>
-      false
-  }
-
-  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
-  @transient override lazy val inputPartitions: Seq[InputPartition] = 
batch.planInputPartitions()
-
-  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
-    val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => 
DataSourceV2Strategy.translateRuntimeFilterV2(e)
-      case _ => None
-    }
-
-    if (dataSourceFilters.nonEmpty) {
-      val originalPartitioning = outputPartitioning
-
-      // the cast is safe as runtime filters are only assigned if the scan can 
be filtered
-      val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
-      filterableScan.filter(dataSourceFilters.toArray)
-
-      // call toBatch again to get filtered partitions
-      val newPartitions = scan.toBatch.planInputPartitions()
-
-      originalPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
-            throw new SparkException("Data source must have preserved the 
original partitioning " +
-                "during runtime filtering: not all partitions implement 
HasPartitionKey after " +
-                "filtering")
-          }
-          val newPartitionValues = newPartitions.map(partition =>
-              
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], 
p.expressions))
-            .toSet
-          val oldPartitionValues = p.partitionValues
-            .map(partition => InternalRowComparableWrapper(partition, 
p.expressions)).toSet
-          // We require the new number of partition values to be equal or less 
than the old number
-          // of partition values here. In the case of less than, empty 
partitions will be added for
-          // those missing values that are not present in the new input 
partitions.
-          if (oldPartitionValues.size < newPartitionValues.size) {
-            throw new SparkException("During runtime filtering, data source 
must either report " +
-                "the same number of partition values, or a subset of partition 
values from the " +
-                s"original. Before: ${oldPartitionValues.size} partition 
values. " +
-                s"After: ${newPartitionValues.size} partition values")
-          }
-
-          if (!newPartitionValues.forall(oldPartitionValues.contains)) {
-            throw new SparkException("During runtime filtering, data source 
must not report new " +
-                "partition values that are not present in the original 
partitioning.")
-          }
-
-          groupPartitions(newPartitions).get.map(_._2)
-
-        case _ =>
-          // no validation is needed as the data source did not report any 
specific partitioning
-          newPartitions.map(Seq(_))
-      }
-
-    } else {
-      partitions
-    }
-  }
-
-  override def outputPartitioning: Partitioning = {
-    super.outputPartitioning match {
-      case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
-        // We allow duplicated partition values if
-        // 
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
-        val newPartValues = commonPartitionValues.get.flatMap { case 
(partValue, numSplits) =>
-          Seq.fill(numSplits)(partValue)
-        }
-        k.copy(numPartitions = newPartValues.length, partitionValues = 
newPartValues)
-      case p => p
-    }
-  }
-
-  override lazy val readerFactory: PartitionReaderFactory = 
batch.createReaderFactory()
-
-  override lazy val inputRDD: RDD[InternalRow] = {
-    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == 
SinglePartition) {
-      // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
-      sparkContext.parallelize(Array.empty[InternalRow], 1)
-    } else {
-      var finalPartitions = filteredPartitions
-
-      outputPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (conf.v2BucketingPushPartValuesEnabled &&
-              conf.v2BucketingPartiallyClusteredDistributionEnabled) {
-            assert(filteredPartitions.forall(_.size == 1),
-              "Expect partitions to be not grouped when " +
-                  
s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
-                  "is enabled")
-
-            val groupedPartitions = 
groupPartitions(finalPartitions.map(_.head), true).get
-
-            // This means the input partitions are not grouped by partition 
values. We'll need to
-            // check `groupByPartitionValues` and decide whether to group and 
replicate splits
-            // within a partition.
-            if (commonPartitionValues.isDefined && applyPartialClustering) {
-              // A mapping from the common partition values to how many splits 
the partition
-              // should contain. Note this no longer maintain the partition 
key ordering.
-              val commonPartValuesMap = commonPartitionValues
-                .get
-                .map(t => (InternalRowComparableWrapper(t._1, p.expressions), 
t._2))
-                .toMap
-              val nestGroupedPartitions = groupedPartitions.map {
-                case (partValue, splits) =>
-                  // `commonPartValuesMap` should contain the part value since 
it's the super set.
-                  val numSplits = commonPartValuesMap
-                    .get(InternalRowComparableWrapper(partValue, 
p.expressions))
-                  assert(numSplits.isDefined, s"Partition value $partValue 
does not exist in " +
-                      "common partition values from Spark plan")
-
-                  val newSplits = if (replicatePartitions) {
-                    // We need to also replicate partitions according to the 
other side of join
-                    Seq.fill(numSplits.get)(splits)
-                  } else {
-                    // Not grouping by partition values: this could be the 
side with partially
-                    // clustered distribution. Because of dynamic filtering, 
we'll need to check if
-                    // the final number of splits of a partition is smaller 
than the original
-                    // number, and fill with empty splits if so. This is 
necessary so that both
-                    // sides of a join will have the same number of partitions 
& splits.
-                    splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
-                  }
-                  (InternalRowComparableWrapper(partValue, p.expressions), 
newSplits)
-              }
-
-              // Now fill missing partition keys with empty partitions
-              val partitionMapping = nestGroupedPartitions.toMap
-              finalPartitions = commonPartitionValues.get.flatMap { case 
(partValue, numSplits) =>
-                // Use empty partition for those partition values that are not 
present.
-                partitionMapping.getOrElse(
-                  InternalRowComparableWrapper(partValue, p.expressions),
-                  Seq.fill(numSplits)(Seq.empty))
-              }
-            } else {
-              val partitionMapping = groupedPartitions.map { case (row, parts) 
=>
-                InternalRowComparableWrapper(row, p.expressions) -> parts
-              }.toMap
-              finalPartitions = p.partitionValues.map { partValue =>
-                // Use empty partition for those partition values that are not 
present
-                partitionMapping.getOrElse(
-                  InternalRowComparableWrapper(partValue, p.expressions), Seq.empty)
-              }
-            }
-          } else {
-            val partitionMapping = finalPartitions.map { parts =>
-              val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
-              InternalRowComparableWrapper(row, p.expressions) -> parts
-            }.toMap
-            finalPartitions = p.partitionValues.map { partValue =>
-              // Use empty partition for those partition values that are not present
-              partitionMapping.getOrElse(
-                InternalRowComparableWrapper(partValue, p.expressions), Seq.empty)
-            }
-          }
-
-        case _ =>
-      }
-
-      new DataSourceRDD(
-        sparkContext, finalPartitions, readerFactory, supportsColumnar, customMetrics)
-    }
-    postDriverMetrics()
-    rdd
-  }
-
-  override def doCanonicalize(): BatchScanExec = {
-    this.copy(
-      output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlan.normalizePredicates(
-        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
-        output))
-  }
-
-  override def simpleString(maxFields: Int): String = {
-    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
-    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
-    val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
-    redact(result)
-  }
-
-  override def nodeName: String = {
-    s"BatchScan ${table.name()}".trim
-  }
-}
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
similarity index 98%
rename from shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
rename to shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
index 654e43cbd0..0f7c13448f 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
+++ b/shims/spark41/src/main/scala/org/apache/gluten/utils/InternalRowUtil.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 
-object InternalRowUtl {
+object InternalRowUtil {
   def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
     val encoder = ExpressionEncoder(struct).resolveAndBind()
     val deserializer = encoder.createDeserializer()
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
deleted file mode 100644
index d43331d57c..0000000000
--- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala.deprecated
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import com.google.common.base.Objects
-
-import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Physical plan node for scanning a batch of data from a data source v2.
- */
-case class BatchScanExec(
-    output: Seq[AttributeReference],
-    @transient scan: Scan,
-    runtimeFilters: Seq[Expression],
-    keyGroupedPartitioning: Option[Seq[Expression]] = None,
-    ordering: Option[Seq[SortOrder]] = None,
-    @transient table: Table,
-    commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
-    applyPartialClustering: Boolean = false,
-    replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
-
-  @transient lazy val batch = if (scan == null) null else scan.toBatch
-
-  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
-  override def equals(other: Any): Boolean = other match {
-    case other: BatchScanExec =>
-      this.batch != null && this.batch == other.batch &&
-          this.runtimeFilters == other.runtimeFilters &&
-          this.commonPartitionValues == other.commonPartitionValues &&
-          this.replicatePartitions == other.replicatePartitions &&
-          this.applyPartialClustering == other.applyPartialClustering
-    case _ =>
-      false
-  }
-
-  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
-
-  @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()
-
-  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
-    val dataSourceFilters = runtimeFilters.flatMap {
-      case DynamicPruningExpression(e) => DataSourceV2Strategy.translateRuntimeFilterV2(e)
-      case _ => None
-    }
-
-    if (dataSourceFilters.nonEmpty) {
-      val originalPartitioning = outputPartitioning
-
-      // the cast is safe as runtime filters are only assigned if the scan can be filtered
-      val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
-      filterableScan.filter(dataSourceFilters.toArray)
-
-      // call toBatch again to get filtered partitions
-      val newPartitions = scan.toBatch.planInputPartitions()
-
-      originalPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
-            throw new SparkException("Data source must have preserved the original partitioning " +
-                "during runtime filtering: not all partitions implement HasPartitionKey after " +
-                "filtering")
-          }
-          val newPartitionValues = newPartitions.map(partition =>
-              InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], p.expressions))
-            .toSet
-          val oldPartitionValues = p.partitionValues
-            .map(partition => InternalRowComparableWrapper(partition, p.expressions)).toSet
-          // We require the new number of partition values to be equal or less than the old number
-          // of partition values here. In the case of less than, empty partitions will be added for
-          // those missing values that are not present in the new input partitions.
-          if (oldPartitionValues.size < newPartitionValues.size) {
-            throw new SparkException("During runtime filtering, data source must either report " +
-                "the same number of partition values, or a subset of partition values from the " +
-                s"original. Before: ${oldPartitionValues.size} partition values. " +
-                s"After: ${newPartitionValues.size} partition values")
-          }
-
-          if (!newPartitionValues.forall(oldPartitionValues.contains)) {
-            throw new SparkException("During runtime filtering, data source must not report new " +
-                "partition values that are not present in the original partitioning.")
-          }
-
-          groupPartitions(newPartitions).get.map(_._2)
-
-        case _ =>
-          // no validation is needed as the data source did not report any specific partitioning
-          newPartitions.map(Seq(_))
-      }
-
-    } else {
-      partitions
-    }
-  }
-
-  override def outputPartitioning: Partitioning = {
-    super.outputPartitioning match {
-      case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
-        // We allow duplicated partition values if
-        // `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` is true
-        val newPartValues = commonPartitionValues.get.flatMap { case (partValue, numSplits) =>
-          Seq.fill(numSplits)(partValue)
-        }
-        k.copy(numPartitions = newPartValues.length, partitionValues = newPartValues)
-      case p => p
-    }
-  }
-
-  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
-
-  override lazy val inputRDD: RDD[InternalRow] = {
-    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
-      // return an empty RDD with 1 partition if dynamic filtering removed the only split
-      sparkContext.parallelize(Array.empty[InternalRow], 1)
-    } else {
-      var finalPartitions = filteredPartitions
-
-      outputPartitioning match {
-        case p: KeyGroupedPartitioning =>
-          if (conf.v2BucketingPushPartValuesEnabled &&
-              conf.v2BucketingPartiallyClusteredDistributionEnabled) {
-            assert(filteredPartitions.forall(_.size == 1),
-              "Expect partitions to be not grouped when " +
-                  s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
-                  "is enabled")
-
-            val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true).get
-
-            // This means the input partitions are not grouped by partition values. We'll need to
-            // check `groupByPartitionValues` and decide whether to group and replicate splits
-            // within a partition.
-            if (commonPartitionValues.isDefined && applyPartialClustering) {
-              // A mapping from the common partition values to how many splits the partition
-              // should contain. Note this no longer maintain the partition key ordering.
-              val commonPartValuesMap = commonPartitionValues
-                .get
-                .map(t => (InternalRowComparableWrapper(t._1, p.expressions), t._2))
-                .toMap
-              val nestGroupedPartitions = groupedPartitions.map {
-                case (partValue, splits) =>
-                  // `commonPartValuesMap` should contain the part value since it's the super set.
-                  val numSplits = commonPartValuesMap
-                    .get(InternalRowComparableWrapper(partValue, p.expressions))
-                  assert(numSplits.isDefined, s"Partition value $partValue does not exist in " +
-                      "common partition values from Spark plan")
-
-                  val newSplits = if (replicatePartitions) {
-                    // We need to also replicate partitions according to the other side of join
-                    Seq.fill(numSplits.get)(splits)
-                  } else {
-                    // Not grouping by partition values: this could be the side with partially
-                    // clustered distribution. Because of dynamic filtering, we'll need to check if
-                    // the final number of splits of a partition is smaller than the original
-                    // number, and fill with empty splits if so. This is necessary so that both
-                    // sides of a join will have the same number of partitions & splits.
-                    splits.map(Seq(_)).padTo(numSplits.get, Seq.empty)
-                  }
-                  (InternalRowComparableWrapper(partValue, p.expressions), newSplits)
-              }
-
-              // Now fill missing partition keys with empty partitions
-              val partitionMapping = nestGroupedPartitions.toMap
-              finalPartitions = commonPartitionValues.get.flatMap { case (partValue, numSplits) =>
-                // Use empty partition for those partition values that are not present.
-                partitionMapping.getOrElse(
-                  InternalRowComparableWrapper(partValue, p.expressions),
-                  Seq.fill(numSplits)(Seq.empty))
-              }
-            } else {
-              val partitionMapping = groupedPartitions.map { case (row, parts) =>
-                InternalRowComparableWrapper(row, p.expressions) -> parts
-              }.toMap
-              finalPartitions = p.partitionValues.map { partValue =>
-                // Use empty partition for those partition values that are not present
-                partitionMapping.getOrElse(
-                  InternalRowComparableWrapper(partValue, p.expressions), Seq.empty)
-              }
-            }
-          } else {
-            val partitionMapping = finalPartitions.map { parts =>
-              val row = parts.head.asInstanceOf[HasPartitionKey].partitionKey()
-              InternalRowComparableWrapper(row, p.expressions) -> parts
-            }.toMap
-            finalPartitions = p.partitionValues.map { partValue =>
-              // Use empty partition for those partition values that are not present
-              partitionMapping.getOrElse(
-                InternalRowComparableWrapper(partValue, p.expressions), Seq.empty)
-            }
-          }
-
-        case _ =>
-      }
-
-      new DataSourceRDD(
-        sparkContext, finalPartitions, readerFactory, supportsColumnar, customMetrics)
-    }
-    postDriverMetrics()
-    rdd
-  }
-
-  override def doCanonicalize(): BatchScanExec = {
-    this.copy(
-      output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlan.normalizePredicates(
-        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
-        output))
-  }
-
-  override def simpleString(maxFields: Int): String = {
-    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
-    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
-    val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
-    redact(result)
-  }
-
-  override def nodeName: String = {
-    s"BatchScan ${table.name()}".trim
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
