This is an automated email from the ASF dual-hosted git repository.
jinchengchenghh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6298aebe0e [GLUTEN-11782][CORE] Optimize parquet metadata validation by sampling root paths (#12042)
6298aebe0e is described below
commit 6298aebe0e2805c6941857dfe8bd478d6d955200
Author: Yao-MR <[email protected]>
AuthorDate: Mon May 11 17:53:11 2026 +0800
[GLUTEN-11782][CORE] Optimize parquet metadata validation by sampling root paths (#12042)
When a table has many partitions, metadata validation reads up to
`fileLimit` files from every root path, which results in excessive I/O cost.
This patch introduces a sampling mechanism that validates only a fraction of
the root paths instead of all of them. The file limit is distributed evenly
across the sampled paths.
Key changes:
- Add config `spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage`
  with a default value of 0.1 (10% sampling)
- Use evenly spaced interval sampling for good partition coverage
- Add unit tests for the sampling logic
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 18 +++++-
.../apache/gluten/utils/ParquetMetadataUtils.scala | 36 +++++++++++-
.../utils/ParquetEncryptionDetectionSuite.scala | 68 ++++++++++++++++++++++
docs/Configuration.md | 1 +
.../org/apache/gluten/config/GlutenConfig.scala | 14 +++++
5 files changed, 135 insertions(+), 2 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 1ac9f264a2..9d2d92a2fc 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -207,10 +207,26 @@ object VeloxBackendSettings extends BackendSettingsApi {
// Only Parquet is needed for metadata validation so far.
return None
}
+ // Skip root paths that do not exist yet (e.g., during INSERT operations
where
+ // the target directory may not be created yet). Do not filter out local
+ // (file://) paths here because metadata validation is independent of
native
+ // file system registration and must still run for local paths in test
envs.
+ val existingRootPaths = rootPaths.filter {
+ p =>
+ try {
+ val path = new Path(p)
+ path.getFileSystem(hadoopConf).exists(path)
+ } catch {
+ case _: Exception => false
+ }
+ }
+ if (existingRootPaths.isEmpty) {
+ return None
+ }
val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties),
SQLConf.get)
ParquetMetadataUtils
- .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
+ .validateMetadata(existingRootPaths, hadoopConf, parquetOptions,
fileLimit)
.map(reason => s"Detected unsupported metadata in parquet files:
$reason")
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index 9864f42a9f..9bf7574efd 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -52,7 +52,14 @@ object ParquetMetadataUtils extends Logging {
if (!GlutenConfig.get.parquetMetadataValidationEnabled) {
None
} else {
- rootPaths.foreach {
+ val samplePercentage =
GlutenConfig.get.parquetMetadataFallbackSamplePercentage
+ val sampledPaths = sampleRootPaths(rootPaths, samplePercentage)
+
+ logDebug(s"Parquet metadata validation: total
rootPaths=${rootPaths.size}, " +
+ s"sampled=${sampledPaths.size}, samplePercentage=$samplePercentage, " +
+ s"fileLimit=$fileLimit")
+
+ sampledPaths.foreach {
rootPath =>
val fs = new Path(rootPath).getFileSystem(hadoopConf)
try {
@@ -75,6 +82,33 @@ object ParquetMetadataUtils extends Logging {
}
}
+  /**
+   * Selects the subset of root paths whose Parquet metadata is validated.
+   *
+   * Sampling applies whenever there is more than one root path and `samplePercentage` is below
+   * 1.0, not only for large path counts. `ceil(rootPaths.size * samplePercentage)` paths (at
+   * least one) are picked at evenly spaced indices starting from the first path, so the sample
+   * spans the whole path list instead of clustering at its head.
+   *
+   * @param rootPaths
+   *   All root paths to sample from
+   * @param samplePercentage
+   *   Fraction of paths to sample, expected in range (0, 1.0]; values <= 0 (or NaN) still keep
+   *   one path
+   * @return
+   *   `rootPaths` itself when no sampling is needed, otherwise the sampled paths in their
+   *   original relative order
+   */
+  private def sampleRootPaths(rootPaths: Seq[String], samplePercentage: Double): Seq[String] = {
+    val total = rootPaths.size
+    if (samplePercentage >= 1.0 || total <= 1) {
+      rootPaths
+    } else {
+      // Multiply in decimal arithmetic. In binary floating point, e.g. 10 * 0.3 is
+      // 3.0000000000000004, which math.ceil would round up to 4 samples instead of 3.
+      val requested =
+        if (samplePercentage > 0) {
+          (scala.math.BigDecimal(total) * scala.math.BigDecimal(samplePercentage))
+            .setScale(0, scala.math.BigDecimal.RoundingMode.CEILING)
+            .toInt
+        } else {
+          0
+        }
+      val sampleCount = math.max(1, requested)
+      if (sampleCount >= total) {
+        rootPaths
+      } else {
+        // Index an IndexedSeq so each lookup is O(1) even when rootPaths is a List.
+        val indexed = rootPaths.toIndexedSeq
+        val step = total.toDouble / sampleCount
+        (0 until sampleCount).map(i => indexed((i * step).toInt))
+      }
+    }
+  }
+
def validateCodec(footer: ParquetMetadata): Option[String] = {
val blocks = footer.getBlocks
if (blocks.isEmpty) {
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
index 1a303ad120..cb2658cb56 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
@@ -162,4 +162,72 @@ class ParquetEncryptionDetectionSuite extends
SharedSparkSession {
assertFalse(isFileEncrypted(filePath))
}
}
+
+  test("Metadata validation with sampling - encrypted file detected in sampled paths") {
+    withTempDir {
+      tempDir =>
+        // Create multiple subdirectories simulating partitions; exactly one holds an
+        // encrypted Parquet file.
+        val numPartitions = 20
+        val encryptedPartitionIndex = 5
+        val paths = (0 until numPartitions).map {
+          i =>
+            val partDir = s"${tempDir.getAbsolutePath}/partition=$i"
+            new java.io.File(partDir).mkdirs()
+            val filePath = s"$partDir/data.parquet"
+            if (i == encryptedPartitionIndex) {
+              val encryptionProps = FileEncryptionProperties
+                .builder(Base64.getDecoder.decode(masterKey))
+                .withEncryptedColumns(
+                  Map(
+                    ColumnPath.get("name") -> ColumnEncryptionProperties
+                      .builder(ColumnPath.get("name"))
+                      .withKey(Base64.getDecoder.decode(columnKey))
+                      .build()).asJava)
+                .build()
+              writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> i, "name" -> "Test")))
+            } else {
+              writeParquet(filePath, None, Seq(Map("id" -> i, "name" -> "Test")))
+            }
+            partDir
+        }
+
+        // With 100% sampling every root path is checked, so the encrypted file is always
+        // detected.
+        withSQLConf(
+          GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> "1.0") {
+          val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()), SQLConf.get)
+          val result = ParquetMetadataUtils
+            .validateMetadata(paths, new Configuration(), parquetOptions, 100)
+          assertTrue("Should detect encrypted file with 100% sampling", result.isDefined)
+        }
+
+        // With 20% sampling, 4 of the 20 root paths are checked, at the evenly spaced
+        // indices 0, 5, 10 and 15. This sample includes the encrypted partition, so the
+        // sampled code path must still detect it.
+        withSQLConf(
+          GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> "0.2") {
+          val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()), SQLConf.get)
+          val result = ParquetMetadataUtils
+            .validateMetadata(paths, new Configuration(), parquetOptions, 100)
+          assertTrue(
+            "Should detect encrypted file when its partition is sampled",
+            result.isDefined)
+        }
+    }
+  }
+
+  test("Metadata validation with sampling - all plain files pass validation") {
+    withTempDir {
+      tempDir =>
+        val root = tempDir.getAbsolutePath
+        // Ten partition directories, each holding a single unencrypted Parquet file.
+        val partitionDirs = for (idx <- 0 until 10) yield {
+          val dir = s"$root/partition=$idx"
+          new java.io.File(dir).mkdirs()
+          writeParquet(s"$dir/data.parquet", None, Seq(Map("id" -> idx, "name" -> "Test")))
+          dir
+        }
+
+        // Whichever 30% of the paths end up sampled, none carries unsupported metadata.
+        withSQLConf(
+          GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> "0.3") {
+          val options = new ParquetOptions(CaseInsensitiveMap(Map()), SQLConf.get)
+          val outcome = ParquetMetadataUtils
+            .validateMetadata(partitionDirs, new Configuration(), options, 100)
+          assertTrue("Should pass validation when all files are plain", outcome.isEmpty)
+        }
+    }
+  }
}
diff --git a/docs/Configuration.md b/docs/Configuration.md
index b329411505..294e6c010f 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -120,6 +120,7 @@ nav_order: 15
| spark.gluten.sql.fallbackRegexpExpressions | false
| If true, fall back all regexp expressions. There are a few
incompatible cases between RE2 (used by native engine) and java.util.regex
(used by Spark). User should enable this property if their incompatibility is
intolerable.
|
| spark.gluten.sql.fallbackUnexpectedMetadataParquet | false
| If enabled, Gluten will not offload scan when unexpected metadata
is detected.
|
| spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit | 10
| If supplied, metadata of `limit` number of Parquet files will be
checked to determine whether to fall back to java scan.
|
+| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 0.1 | The fraction of root paths to sample for Parquet metadata validation, in range (0, 1.0]. Whenever there is more than one root path, this fraction of them (rounded up, at least one) is checked at evenly spaced positions. 1.0 means check all paths (no sampling). A smaller value reduces validation cost for tables with many partitions. |
| spark.gluten.sql.injectNativePlanStringToExplain | false
| When true, Gluten will inject native plan tree to Spark's explain
output.
|
| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | true
| Whether to merge two phases aggregate if there are no other
operators between them.
|
| spark.gluten.sql.native.arrow.reader.enabled | false
| This is config to specify whether to enable the native columnar csv
reader
|
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 2ef4c022d0..0e2877ef4e 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -379,6 +379,10 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT)
}
+  /** Fraction of root paths sampled for Parquet metadata validation, validated to (0, 1.0]. */
+  def parquetMetadataFallbackSamplePercentage: Double =
+    getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE)
+
def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED)
def enableColumnarCollectLimit: Boolean =
getConf(COLUMNAR_COLLECT_LIMIT_ENABLED)
def enableColumnarCollectTail: Boolean =
getConf(COLUMNAR_COLLECT_TAIL_ENABLED)
@@ -1570,6 +1574,16 @@ object GlutenConfig extends ConfigRegistry {
.checkValue(_ > 0, s"must be positive.")
.createWithDefault(10)
+  // Fraction (not a 0-100 percentage) of root paths checked by Parquet metadata validation.
+  // Sampling applies to any scan with more than one root path, so the doc must not imply it
+  // only kicks in for large path counts.
+  val PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE =
+    buildConf("spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage")
+      .doc("The fraction of root paths to sample for Parquet metadata validation, in range" +
+        " (0, 1.0]. Whenever there is more than one root path, this fraction of them (rounded" +
+        " up, at least one) is checked at evenly spaced positions. 1.0 means check all paths" +
+        " (no sampling). A smaller value reduces validation cost for tables with many" +
+        " partitions.")
+      .doubleConf
+      .checkValue(v => v > 0 && v <= 1.0, "must be in range (0, 1.0].")
+      .createWithDefault(0.1)
+
val COLUMNAR_RANGE_ENABLED =
buildConf("spark.gluten.sql.columnar.range")
.doc("Enable or disable columnar range.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]