This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c1e3d7631f [GLUTEN-11203][CORE] Validate input format for Hive table's partition (#11205)
c1e3d7631f is described below
commit c1e3d7631f708fa6ec19d83bd008c60c0ecd0349
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Dec 4 10:31:54 2025 +0800
[GLUTEN-11203][CORE] Validate input format for Hive table's partition (#11205)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 40 ++++++++++++++--------
.../gluten/backendsapi/velox/VeloxBackend.scala | 17 +++++++--
.../gluten/backendsapi/BackendSettingsApi.scala | 3 +-
.../execution/BasicScanExecTransformer.scala | 4 ++-
.../gluten/execution/WholeStageTransformer.scala | 3 ++
.../sql/hive/HiveTableScanExecTransformer.scala | 6 ++++
6 files changed, 53 insertions(+), 20 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 0573d0cf38..c26a8599eb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -184,7 +184,8 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
dataSchema: StructType,
rootPaths: Seq[String],
properties: Map[String, String],
- hadoopConf: Configuration): ValidationResult = {
+ hadoopConf: Configuration,
+ partitionFileFormats: Set[ReadFileFormat]): ValidationResult = {
// Validate if all types are supported.
def hasComplexType: Boolean = {
@@ -203,20 +204,29 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
}
!unsupportedDataTypes.isEmpty
}
- format match {
- case ParquetReadFormat => ValidationResult.succeeded
- case OrcReadFormat => ValidationResult.succeeded
- case MergeTreeReadFormat => ValidationResult.succeeded
- case TextReadFormat =>
- if (!hasComplexType) {
- ValidationResult.succeeded
- } else {
- ValidationResult.failed("Has complex type.")
- }
- case JsonReadFormat => ValidationResult.succeeded
- case KafkaReadFormat => ValidationResult.succeeded
- case _ => ValidationResult.failed(s"Unsupported file format $format")
- }
+
+ def checkFormat(format: ReadFileFormat): ValidationResult =
+ format match {
+ case ParquetReadFormat => ValidationResult.succeeded
+ case OrcReadFormat => ValidationResult.succeeded
+ case MergeTreeReadFormat => ValidationResult.succeeded
+ case TextReadFormat =>
+ if (!hasComplexType) {
+ ValidationResult.succeeded
+ } else {
+ ValidationResult.failed("Has complex type.")
+ }
+ case JsonReadFormat => ValidationResult.succeeded
+ case KafkaReadFormat => ValidationResult.succeeded
+ case _ => ValidationResult.failed(s"Unsupported file format $format")
+ }
+
+ val distinctFileFormats = partitionFileFormats + format
+ distinctFileFormats
+ .collectFirst {
+ case format if !checkFormat(format).ok() => checkFormat(format)
+ }
+ .getOrElse(ValidationResult.succeeded)
}
override def getSubstraitReadFileFormatV1(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index e5d19de082..743e6bd9e5 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -102,7 +102,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
dataSchema: StructType,
rootPaths: Seq[String],
properties: Map[String, String],
- hadoopConf: Configuration): ValidationResult = {
+ hadoopConf: Configuration,
+ partitionFileFormats: Set[ReadFileFormat]): ValidationResult = {
def validateScheme(): Option[String] = {
val filteredRootPaths = distinctRootPaths(rootPaths)
@@ -117,7 +118,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
}
- def validateFormat(): Option[String] = {
+ def validateFormat(format: ReadFileFormat): Option[String] = {
def validateTypes(
validatorFunc: PartialFunction[StructField, String],
fieldsToValidate: Array[StructField]): Option[String] = {
@@ -183,6 +184,16 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
}
+ def validateFormats(): Option[String] = {
+ val distinctFileFormats = partitionFileFormats + format
+ distinctFileFormats.iterator
+ .foldLeft(Option.empty[String]) {
+ (acc, format) =>
+ if (acc.isDefined) acc
+ else validateFormat(format)
+ }
+ }
+
def validateMetadata(): Option[String] = {
if (format != ParquetReadFormat || rootPaths.isEmpty ||
dataSchema.isEmpty) {
// Only Parquet is needed for metadata validation so far.
@@ -215,7 +226,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
val validationChecks = Seq(
validateScheme(),
- validateFormat(),
+ validateFormats(),
validateMetadata(),
validateDataSchema()
)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 7bb28eca50..4f9354020d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -43,7 +43,8 @@ trait BackendSettingsApi {
dataSchema: StructType, // the schema of the table
rootPaths: Seq[String],
properties: Map[String, String],
- hadoopConf: Configuration): ValidationResult =
+ hadoopConf: Configuration,
+ partitionFileFormats: Set[ReadFileFormat]): ValidationResult =
ValidationResult.succeeded
def getSubstraitReadFileFormatV1(fileFormat: FileFormat):
LocalFilesNode.ReadFileFormat
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 5de0683e96..3b142fecb9 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -131,7 +131,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
getDataSchema,
getRootFilePaths,
getProperties,
- sparkContext.hadoopConfiguration)
+ sparkContext.hadoopConfiguration,
+ getDistinctPartitionReadFileFormats
+ )
if (!validationResult.ok()) {
return validationResult
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 8ced5aa45c..db42778d29 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -143,6 +143,9 @@ trait LeafTransformSupport extends TransformSupport with
LeafExecNode {
/** Returns the partitions generated by this data source scan and tied with
ReadFileFormat. */
def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)]
+
+ /** Returns distinct partition's ReadFileFormats. */
+ def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = Set.empty
}
trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 8bf3e74ac0..b971eb46bc 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -76,6 +76,9 @@ case class HiveTableScanExecTransformer(
override def getPartitionWithReadFileFormats: Seq[(Partition,
ReadFileFormat)] =
partitionWithReadFileFormats
+ override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] =
+ distinctReadFileFormats
+
override def getPartitionSchema: StructType =
relation.tableMeta.partitionSchema
override def getDataSchema: StructType = relation.tableMeta.dataSchema
@@ -117,6 +120,9 @@ case class HiveTableScanExecTransformer(
@transient private lazy val partitions: Seq[Partition] =
partitionWithReadFileFormats.unzip._1
+ @transient private lazy val distinctReadFileFormats: Set[ReadFileFormat] =
+ partitionWithReadFileFormats.iterator.map(_._2).toSet
+
@transient override lazy val fileFormat: ReadFileFormat =
getReadFileFormat(relation.tableMeta.storage)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@gluten.apache.org
For additional commands, e-mail: commits-help@gluten.apache.org