This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c8023b9eb [GLUTEN-11163][CORE] Refactor dynamicallySelectedPartitions 
for FileSourceScanExecShim (#11164)
9c8023b9eb is described below

commit 9c8023b9eb2641d92fcc142390b0eb60a9b9f9bd
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Nov 26 17:39:44 2025 +0800

    [GLUTEN-11163][CORE] Refactor dynamicallySelectedPartitions for 
FileSourceScanExecShim (#11164)
---
 .../execution/FileSourceScanExecTransformer.scala  |  2 +-
 .../org/apache/spark/sql/GlutenSubquerySuite.scala |  2 +-
 .../org/apache/spark/sql/GlutenSubquerySuite.scala |  2 +-
 .../org/apache/spark/sql/GlutenSubquerySuite.scala |  2 +-
 .../org/apache/spark/sql/GlutenSubquerySuite.scala |  2 +-
 .../TestFileSourceScanExecTransformer.scala        |  2 +-
 .../sql/execution/FileSourceScanExecShim.scala     | 23 +++++++++++-----------
 .../sql/execution/FileSourceScanExecShim.scala     | 23 +++++++++++-----------
 .../sql/execution/FileSourceScanExecShim.scala     | 21 ++++++++++----------
 .../sql/execution/FileSourceScanExecShim.scala     | 21 ++++++++++----------
 .../sql/execution/FileSourceScanExecShim.scala     | 21 ++++++++++----------
 11 files changed, 58 insertions(+), 63 deletions(-)

diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index af77fa24b3..c00ed1f671 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -128,7 +128,7 @@ abstract class FileSourceScanExecTransformerBase(
       .genPartitionSeq(
         relation,
         requiredSchema,
-        getPartitionArray(),
+        getPartitionArray,
         output,
         bucketedScan,
         optionalBucketSet,
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
index f0e766e5ce..8365d9bb62 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
@@ -46,7 +46,7 @@ class GlutenSubquerySuite extends SubquerySuite with 
GlutenSQLTestsTrait {
         case t: WholeStageTransformer => t
       } match {
         case Some(WholeStageTransformer(fs: FileSourceScanExecTransformer, _)) 
=>
-          fs.dynamicallySelectedPartitions
+          fs.getPartitionArray
             .exists(_.files.exists(_.getPath.toString.contains("p=0")))
         case _ => false
       })
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
index f0e766e5ce..8365d9bb62 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
@@ -46,7 +46,7 @@ class GlutenSubquerySuite extends SubquerySuite with 
GlutenSQLTestsTrait {
         case t: WholeStageTransformer => t
       } match {
         case Some(WholeStageTransformer(fs: FileSourceScanExecTransformer, _)) 
=>
-          fs.dynamicallySelectedPartitions
+          fs.getPartitionArray
             .exists(_.files.exists(_.getPath.toString.contains("p=0")))
         case _ => false
       })
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
index f0e766e5ce..8365d9bb62 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
@@ -46,7 +46,7 @@ class GlutenSubquerySuite extends SubquerySuite with 
GlutenSQLTestsTrait {
         case t: WholeStageTransformer => t
       } match {
         case Some(WholeStageTransformer(fs: FileSourceScanExecTransformer, _)) 
=>
-          fs.dynamicallySelectedPartitions
+          fs.getPartitionArray
             .exists(_.files.exists(_.getPath.toString.contains("p=0")))
         case _ => false
       })
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
index f0e766e5ce..8365d9bb62 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
@@ -46,7 +46,7 @@ class GlutenSubquerySuite extends SubquerySuite with 
GlutenSQLTestsTrait {
         case t: WholeStageTransformer => t
       } match {
         case Some(WholeStageTransformer(fs: FileSourceScanExecTransformer, _)) 
=>
-          fs.dynamicallySelectedPartitions
+          fs.getPartitionArray
             .exists(_.files.exists(_.getPath.toString.contains("p=0")))
         case _ => false
       })
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 4ac1348e1d..e805d175be 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -53,7 +53,7 @@ case class TestFileSourceScanExecTransformer(
     BackendsApiManager.getTransformerApiInstance.genPartitionSeq(
       relation,
       requiredSchema,
-      getPartitionArray(),
+      getPartitionArray,
       output,
       bucketedScan,
       optionalBucketSet,
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index bfcdfda267..a15fc73430 100644
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -125,22 +125,21 @@ abstract class FileSourceScanExecShim(
   // We can only determine the actual partitions at runtime when a dynamic 
partition filter is
   // present. This is because such a filter relies on information that is only 
available at run
   // time (for instance the keys used in the other side of a join).
-  @transient lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] 
= {
+  @transient private lazy val dynamicallySelectedPartitions: 
Array[PartitionDirectory] = {
     val dynamicPartitionFilters =
       partitionFilters.filter(isDynamicPruningFilter)
     val selected = if (dynamicPartitionFilters.nonEmpty) {
       GlutenTimeMetric.withMillisTime {
         // call the file index for the files matching all filters except 
dynamic partition filters
-        val predicate = dynamicPartitionFilters.reduce(And)
-        val partitionColumns = relation.partitionSchema
-        val boundPredicate = Predicate.create(
-          predicate.transform {
-            case a: AttributeReference =>
-              val index = partitionColumns.indexWhere(a.name == _.name)
-              BoundReference(index, partitionColumns(index).dataType, nullable 
= true)
-          },
-          Nil
-        )
+        val boundedFilters = dynamicPartitionFilters.map {
+          dynamicPartitionFilter =>
+            dynamicPartitionFilter.transform {
+              case a: AttributeReference =>
+                val index = relation.partitionSchema.indexWhere(a.name == 
_.name)
+                BoundReference(index, 
relation.partitionSchema(index).dataType, nullable = true)
+            }
+        }
+        val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
         val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
         setFilesNumAndSizeMetric(ret, static = false)
         ret
@@ -152,7 +151,7 @@ abstract class FileSourceScanExecShim(
     selected
   }
 
-  def getPartitionArray(): Array[PartitionDirectory] = {
+  def getPartitionArray: Array[PartitionDirectory] = {
     dynamicallySelectedPartitions
   }
 }
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index f6c2f43c38..3dbd29ef1c 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -134,22 +134,21 @@ abstract class FileSourceScanExecShim(
   // We can only determine the actual partitions at runtime when a dynamic 
partition filter is
   // present. This is because such a filter relies on information that is only 
available at run
   // time (for instance the keys used in the other side of a join).
-  @transient lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] 
= {
+  @transient private lazy val dynamicallySelectedPartitions: 
Array[PartitionDirectory] = {
     val dynamicPartitionFilters =
       partitionFilters.filter(isDynamicPruningFilter)
     val selected = if (dynamicPartitionFilters.nonEmpty) {
       GlutenTimeMetric.withMillisTime {
         // call the file index for the files matching all filters except 
dynamic partition filters
-        val predicate = dynamicPartitionFilters.reduce(And)
-        val partitionColumns = relation.partitionSchema
-        val boundPredicate = Predicate.create(
-          predicate.transform {
-            case a: AttributeReference =>
-              val index = partitionColumns.indexWhere(a.name == _.name)
-              BoundReference(index, partitionColumns(index).dataType, nullable 
= true)
-          },
-          Nil
-        )
+        val boundedFilters = dynamicPartitionFilters.map {
+          dynamicPartitionFilter =>
+            dynamicPartitionFilter.transform {
+              case a: AttributeReference =>
+                val index = relation.partitionSchema.indexWhere(a.name == 
_.name)
+                BoundReference(index, 
relation.partitionSchema(index).dataType, nullable = true)
+            }
+        }
+        val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
         val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
         setFilesNumAndSizeMetric(ret, static = false)
         ret
@@ -161,7 +160,7 @@ abstract class FileSourceScanExecShim(
     selected
   }
 
-  def getPartitionArray(): Array[PartitionDirectory] = {
+  def getPartitionArray: Array[PartitionDirectory] = {
     dynamicallySelectedPartitions
   }
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 657915735d..01445c170c 100644
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -100,16 +100,15 @@ abstract class FileSourceScanExecShim(
     val selected = if (dynamicPartitionFilters.nonEmpty) {
       GlutenTimeMetric.withMillisTime {
         // call the file index for the files matching all filters except 
dynamic partition filters
-        val predicate = dynamicPartitionFilters.reduce(And)
-        val partitionColumns = relation.partitionSchema
-        val boundPredicate = Predicate.create(
-          predicate.transform {
-            case a: AttributeReference =>
-              val index = partitionColumns.indexWhere(a.name == _.name)
-              BoundReference(index, partitionColumns(index).dataType, nullable 
= true)
-          },
-          Nil
-        )
+        val boundedFilters = dynamicPartitionFilters.map {
+          dynamicPartitionFilter =>
+            dynamicPartitionFilter.transform {
+              case a: AttributeReference =>
+                val index = relation.partitionSchema.indexWhere(a.name == 
_.name)
+                BoundReference(index, 
relation.partitionSchema(index).dataType, nullable = true)
+            }
+        }
+        val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
         val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
         setFilesNumAndSizeMetric(ret, static = false)
         ret
@@ -121,7 +120,7 @@ abstract class FileSourceScanExecShim(
     selected
   }
 
-  def getPartitionArray(): Array[PartitionDirectory] = {
+  def getPartitionArray: Array[PartitionDirectory] = {
     dynamicallySelectedPartitions
   }
 }
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 0b9905d849..234762f2f7 100644
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -101,16 +101,15 @@ abstract class FileSourceScanExecShim(
     val selected = if (dynamicPartitionFilters.nonEmpty) {
       GlutenTimeMetric.withMillisTime {
         // call the file index for the files matching all filters except 
dynamic partition filters
-        val predicate = dynamicPartitionFilters.reduce(And)
-        val partitionColumns = relation.partitionSchema
-        val boundPredicate = Predicate.create(
-          predicate.transform {
-            case a: AttributeReference =>
-              val index = partitionColumns.indexWhere(a.name == _.name)
-              BoundReference(index, partitionColumns(index).dataType, nullable 
= true)
-          },
-          Nil
-        )
+        val boundedFilters = dynamicPartitionFilters.map {
+          dynamicPartitionFilter =>
+            dynamicPartitionFilter.transform {
+              case a: AttributeReference =>
+                val index = relation.partitionSchema.indexWhere(a.name == 
_.name)
+                BoundReference(index, 
relation.partitionSchema(index).dataType, nullable = true)
+            }
+        }
+        val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
         val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
         setFilesNumAndSizeMetric(ret, static = false)
         ret
@@ -122,7 +121,7 @@ abstract class FileSourceScanExecShim(
     selected
   }
 
-  def getPartitionArray(): Array[PartitionDirectory] = {
+  def getPartitionArray: Array[PartitionDirectory] = {
     dynamicallySelectedPartitions
   }
 }
diff --git 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index b7fe2cde9f..73cc9ba2ce 100644
--- 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -102,16 +102,15 @@ abstract class FileSourceScanExecShim(
     if (dynamicPartitionFilters.nonEmpty) {
       GlutenTimeMetric.withMillisTime {
         // call the file index for the files matching all filters except 
dynamic partition filters
-        val predicate = dynamicPartitionFilters.reduce(And)
-        val partitionColumns = relation.partitionSchema
-        val boundPredicate = Predicate.create(
-          predicate.transform {
-            case a: AttributeReference =>
-              val index = partitionColumns.indexWhere(a.name == _.name)
-              BoundReference(index, partitionColumns(index).dataType, nullable 
= true)
-          },
-          Nil
-        )
+        val boundedFilters = dynamicPartitionFilters.map {
+          dynamicPartitionFilter =>
+            dynamicPartitionFilter.transform {
+              case a: AttributeReference =>
+                val index = relation.partitionSchema.indexWhere(a.name == 
_.name)
+                BoundReference(index, 
relation.partitionSchema(index).dataType, nullable = true)
+            }
+        }
+        val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
         val returnedFiles =
           selectedPartitions.filterAndPruneFiles(boundPredicate, 
dynamicDataFilters)
         setFilesNumAndSizeMetric(returnedFiles, false)
@@ -122,7 +121,7 @@ abstract class FileSourceScanExecShim(
     }
   }
 
-  def getPartitionArray(): Array[PartitionDirectory] = {
+  def getPartitionArray: Array[PartitionDirectory] = {
     // TODO: fix the value of partiton directories in dynamic pruning
     val staticDataFilters = dataFilters.filterNot(isDynamicPruningFilter)
     val staticPartitionFilters = 
partitionFilters.filterNot(isDynamicPruningFilter)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to