This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c1e3d7631f [GLUTEN-11203][CORE] Validate input format for Hive table's 
partition (#11205)
c1e3d7631f is described below

commit c1e3d7631f708fa6ec19d83bd008c60c0ecd0349
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Dec 4 10:31:54 2025 +0800

    [GLUTEN-11203][CORE] Validate input format for Hive table's partition 
(#11205)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  | 40 ++++++++++++++--------
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 17 +++++++--
 .../gluten/backendsapi/BackendSettingsApi.scala    |  3 +-
 .../execution/BasicScanExecTransformer.scala       |  4 ++-
 .../gluten/execution/WholeStageTransformer.scala   |  3 ++
 .../sql/hive/HiveTableScanExecTransformer.scala    |  6 ++++
 6 files changed, 53 insertions(+), 20 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 0573d0cf38..c26a8599eb 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -184,7 +184,8 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
       dataSchema: StructType,
       rootPaths: Seq[String],
       properties: Map[String, String],
-      hadoopConf: Configuration): ValidationResult = {
+      hadoopConf: Configuration,
+      partitionFileFormats: Set[ReadFileFormat]): ValidationResult = {
 
     // Validate if all types are supported.
     def hasComplexType: Boolean = {
@@ -203,20 +204,29 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
       }
       !unsupportedDataTypes.isEmpty
     }
-    format match {
-      case ParquetReadFormat => ValidationResult.succeeded
-      case OrcReadFormat => ValidationResult.succeeded
-      case MergeTreeReadFormat => ValidationResult.succeeded
-      case TextReadFormat =>
-        if (!hasComplexType) {
-          ValidationResult.succeeded
-        } else {
-          ValidationResult.failed("Has complex type.")
-        }
-      case JsonReadFormat => ValidationResult.succeeded
-      case KafkaReadFormat => ValidationResult.succeeded
-      case _ => ValidationResult.failed(s"Unsupported file format $format")
-    }
+
+    def checkFormat(format: ReadFileFormat): ValidationResult =
+      format match {
+        case ParquetReadFormat => ValidationResult.succeeded
+        case OrcReadFormat => ValidationResult.succeeded
+        case MergeTreeReadFormat => ValidationResult.succeeded
+        case TextReadFormat =>
+          if (!hasComplexType) {
+            ValidationResult.succeeded
+          } else {
+            ValidationResult.failed("Has complex type.")
+          }
+        case JsonReadFormat => ValidationResult.succeeded
+        case KafkaReadFormat => ValidationResult.succeeded
+        case _ => ValidationResult.failed(s"Unsupported file format $format")
+      }
+
+    val distinctFileFormats = partitionFileFormats + format
+    distinctFileFormats
+      .collectFirst {
+        case format if !checkFormat(format).ok() => checkFormat(format)
+      }
+      .getOrElse(ValidationResult.succeeded)
   }
 
   override def getSubstraitReadFileFormatV1(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index e5d19de082..743e6bd9e5 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -102,7 +102,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
       dataSchema: StructType,
       rootPaths: Seq[String],
       properties: Map[String, String],
-      hadoopConf: Configuration): ValidationResult = {
+      hadoopConf: Configuration,
+      partitionFileFormats: Set[ReadFileFormat]): ValidationResult = {
 
     def validateScheme(): Option[String] = {
       val filteredRootPaths = distinctRootPaths(rootPaths)
@@ -117,7 +118,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
       }
     }
 
-    def validateFormat(): Option[String] = {
+    def validateFormat(format: ReadFileFormat): Option[String] = {
       def validateTypes(
           validatorFunc: PartialFunction[StructField, String],
           fieldsToValidate: Array[StructField]): Option[String] = {
@@ -183,6 +184,16 @@ object VeloxBackendSettings extends BackendSettingsApi {
       }
     }
 
+    def validateFormats(): Option[String] = {
+      val distinctFileFormats = partitionFileFormats + format
+      distinctFileFormats.iterator
+        .foldLeft(Option.empty[String]) {
+          (acc, format) =>
+            if (acc.isDefined) acc
+            else validateFormat(format)
+        }
+    }
+
     def validateMetadata(): Option[String] = {
       if (format != ParquetReadFormat || rootPaths.isEmpty || 
dataSchema.isEmpty) {
         // Only Parquet is needed for metadata validation so far.
@@ -215,7 +226,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
     }
     val validationChecks = Seq(
       validateScheme(),
-      validateFormat(),
+      validateFormats(),
       validateMetadata(),
       validateDataSchema()
     )
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 7bb28eca50..4f9354020d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -43,7 +43,8 @@ trait BackendSettingsApi {
       dataSchema: StructType, // the schema of the table
       rootPaths: Seq[String],
       properties: Map[String, String],
-      hadoopConf: Configuration): ValidationResult =
+      hadoopConf: Configuration,
+      partitionFileFormats: Set[ReadFileFormat]): ValidationResult =
     ValidationResult.succeeded
 
   def getSubstraitReadFileFormatV1(fileFormat: FileFormat): 
LocalFilesNode.ReadFileFormat
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 5de0683e96..3b142fecb9 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -131,7 +131,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
         getDataSchema,
         getRootFilePaths,
         getProperties,
-        sparkContext.hadoopConfiguration)
+        sparkContext.hadoopConfiguration,
+        getDistinctPartitionReadFileFormats
+      )
     if (!validationResult.ok()) {
       return validationResult
     }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 8ced5aa45c..db42778d29 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -143,6 +143,9 @@ trait LeafTransformSupport extends TransformSupport with 
LeafExecNode {
 
   /** Returns the partitions generated by this data source scan and tied with 
ReadFileFormat. */
   def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)]
+
+  /** Returns distinct partition's ReadFileFormats. */
+  def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = Set.empty
 }
 
 trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 8bf3e74ac0..b971eb46bc 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -76,6 +76,9 @@ case class HiveTableScanExecTransformer(
   override def getPartitionWithReadFileFormats: Seq[(Partition, 
ReadFileFormat)] =
     partitionWithReadFileFormats
 
+  override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] =
+    distinctReadFileFormats
+
   override def getPartitionSchema: StructType = 
relation.tableMeta.partitionSchema
 
   override def getDataSchema: StructType = relation.tableMeta.dataSchema
@@ -117,6 +120,9 @@ case class HiveTableScanExecTransformer(
 
   @transient private lazy val partitions: Seq[Partition] = 
partitionWithReadFileFormats.unzip._1
 
+  @transient private lazy val distinctReadFileFormats: Set[ReadFileFormat] =
+    partitionWithReadFileFormats.iterator.map(_._2).toSet
+
   @transient override lazy val fileFormat: ReadFileFormat =
     getReadFileFormat(relation.tableMeta.storage)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to