This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9c8023b9eb [GLUTEN-11163][CORE] Refactor dynamicallySelectedPartitions
for FileSourceScanExecShim (#11164)
9c8023b9eb is described below
commit 9c8023b9eb2641d92fcc142390b0eb60a9b9f9bd
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Nov 26 17:39:44 2025 +0800
[GLUTEN-11163][CORE] Refactor dynamicallySelectedPartitions for
FileSourceScanExecShim (#11164)
---
.../execution/FileSourceScanExecTransformer.scala | 2 +-
.../org/apache/spark/sql/GlutenSubquerySuite.scala | 2 +-
.../org/apache/spark/sql/GlutenSubquerySuite.scala | 2 +-
.../org/apache/spark/sql/GlutenSubquerySuite.scala | 2 +-
.../org/apache/spark/sql/GlutenSubquerySuite.scala | 2 +-
.../TestFileSourceScanExecTransformer.scala | 2 +-
.../sql/execution/FileSourceScanExecShim.scala | 23 +++++++++++-----------
.../sql/execution/FileSourceScanExecShim.scala | 23 +++++++++++-----------
.../sql/execution/FileSourceScanExecShim.scala | 21 ++++++++++----------
.../sql/execution/FileSourceScanExecShim.scala | 21 ++++++++++----------
.../sql/execution/FileSourceScanExecShim.scala | 21 ++++++++++----------
11 files changed, 58 insertions(+), 63 deletions(-)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index af77fa24b3..c00ed1f671 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -128,7 +128,7 @@ abstract class FileSourceScanExecTransformerBase(
.genPartitionSeq(
relation,
requiredSchema,
- getPartitionArray(),
+ getPartitionArray,
output,
bucketedScan,
optionalBucketSet,
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
index f0e766e5ce..8365d9bb62 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
@@ -46,7 +46,7 @@ class GlutenSubquerySuite extends SubquerySuite with
GlutenSQLTestsTrait {
case t: WholeStageTransformer => t
} match {
case Some(WholeStageTransformer(fs: FileSourceScanExecTransformer, _))
=>
- fs.dynamicallySelectedPartitions
+ fs.getPartitionArray
.exists(_.files.exists(_.getPath.toString.contains("p=0")))
case _ => false
})
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
index f0e766e5ce..8365d9bb62 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
@@ -46,7 +46,7 @@ class GlutenSubquerySuite extends SubquerySuite with
GlutenSQLTestsTrait {
case t: WholeStageTransformer => t
} match {
case Some(WholeStageTransformer(fs: FileSourceScanExecTransformer, _))
=>
- fs.dynamicallySelectedPartitions
+ fs.getPartitionArray
.exists(_.files.exists(_.getPath.toString.contains("p=0")))
case _ => false
})
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
index f0e766e5ce..8365d9bb62 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
@@ -46,7 +46,7 @@ class GlutenSubquerySuite extends SubquerySuite with
GlutenSQLTestsTrait {
case t: WholeStageTransformer => t
} match {
case Some(WholeStageTransformer(fs: FileSourceScanExecTransformer, _))
=>
- fs.dynamicallySelectedPartitions
+ fs.getPartitionArray
.exists(_.files.exists(_.getPath.toString.contains("p=0")))
case _ => false
})
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
index f0e766e5ce..8365d9bb62 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSubquerySuite.scala
@@ -46,7 +46,7 @@ class GlutenSubquerySuite extends SubquerySuite with
GlutenSQLTestsTrait {
case t: WholeStageTransformer => t
} match {
case Some(WholeStageTransformer(fs: FileSourceScanExecTransformer, _))
=>
- fs.dynamicallySelectedPartitions
+ fs.getPartitionArray
.exists(_.files.exists(_.getPath.toString.contains("p=0")))
case _ => false
})
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 4ac1348e1d..e805d175be 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -53,7 +53,7 @@ case class TestFileSourceScanExecTransformer(
BackendsApiManager.getTransformerApiInstance.genPartitionSeq(
relation,
requiredSchema,
- getPartitionArray(),
+ getPartitionArray,
output,
bucketedScan,
optionalBucketSet,
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index bfcdfda267..a15fc73430 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -125,22 +125,21 @@ abstract class FileSourceScanExecShim(
// We can only determine the actual partitions at runtime when a dynamic
partition filter is
// present. This is because such a filter relies on information that is only
available at run
// time (for instance the keys used in the other side of a join).
- @transient lazy val dynamicallySelectedPartitions: Array[PartitionDirectory]
= {
+ @transient private lazy val dynamicallySelectedPartitions:
Array[PartitionDirectory] = {
val dynamicPartitionFilters =
partitionFilters.filter(isDynamicPruningFilter)
val selected = if (dynamicPartitionFilters.nonEmpty) {
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except
dynamic partition filters
- val predicate = dynamicPartitionFilters.reduce(And)
- val partitionColumns = relation.partitionSchema
- val boundPredicate = Predicate.create(
- predicate.transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable
= true)
- },
- Nil
- )
+ val boundedFilters = dynamicPartitionFilters.map {
+ dynamicPartitionFilter =>
+ dynamicPartitionFilter.transform {
+ case a: AttributeReference =>
+ val index = relation.partitionSchema.indexWhere(a.name ==
_.name)
+ BoundReference(index,
relation.partitionSchema(index).dataType, nullable = true)
+ }
+ }
+ val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
setFilesNumAndSizeMetric(ret, static = false)
ret
@@ -152,7 +151,7 @@ abstract class FileSourceScanExecShim(
selected
}
- def getPartitionArray(): Array[PartitionDirectory] = {
+ def getPartitionArray: Array[PartitionDirectory] = {
dynamicallySelectedPartitions
}
}
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index f6c2f43c38..3dbd29ef1c 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -134,22 +134,21 @@ abstract class FileSourceScanExecShim(
// We can only determine the actual partitions at runtime when a dynamic
partition filter is
// present. This is because such a filter relies on information that is only
available at run
// time (for instance the keys used in the other side of a join).
- @transient lazy val dynamicallySelectedPartitions: Array[PartitionDirectory]
= {
+ @transient private lazy val dynamicallySelectedPartitions:
Array[PartitionDirectory] = {
val dynamicPartitionFilters =
partitionFilters.filter(isDynamicPruningFilter)
val selected = if (dynamicPartitionFilters.nonEmpty) {
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except
dynamic partition filters
- val predicate = dynamicPartitionFilters.reduce(And)
- val partitionColumns = relation.partitionSchema
- val boundPredicate = Predicate.create(
- predicate.transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable
= true)
- },
- Nil
- )
+ val boundedFilters = dynamicPartitionFilters.map {
+ dynamicPartitionFilter =>
+ dynamicPartitionFilter.transform {
+ case a: AttributeReference =>
+ val index = relation.partitionSchema.indexWhere(a.name ==
_.name)
+ BoundReference(index,
relation.partitionSchema(index).dataType, nullable = true)
+ }
+ }
+ val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
setFilesNumAndSizeMetric(ret, static = false)
ret
@@ -161,7 +160,7 @@ abstract class FileSourceScanExecShim(
selected
}
- def getPartitionArray(): Array[PartitionDirectory] = {
+ def getPartitionArray: Array[PartitionDirectory] = {
dynamicallySelectedPartitions
}
}
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 657915735d..01445c170c 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -100,16 +100,15 @@ abstract class FileSourceScanExecShim(
val selected = if (dynamicPartitionFilters.nonEmpty) {
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except
dynamic partition filters
- val predicate = dynamicPartitionFilters.reduce(And)
- val partitionColumns = relation.partitionSchema
- val boundPredicate = Predicate.create(
- predicate.transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable
= true)
- },
- Nil
- )
+ val boundedFilters = dynamicPartitionFilters.map {
+ dynamicPartitionFilter =>
+ dynamicPartitionFilter.transform {
+ case a: AttributeReference =>
+ val index = relation.partitionSchema.indexWhere(a.name ==
_.name)
+ BoundReference(index,
relation.partitionSchema(index).dataType, nullable = true)
+ }
+ }
+ val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
setFilesNumAndSizeMetric(ret, static = false)
ret
@@ -121,7 +120,7 @@ abstract class FileSourceScanExecShim(
selected
}
- def getPartitionArray(): Array[PartitionDirectory] = {
+ def getPartitionArray: Array[PartitionDirectory] = {
dynamicallySelectedPartitions
}
}
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 0b9905d849..234762f2f7 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -101,16 +101,15 @@ abstract class FileSourceScanExecShim(
val selected = if (dynamicPartitionFilters.nonEmpty) {
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except
dynamic partition filters
- val predicate = dynamicPartitionFilters.reduce(And)
- val partitionColumns = relation.partitionSchema
- val boundPredicate = Predicate.create(
- predicate.transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable
= true)
- },
- Nil
- )
+ val boundedFilters = dynamicPartitionFilters.map {
+ dynamicPartitionFilter =>
+ dynamicPartitionFilter.transform {
+ case a: AttributeReference =>
+ val index = relation.partitionSchema.indexWhere(a.name ==
_.name)
+ BoundReference(index,
relation.partitionSchema(index).dataType, nullable = true)
+ }
+ }
+ val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
setFilesNumAndSizeMetric(ret, static = false)
ret
@@ -122,7 +121,7 @@ abstract class FileSourceScanExecShim(
selected
}
- def getPartitionArray(): Array[PartitionDirectory] = {
+ def getPartitionArray: Array[PartitionDirectory] = {
dynamicallySelectedPartitions
}
}
diff --git
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index b7fe2cde9f..73cc9ba2ce 100644
---
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -102,16 +102,15 @@ abstract class FileSourceScanExecShim(
if (dynamicPartitionFilters.nonEmpty) {
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except
dynamic partition filters
- val predicate = dynamicPartitionFilters.reduce(And)
- val partitionColumns = relation.partitionSchema
- val boundPredicate = Predicate.create(
- predicate.transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable
= true)
- },
- Nil
- )
+ val boundedFilters = dynamicPartitionFilters.map {
+ dynamicPartitionFilter =>
+ dynamicPartitionFilter.transform {
+ case a: AttributeReference =>
+ val index = relation.partitionSchema.indexWhere(a.name ==
_.name)
+ BoundReference(index,
relation.partitionSchema(index).dataType, nullable = true)
+ }
+ }
+ val boundPredicate = Predicate.create(boundedFilters.reduce(And), Nil)
val returnedFiles =
selectedPartitions.filterAndPruneFiles(boundPredicate,
dynamicDataFilters)
setFilesNumAndSizeMetric(returnedFiles, false)
@@ -122,7 +121,7 @@ abstract class FileSourceScanExecShim(
}
}
- def getPartitionArray(): Array[PartitionDirectory] = {
+ def getPartitionArray: Array[PartitionDirectory] = {
// TODO: fix the value of partition directories in dynamic pruning
val staticDataFilters = dataFilters.filterNot(isDynamicPruningFilter)
val staticPartitionFilters =
partitionFilters.filterNot(isDynamicPruningFilter)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]