This is an automated email from the ASF dual-hosted git repository.

jinchengchenghh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6298aebe0e [GLUTEN-11782][CORE] Optimize parquet metadata validation 
by sampling root paths (#12042)
6298aebe0e is described below

commit 6298aebe0e2805c6941857dfe8bd478d6d955200
Author: Yao-MR <[email protected]>
AuthorDate: Mon May 11 17:53:11 2026 +0800

    [GLUTEN-11782][CORE] Optimize parquet metadata validation by sampling root 
paths (#12042)
    
    When a table has many partitions, the metadata validation checks every
    root path with `fileLimit` files each, resulting in excessive I/O cost.
    
    This patch introduces a sampling mechanism that selects a percentage of
    root paths for validation instead of checking all of them. The file limit
    is distributed evenly across the sampled paths.
    
    Key changes:
    - Add config 
`spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage`
      with default value 0.1 (10% sampling)
    - Use evenly spaced interval sampling for good partition coverage
    - Add unit tests for the sampling logic
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 18 +++++-
 .../apache/gluten/utils/ParquetMetadataUtils.scala | 36 +++++++++++-
 .../utils/ParquetEncryptionDetectionSuite.scala    | 68 ++++++++++++++++++++++
 docs/Configuration.md                              |  1 +
 .../org/apache/gluten/config/GlutenConfig.scala    | 14 +++++
 5 files changed, 135 insertions(+), 2 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 1ac9f264a2..9d2d92a2fc 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -207,10 +207,26 @@ object VeloxBackendSettings extends BackendSettingsApi {
         // Only Parquet is needed for metadata validation so far.
         return None
       }
+      // Skip root paths that do not exist yet (e.g., during INSERT operations 
where
+      // the target directory may not be created yet). Do not filter out local
+      // (file://) paths here because metadata validation is independent of 
native
+      // file system registration and must still run for local paths in test 
envs.
+      val existingRootPaths = rootPaths.filter {
+        p =>
+          try {
+            val path = new Path(p)
+            path.getFileSystem(hadoopConf).exists(path)
+          } catch {
+            case _: Exception => false
+          }
+      }
+      if (existingRootPaths.isEmpty) {
+        return None
+      }
       val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
       val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), 
SQLConf.get)
       ParquetMetadataUtils
-        .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
+        .validateMetadata(existingRootPaths, hadoopConf, parquetOptions, 
fileLimit)
         .map(reason => s"Detected unsupported metadata in parquet files: 
$reason")
     }
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index 9864f42a9f..9bf7574efd 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -52,7 +52,14 @@ object ParquetMetadataUtils extends Logging {
     if (!GlutenConfig.get.parquetMetadataValidationEnabled) {
       None
     } else {
-      rootPaths.foreach {
+      val samplePercentage = 
GlutenConfig.get.parquetMetadataFallbackSamplePercentage
+      val sampledPaths = sampleRootPaths(rootPaths, samplePercentage)
+
+      logDebug(s"Parquet metadata validation: total 
rootPaths=${rootPaths.size}, " +
+        s"sampled=${sampledPaths.size}, samplePercentage=$samplePercentage, " +
+        s"fileLimit=$fileLimit")
+
+      sampledPaths.foreach {
         rootPath =>
           val fs = new Path(rootPath).getFileSystem(hadoopConf)
           try {
@@ -75,6 +82,33 @@ object ParquetMetadataUtils extends Logging {
     }
   }
 
+  /**
+   * Samples root paths based on the given percentage. When the number of root 
paths is large,
+   * sampling reduces the cost of metadata validation significantly.
+   *
+   * The sampling strategy selects paths at evenly spaced intervals to ensure 
good coverage across
+   * different partitions.
+   *
+   * @param rootPaths
+   *   All root paths to sample from
+   * @param samplePercentage
+   *   Percentage of paths to sample, in range (0, 1.0]
+   * @return
+   *   Sampled subset of root paths
+   */
+  private def sampleRootPaths(rootPaths: Seq[String], samplePercentage: 
Double): Seq[String] = {
+    if (samplePercentage >= 1.0 || rootPaths.size <= 1) {
+      return rootPaths
+    }
+    val sampleCount = math.max(1, math.ceil(rootPaths.size * 
samplePercentage).toInt)
+    if (sampleCount >= rootPaths.size) {
+      return rootPaths
+    }
+    // Use evenly spaced interval sampling for better coverage across 
partitions
+    val step = rootPaths.size.toDouble / sampleCount
+    (0 until sampleCount).map(i => rootPaths((i * step).toInt))
+  }
+
   def validateCodec(footer: ParquetMetadata): Option[String] = {
     val blocks = footer.getBlocks
     if (blocks.isEmpty) {
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
index 1a303ad120..cb2658cb56 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
@@ -162,4 +162,72 @@ class ParquetEncryptionDetectionSuite extends 
SharedSparkSession {
         assertFalse(isFileEncrypted(filePath))
     }
   }
+
+  test("Metadata validation with sampling - encrypted file detected in sampled 
paths") {
+    withTempDir {
+      tempDir =>
+        // Create multiple subdirectories simulating partitions
+        val numPartitions = 20
+        val encryptedPartitionIndex = 5
+        val paths = (0 until numPartitions).map {
+          i =>
+            val partDir = s"${tempDir.getAbsolutePath}/partition=$i"
+            new java.io.File(partDir).mkdirs()
+            val filePath = s"$partDir/data.parquet"
+            if (i == encryptedPartitionIndex) {
+              val encryptionProps = FileEncryptionProperties
+                .builder(Base64.getDecoder.decode(masterKey))
+                .withEncryptedColumns(
+                  Map(
+                    ColumnPath.get("name") -> ColumnEncryptionProperties
+                      .builder(ColumnPath.get("name"))
+                      .withKey(Base64.getDecoder.decode(columnKey))
+                      .build()).asJava)
+                .build()
+              writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> i, 
"name" -> "Test")))
+            } else {
+              writeParquet(filePath, None, Seq(Map("id" -> i, "name" -> 
"Test")))
+            }
+            partDir
+        }
+
+        // With 100% sampling, the encrypted file should always be detected
+        withSQLConf(
+          
GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> 
"1.0") {
+          val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()), 
SQLConf.get)
+          val result = ParquetMetadataUtils
+            .validateMetadata(paths, new Configuration(), parquetOptions, 100)
+          assertTrue(
+            "Should detect encrypted file with 100% sampling",
+            result.isDefined)
+        }
+    }
+  }
+
+  test("Metadata validation with sampling - all plain files pass validation") {
+    withTempDir {
+      tempDir =>
+        // Create multiple subdirectories with only plain files
+        val numPartitions = 10
+        val paths = (0 until numPartitions).map {
+          i =>
+            val partDir = s"${tempDir.getAbsolutePath}/partition=$i"
+            new java.io.File(partDir).mkdirs()
+            val filePath = s"$partDir/data.parquet"
+            writeParquet(filePath, None, Seq(Map("id" -> i, "name" -> "Test")))
+            partDir
+        }
+
+        // With any sampling percentage, all plain files should pass
+        withSQLConf(
+          
GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> 
"0.3") {
+          val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()), 
SQLConf.get)
+          val result = ParquetMetadataUtils
+            .validateMetadata(paths, new Configuration(), parquetOptions, 100)
+          assertFalse(
+            "Should pass validation when all files are plain",
+            result.isDefined)
+        }
+    }
+  }
 }
diff --git a/docs/Configuration.md b/docs/Configuration.md
index b329411505..294e6c010f 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -120,6 +120,7 @@ nav_order: 15
 | spark.gluten.sql.fallbackRegexpExpressions                         | false   
          | If true, fall back all regexp expressions. There are a few 
incompatible cases between RE2 (used by native engine) and java.util.regex 
(used by Spark). User should enable this property if their incompatibility is 
intolerable.                                                                    
                                                                           |
 | spark.gluten.sql.fallbackUnexpectedMetadataParquet                 | false   
          | If enabled, Gluten will not offload scan when unexpected metadata 
is detected.                                                                    
                                                                                
                                                                                
                                                             |
 | spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit           | 10      
          | If supplied, metadata of `limit` number of Parquet files will be 
checked to determine whether to fall back to java scan.                         
                                                                                
                                                                                
                                                              |
+| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 0.1    
          | The percentage of root paths to sample for metadata validation when 
the number of root paths is large. Value range is (0, 1.0]. 1.0 means check all 
paths (no sampling). A smaller value reduces validation cost for tables with 
many partitions.                                                                
                                                              |
 | spark.gluten.sql.injectNativePlanStringToExplain                   | false   
          | When true, Gluten will inject native plan tree to Spark's explain 
output.                                                                         
                                                                                
                                                                                
                                                             |
 | spark.gluten.sql.mergeTwoPhasesAggregate.enabled                   | true    
          | Whether to merge two phases aggregate if there are no other 
operators between them.                                                         
                                                                                
                                                                                
                                                                   |
 | spark.gluten.sql.native.arrow.reader.enabled                       | false   
          | This is config to specify whether to enable the native columnar csv 
reader                                                                          
                                                                                
                                                                                
                                                           |
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 2ef4c022d0..0e2877ef4e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -379,6 +379,10 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
     getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT)
   }
 
+  def parquetMetadataFallbackSamplePercentage: Double = {
+    getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE)
+  }
+
   def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED)
   def enableColumnarCollectLimit: Boolean = 
getConf(COLUMNAR_COLLECT_LIMIT_ENABLED)
   def enableColumnarCollectTail: Boolean = 
getConf(COLUMNAR_COLLECT_TAIL_ENABLED)
@@ -1570,6 +1574,16 @@ object GlutenConfig extends ConfigRegistry {
       .checkValue(_ > 0, s"must be positive.")
       .createWithDefault(10)
 
+  val PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE =
+    
buildConf("spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage")
+      .doc("The percentage of root paths to sample for metadata validation 
when the number" +
+        " of root paths is large. Value range is (0, 1.0]. 1.0 means check all 
paths" +
+        " (no sampling). A smaller value reduces validation cost for tables 
with many" +
+        " partitions.")
+      .doubleConf
+      .checkValue(v => v > 0 && v <= 1.0, "must be in range (0, 1.0].")
+      .createWithDefault(0.1)
+
   val COLUMNAR_RANGE_ENABLED =
     buildConf("spark.gluten.sql.columnar.range")
       .doc("Enable or disable columnar range.")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to