This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4cd6440d23 [CORE] Fix numFiles metric not being populated in FileSourceScanExecTransformer (#11459)
4cd6440d23 is described below

commit 4cd6440d23665d8807fbf1fc0d470a148accf020
Author: Ankita Victor <[email protected]>
AuthorDate: Wed Jan 21 17:13:47 2026 +0530

    [CORE] Fix numFiles metric not being populated in FileSourceScanExecTransformer (#11459)
    
    What changes are proposed in this pull request?
    This PR fixes an issue where the numFiles driver-side metric was not being
    populated when using Gluten/Velox for file scans in Spark 4.0 and 4.1.
    
    - Added a sendDriverMetrics() call after the if/else block in
      dynamicallySelectedPartitions to match the spark35 shim behavior.
    - Changed getPartitionArray() to use
      dynamicallySelectedPartitions.filePartitionIterator instead of directly
      listing files, so that the metrics initialization chain is properly
      triggered.
    The numFiles metric (and other driver-side metrics such as filesSize and
    numPartitions) was always returning 0 in Gluten's Spark 4.0 and 4.1 shims
    because:
    
    - sendDriverMetrics() was never called: when there were no dynamic
      partition filters, the dynamicallySelectedPartitions method returned
      selectedPartitions directly without calling sendDriverMetrics() to post
      the metrics to Spark's metrics system.
    
    - getPartitionArray() bypassed the metrics initialization chain: it
      directly called relation.location.listFiles() instead of using
      dynamicallySelectedPartitions, which meant the selectedPartitions lazy
      val (where setFilesNumAndSizeMetric is called) was never evaluated.
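    
    A minimal sketch (with an illustrative /tmp path) of how the fixed metric
    can be observed from user code, mirroring the new test below:
    
        import org.apache.spark.sql.execution.FileSourceScanLike
        import org.apache.spark.sql.functions.col
        
        // Write a partitioned Parquet table with one file per partition.
        spark.range(2)
          .withColumn("part", col("id") % 2)
          .write.format("parquet").partitionBy("part").mode("append")
          .save("/tmp/num_files_demo")
        
        // Read a single partition and execute the query.
        val query = spark.read.parquet("/tmp/num_files_demo").where("part = 1")
        query.collect()
        
        // The driver-side numFiles metric now reflects partition pruning;
        // before this fix it stayed at 0 in the spark40/spark41 shims.
        val scan = query.queryExecution.executedPlan
          .collect { case f: FileSourceScanLike => f }
          .head
        assert(scan.metrics("numFiles").value == 1)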
    
    How was this patch tested?
    Unit test: a new GlutenSQLMetricsSuite added for the spark40 and spark41 gluten-ut modules.
---
 .../gluten/utils/velox/VeloxTestSettings.scala     |  2 +
 .../execution/metric/GlutenSQLMetricsSuite.scala   | 57 ++++++++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |  2 +
 .../execution/metric/GlutenSQLMetricsSuite.scala   | 57 ++++++++++++++++++++++
 .../sql/execution/FileSourceScanExecShim.scala     | 16 +++---
 .../sql/execution/FileSourceScanExecShim.scala     | 16 +++---
 6 files changed, 136 insertions(+), 14 deletions(-)

diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 8ce9933df0..a066668517 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
 import org.apache.spark.sql.execution.exchange.{GlutenEnsureRequirementsSuite, GlutenValidateRequirementsSuite}
 import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.metric.GlutenSQLMetricsSuite
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
@@ -956,6 +957,7 @@ class VeloxTestSettings extends BackendTestSettings {
     // The case doesn't need to be run in Gluten since it's verifying against
     // vanilla Spark's query plan.
     .exclude("SPARK-47289: extended explain info")
+  enableSuite[GlutenSQLMetricsSuite]
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala
new file mode 100644
index 0000000000..8eaf4ca129
--- /dev/null
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}
+import org.apache.spark.sql.execution.FileSourceScanLike
+import org.apache.spark.sql.functions.col
+
+class GlutenSQLMetricsSuite extends GlutenSQLTestsBaseTrait {
+
+  test("numFiles metric should reflect partition pruning") {
+    withTempDir {
+      tempDir =>
+        val testPath = tempDir.getCanonicalPath
+
+        // Generate two files in two partitions
+        spark
+          .range(2)
+          .withColumn("part", col("id") % 2)
+          .write
+          .format("parquet")
+          .partitionBy("part")
+          .mode("append")
+          .save(testPath)
+
+        // Read only one partition
+        val query = spark.read.format("parquet").load(testPath).where("part = 1")
+        val fileScans = query.queryExecution.executedPlan.collect {
+          case f: FileSourceScanLike => f
+        }
+
+        // Force the query to read files and generate metrics
+        query.queryExecution.executedPlan.execute().count()
+
+        // Verify only one file was read
+        assert(fileScans.size == 1)
+        val numFilesAfterPartitionSkipping = fileScans.head.metrics.get("numFiles")
+        assert(numFilesAfterPartitionSkipping.nonEmpty)
+        assert(numFilesAfterPartitionSkipping.get.value == 1)
+        assert(query.collect().toSeq == Seq(Row(1, 1)))
+    }
+  }
+}
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 29cd082b45..050ee84f02 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.execution.exchange.{GlutenEnsureRequirementsSuite, GlutenValidateRequirementsSuite}
 import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.metric.GlutenSQLMetricsSuite
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
 import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
@@ -1016,6 +1017,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("dumping query execution info to a file - explainMode=formatted")
     // TODO: fix in Spark-4.0
     .exclude("SPARK-47289: extended explain info")
+  enableSuite[GlutenSQLMetricsSuite]
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
 // scalastyle:on line.size.limit
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala
new file mode 100644
index 0000000000..8eaf4ca129
--- /dev/null
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/metric/GlutenSQLMetricsSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}
+import org.apache.spark.sql.execution.FileSourceScanLike
+import org.apache.spark.sql.functions.col
+
+class GlutenSQLMetricsSuite extends GlutenSQLTestsBaseTrait {
+
+  test("numFiles metric should reflect partition pruning") {
+    withTempDir {
+      tempDir =>
+        val testPath = tempDir.getCanonicalPath
+
+        // Generate two files in two partitions
+        spark
+          .range(2)
+          .withColumn("part", col("id") % 2)
+          .write
+          .format("parquet")
+          .partitionBy("part")
+          .mode("append")
+          .save(testPath)
+
+        // Read only one partition
+        val query = spark.read.format("parquet").load(testPath).where("part = 1")
+        val fileScans = query.queryExecution.executedPlan.collect {
+          case f: FileSourceScanLike => f
+        }
+
+        // Force the query to read files and generate metrics
+        query.queryExecution.executedPlan.execute().count()
+
+        // Verify only one file was read
+        assert(fileScans.size == 1)
+        val numFilesAfterPartitionSkipping = fileScans.head.metrics.get("numFiles")
+        assert(numFilesAfterPartitionSkipping.nonEmpty)
+        assert(numFilesAfterPartitionSkipping.get.value == 1)
+        assert(query.collect().toSeq == Seq(Row(1, 1)))
+    }
+  }
+}
diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 2dd8ff3867..c3064f35d1 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -107,7 +107,7 @@ abstract class FileSourceScanExecShim(
     val dynamicDataFilters = dataFilters.filter(isDynamicPruningFilter)
     val dynamicPartitionFilters =
       partitionFilters.filter(isDynamicPruningFilter)
-    if (dynamicPartitionFilters.nonEmpty) {
+    val selected = if (dynamicPartitionFilters.nonEmpty) {
       GlutenTimeMetric.withMillisTime {
        // call the file index for the files matching all filters except dynamic partition filters
         val boundedFilters = dynamicPartitionFilters.map {
@@ -127,15 +127,17 @@ abstract class FileSourceScanExecShim(
     } else {
       selectedPartitions
     }
+    sendDriverMetrics()
+    selected
   }
 
   def getPartitionArray: Array[PartitionDirectory] = {
-    // TODO: fix the value of partiton directories in dynamic pruning
-    val staticDataFilters = dataFilters.filterNot(isDynamicPruningFilter)
-    val staticPartitionFilters = partitionFilters.filterNot(isDynamicPruningFilter)
-    val partitionDirectories =
-      relation.location.listFiles(staticPartitionFilters, staticDataFilters)
-    partitionDirectories.toArray
+    // Use dynamicallySelectedPartitions which triggers selectedPartitions evaluation,
+    // ensuring driver metrics (numFiles, filesSize, etc.) are properly set.
+    // Convert ScanFileListing to Array[PartitionDirectory] via filePartitionIterator.
+    dynamicallySelectedPartitions.filePartitionIterator.map {
+      listing => PartitionDirectory(listing.values, listing.files.toSeq)
+    }.toArray
   }
 
   /**
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index d9a50f65c7..0a0bbcae9c 100644
--- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -106,7 +106,7 @@ abstract class FileSourceScanExecShim(
     val dynamicDataFilters = dataFilters.filter(isDynamicPruningFilter)
     val dynamicPartitionFilters =
       partitionFilters.filter(isDynamicPruningFilter)
-    if (dynamicPartitionFilters.nonEmpty) {
+    val selected = if (dynamicPartitionFilters.nonEmpty) {
       GlutenTimeMetric.withMillisTime {
        // call the file index for the files matching all filters except dynamic partition filters
         val boundedFilters = dynamicPartitionFilters.map {
@@ -126,15 +126,17 @@ abstract class FileSourceScanExecShim(
     } else {
       selectedPartitions
     }
+    sendDriverMetrics()
+    selected
   }
 
   def getPartitionArray: Array[PartitionDirectory] = {
-    // TODO: fix the value of partiton directories in dynamic pruning
-    val staticDataFilters = dataFilters.filterNot(isDynamicPruningFilter)
-    val staticPartitionFilters = partitionFilters.filterNot(isDynamicPruningFilter)
-    val partitionDirectories =
-      relation.location.listFiles(staticPartitionFilters, staticDataFilters)
-    partitionDirectories.toArray
+    // Use dynamicallySelectedPartitions which triggers selectedPartitions evaluation,
+    // ensuring driver metrics (numFiles, filesSize, etc.) are properly set.
+    // Convert ScanFileListing to Array[PartitionDirectory] via filePartitionIterator.
+    dynamicallySelectedPartitions.filePartitionIterator.map {
+      listing => PartitionDirectory(listing.values, listing.files.toSeq)
+    }.toArray
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
