This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4cd6440d23 [CORE] Fix numFiles metric not being populated in FileSourceScanExecTransformer (#11459)
4cd6440d23 is described below
commit 4cd6440d23665d8807fbf1fc0d470a148accf020
Author: Ankita Victor <[email protected]>
AuthorDate: Wed Jan 21 17:13:47 2026 +0530
[CORE] Fix numFiles metric not being populated in FileSourceScanExecTransformer (#11459)
What changes are proposed in this pull request?

This PR fixes an issue where the numFiles driver-side metric was not being
populated when using Gluten/Velox for file scans in Spark 4.0 and 4.1.

- Added a sendDriverMetrics() call after the if/else block in
  dynamicallySelectedPartitions to match the spark35 shim behavior.
- Changed getPartitionArray() to use
  dynamicallySelectedPartitions.filePartitionIterator instead of directly
  listing files, ensuring the metrics initialization chain is properly
  triggered.
The numFiles metric (and other driver-side metrics like filesSize and
numPartitions) was always returning 0 in Gluten's Spark 4.0 and 4.1 shims
because:

- sendDriverMetrics() was never called: when there were no dynamic partition
  filters, the dynamicallySelectedPartitions method returned selectedPartitions
  directly without calling sendDriverMetrics() to post the metrics to Spark's
  metrics system.
- getPartitionArray() bypassed the metrics initialization chain: it directly
  called relation.location.listFiles() instead of using
  dynamicallySelectedPartitions, which meant the selectedPartitions lazy val
  (where setFilesNumAndSizeMetric is called) was never evaluated.
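The dependency chain can be pictured with a small, self-contained Scala sketch
(FakeScan, FakeMetric and the file name below are made up for illustration;
only the lazy-val forcing pattern mirrors the shim code):

    // Minimal sketch of the metric-initialization chain, not Spark's real API.
    object MetricChainSketch extends App {
      final class FakeMetric(var value: Long = 0L)

      final class FakeScan(files: Seq[String]) {
        val numFiles = new FakeMetric
        private var posted: Seq[Long] = Nil

        // Counterpart of selectedPartitions: evaluating this lazy val is what
        // sets the driver-side metric (like setFilesNumAndSizeMetric does).
        private lazy val selectedPartitions: Seq[String] = {
          numFiles.value = files.size
          files
        }

        // Counterpart of dynamicallySelectedPartitions: forces the lazy val
        // and then posts the metrics (like sendDriverMetrics does).
        def dynamicallySelectedPartitions: Seq[String] = {
          val selected = selectedPartitions
          sendDriverMetrics()
          selected
        }

        private def sendDriverMetrics(): Unit = posted = Seq(numFiles.value)

        // Old behavior: list files directly, bypassing the lazy val entirely,
        // so numFiles stays 0 and nothing is ever posted.
        def partitionsBypassingMetrics: Seq[String] = files

        def postedMetrics: Seq[Long] = posted
      }

      val scan = new FakeScan(Seq("part=1/a.parquet"))
      scan.partitionsBypassingMetrics
      println(scan.numFiles.value) // 0 - the buggy path never touches the metric
      scan.dynamicallySelectedPartitions
      println(scan.numFiles.value) // 1 - forcing selectedPartitions sets it
      println(scan.postedMetrics)  // List(1) - and sendDriverMetrics posts it
    }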
How was this patch tested?
UT
---
.../gluten/utils/velox/VeloxTestSettings.scala | 2 +
.../execution/metric/GlutenSQLMetricsSuite.scala | 57 ++++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 2 +
.../execution/metric/GlutenSQLMetricsSuite.scala | 57 ++++++++++++++++++++++
.../sql/execution/FileSourceScanExecShim.scala | 16 +++---
.../sql/execution/FileSourceScanExecShim.scala | 16 +++---
6 files changed, 136 insertions(+), 14 deletions(-)
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 8ce9933df0..a066668517 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.{GlutenEnsureRequirementsSuite, GlutenValidateRequirementsSuite}
import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.metric.GlutenSQLMetricsSuite
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
@@ -956,6 +957,7 @@ class VeloxTestSettings extends BackendTestSettings {
// The case doesn't need to be run in Gluten since it's verifying against
// vanilla Spark's query plan.
.exclude("SPARK-47289: extended explain info")
+ enableSuite[GlutenSQLMetricsSuite]
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala
new file mode 100644
index 0000000000..8eaf4ca129
--- /dev/null
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}
+import org.apache.spark.sql.execution.FileSourceScanLike
+import org.apache.spark.sql.functions.col
+
+class GlutenSQLMetricsSuite extends GlutenSQLTestsBaseTrait {
+
+ test("numFiles metric should reflect partition pruning") {
+ withTempDir {
+ tempDir =>
+ val testPath = tempDir.getCanonicalPath
+
+ // Generate two files in two partitions
+ spark
+ .range(2)
+ .withColumn("part", col("id") % 2)
+ .write
+ .format("parquet")
+ .partitionBy("part")
+ .mode("append")
+ .save(testPath)
+
+ // Read only one partition
+      val query = spark.read.format("parquet").load(testPath).where("part = 1")
+ val fileScans = query.queryExecution.executedPlan.collect {
+ case f: FileSourceScanLike => f
+ }
+
+ // Force the query to read files and generate metrics
+ query.queryExecution.executedPlan.execute().count()
+
+ // Verify only one file was read
+ assert(fileScans.size == 1)
+      val numFilesAfterPartitionSkipping = fileScans.head.metrics.get("numFiles")
+ assert(numFilesAfterPartitionSkipping.nonEmpty)
+ assert(numFilesAfterPartitionSkipping.get.value == 1)
+ assert(query.collect().toSeq == Seq(Row(1, 1)))
+ }
+ }
+}
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 29cd082b45..050ee84f02 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.execution.exchange.{GlutenEnsureRequirementsSuite, GlutenValidateRequirementsSuite}
import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.metric.GlutenSQLMetricsSuite
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
@@ -1016,6 +1017,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("dumping query execution info to a file - explainMode=formatted")
// TODO: fix in Spark-4.0
.exclude("SPARK-47289: extended explain info")
+ enableSuite[GlutenSQLMetricsSuite]
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
// scalastyle:on line.size.limit
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala
new file mode 100644
index 0000000000..8eaf4ca129
--- /dev/null
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}
+import org.apache.spark.sql.execution.FileSourceScanLike
+import org.apache.spark.sql.functions.col
+
+class GlutenSQLMetricsSuite extends GlutenSQLTestsBaseTrait {
+
+ test("numFiles metric should reflect partition pruning") {
+ withTempDir {
+ tempDir =>
+ val testPath = tempDir.getCanonicalPath
+
+ // Generate two files in two partitions
+ spark
+ .range(2)
+ .withColumn("part", col("id") % 2)
+ .write
+ .format("parquet")
+ .partitionBy("part")
+ .mode("append")
+ .save(testPath)
+
+ // Read only one partition
+      val query = spark.read.format("parquet").load(testPath).where("part = 1")
+ val fileScans = query.queryExecution.executedPlan.collect {
+ case f: FileSourceScanLike => f
+ }
+
+ // Force the query to read files and generate metrics
+ query.queryExecution.executedPlan.execute().count()
+
+ // Verify only one file was read
+ assert(fileScans.size == 1)
+      val numFilesAfterPartitionSkipping = fileScans.head.metrics.get("numFiles")
+ assert(numFilesAfterPartitionSkipping.nonEmpty)
+ assert(numFilesAfterPartitionSkipping.get.value == 1)
+ assert(query.collect().toSeq == Seq(Row(1, 1)))
+ }
+ }
+}
diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 2dd8ff3867..c3064f35d1 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -107,7 +107,7 @@ abstract class FileSourceScanExecShim(
val dynamicDataFilters = dataFilters.filter(isDynamicPruningFilter)
val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
- if (dynamicPartitionFilters.nonEmpty) {
+ val selected = if (dynamicPartitionFilters.nonEmpty) {
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except dynamic partition filters
val boundedFilters = dynamicPartitionFilters.map {
@@ -127,15 +127,17 @@ abstract class FileSourceScanExecShim(
} else {
selectedPartitions
}
+ sendDriverMetrics()
+ selected
}
def getPartitionArray: Array[PartitionDirectory] = {
- // TODO: fix the value of partiton directories in dynamic pruning
- val staticDataFilters = dataFilters.filterNot(isDynamicPruningFilter)
-    val staticPartitionFilters = partitionFilters.filterNot(isDynamicPruningFilter)
- val partitionDirectories =
- relation.location.listFiles(staticPartitionFilters, staticDataFilters)
- partitionDirectories.toArray
+    // Use dynamicallySelectedPartitions which triggers selectedPartitions evaluation,
+    // ensuring driver metrics (numFiles, filesSize, etc.) are properly set.
+    // Convert ScanFileListing to Array[PartitionDirectory] via filePartitionIterator.
+ dynamicallySelectedPartitions.filePartitionIterator.map {
+ listing => PartitionDirectory(listing.values, listing.files.toSeq)
+ }.toArray
}
/**
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index d9a50f65c7..0a0bbcae9c 100644
--- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -106,7 +106,7 @@ abstract class FileSourceScanExecShim(
val dynamicDataFilters = dataFilters.filter(isDynamicPruningFilter)
val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
- if (dynamicPartitionFilters.nonEmpty) {
+ val selected = if (dynamicPartitionFilters.nonEmpty) {
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except dynamic partition filters
val boundedFilters = dynamicPartitionFilters.map {
@@ -126,15 +126,17 @@ abstract class FileSourceScanExecShim(
} else {
selectedPartitions
}
+ sendDriverMetrics()
+ selected
}
def getPartitionArray: Array[PartitionDirectory] = {
- // TODO: fix the value of partiton directories in dynamic pruning
- val staticDataFilters = dataFilters.filterNot(isDynamicPruningFilter)
-    val staticPartitionFilters = partitionFilters.filterNot(isDynamicPruningFilter)
- val partitionDirectories =
- relation.location.listFiles(staticPartitionFilters, staticDataFilters)
- partitionDirectories.toArray
+    // Use dynamicallySelectedPartitions which triggers selectedPartitions evaluation,
+    // ensuring driver metrics (numFiles, filesSize, etc.) are properly set.
+    // Convert ScanFileListing to Array[PartitionDirectory] via filePartitionIterator.
+ dynamicallySelectedPartitions.filePartitionIterator.map {
+ listing => PartitionDirectory(listing.values, listing.files.toSeq)
+ }.toArray
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]