This is an automated email from the ASF dual-hosted git repository.
FelixYBW pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3bf094200b [GLUTEN-12071][VL] Respect HadoopConf write options in
Velox native Parquet writer (#12072)
3bf094200b is described below
commit 3bf094200be98eaaedd6efef4f0af567dedbc6f3
Author: Wechar Yu <[email protected]>
AuthorDate: Fri May 15 13:22:03 2026 +0800
[GLUTEN-12071][VL] Respect HadoopConf write options in Velox native Parquet
writer (#12072)
Updates Velox native Parquet write parameter generation to build write
options from the session Hadoop configuration combined with the per-write options.
---
.../backendsapi/velox/VeloxTransformerApi.scala | 13 +++++-
.../sql/execution/VeloxParquetWriteSuite.scala | 54 ++++++++++++++++++++++
.../datasources/parquet/ParquetFileFormat.scala | 13 ++++--
3 files changed, 75 insertions(+), 5 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index 3a1d53154f..47ad778dc8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -32,6 +32,7 @@ import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.execution.HiveFileFormat
@@ -44,6 +45,8 @@ import org.apache.commons.lang3.math.NumberUtils
import java.util.{Map => JMap}
+import scala.collection.JavaConverters._
+
class VeloxTransformerApi extends TransformerApi with Logging {
def genPartitionSeq(
@@ -131,11 +134,17 @@ class VeloxTransformerApi extends TransformerApi with
Logging {
// Only Parquet is supported. It's safe to set a fixed "parquet" here
// because others already fell back by WriteFilesExecTransformer's
validation.
val shortName = "parquet"
+ val writeOptions = Option(write.session).map {
+ session =>
+ val hadoopConf =
session.sessionState.newHadoopConfWithOptions(write.options)
+ CaseInsensitiveMap(hadoopConf.iterator().asScala.map(
+ entry => entry.getKey -> entry.getValue).toMap)
+ }.getOrElse(write.caseInsensitiveOptions)
val nativeConf =
GlutenFormatFactory(shortName)
.nativeConf(
- write.caseInsensitiveOptions,
-
WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions))
+ writeOptions,
+ WriteFilesExecTransformer.getCompressionCodec(writeOptions))
packPBMessage(
ConfigMap
.newBuilder()
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index a4577c8a5b..9de24103b0 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -27,6 +27,10 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
+import java.io.File
+
+import scala.collection.JavaConverters._
+
class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with
WriteUtils {
override protected val resourcePath: String = "/tpch-data-parquet"
@@ -267,3 +271,53 @@ class VeloxParquetWriteSuite extends
VeloxWholeStageTransformerSuite with WriteU
}
}
}
+
+class VeloxParquetWriteHadoopConfSuite extends VeloxWholeStageTransformerSuite
with WriteUtils {
+
+ override protected val resourcePath: String = ""
+ override protected val fileFormat: String = "parquet"
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
+ .set(s"spark.hadoop.parquet.enable.dictionary", "false")
+ }
+
+ private val dictionaryEncodingNames = Set("PLAIN_DICTIONARY",
"RLE_DICTIONARY")
+
+ private def parquetColumnEncodings(dir: File): Seq[Set[String]] = {
+ val parquetFiles = dir.list((_, name) => name.contains("parquet"))
+ assert(parquetFiles.nonEmpty)
+ parquetFiles.flatMap {
+ file =>
+ val path = new Path(dir.getAbsolutePath, file)
+ val in = HadoopInputFile.fromPath(path,
spark.sessionState.newHadoopConf())
+ Utils.tryWithResource(ParquetFileReader.open(in)) {
+ reader =>
+ reader.getFooter.getBlocks.asScala.flatMap {
+ block =>
block.getColumns.asScala.map(_.getEncodings.asScala.map(_.name()).toSet)
+ }
+ }
+ }.toSeq
+ }
+
+ test("native writer should respect parquet dictionary config from
spark.hadoop config") {
+ spark
+ .range(0, 20000, 1, 1)
+ .selectExpr("concat('gluten-parquet-dictionary-', CAST(id % 10 AS
STRING)) AS payload")
+ .createOrReplaceTempView("parquet_dictionary_source")
+
+ withTempPath {
+ hadoopConfDir =>
+ checkNativeWrite(
+ s"""
+ |INSERT OVERWRITE DIRECTORY USING PARQUET
+ |OPTIONS ('path' '${hadoopConfDir.getCanonicalPath}')
+ |SELECT * FROM parquet_dictionary_source
+ |""".stripMargin)
+ val columnEncodings = parquetColumnEncodings(hadoopConfDir)
+ assert(columnEncodings.nonEmpty)
+
assert(!columnEncodings.exists(_.exists(dictionaryEncodingNames.contains)))
+ }
+ }
+}
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 61864d01d9..a534f88081 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
@@ -77,11 +77,18 @@ class ParquetFileFormat extends FileFormat with
DataSourceRegister with Logging
if (sparkSession.sparkContext.getLocalProperty("isNativeApplicable") ==
"true") {
// Pass compression to job conf so that the file extension can be aware
of it.
val conf = ContextUtil.getConfiguration(job)
- val parquetOptions = new ParquetOptions(options,
sparkSession.sessionState.conf)
+ val writeOptions = CaseInsensitiveMap(
+ sparkSession.sessionState
+ .newHadoopConfWithOptions(options)
+ .iterator()
+ .asScala
+ .map(entry => entry.getKey -> entry.getValue)
+ .toMap)
+ val parquetOptions = new ParquetOptions(writeOptions,
sparkSession.sessionState.conf)
conf.set(ParquetOutputFormat.COMPRESSION,
parquetOptions.compressionCodecClassName)
val nativeConf =
GlutenFormatFactory(shortName())
- .nativeConf(options, parquetOptions.compressionCodecClassName)
+ .nativeConf(writeOptions, parquetOptions.compressionCodecClassName)
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]