This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

The following commit(s) were added to refs/heads/main by this push:
     new 445616109 [GLUTEN-4964][CORE]Fallback complex data type in parquet write for Spark32 & Spark33 (#5107)
445616109 is described below

commit 44561610991f5b4c258a4c65e97e01cb13c2aa14
Author: JiaKe <ke.a....@intel.com>
AuthorDate: Thu Mar 28 08:54:47 2024 +0800

    [GLUTEN-4964][CORE]Fallback complex data type in parquet write for Spark32 & Spark33 (#5107)
---
 .../io/glutenproject/backendsapi/velox/VeloxBackend.scala | 11 +++++++++++
 .../spark/sql/execution/VeloxParquetWriteSuite.scala | 14 ++++++++++++++
 .../io/glutenproject/backendsapi/BackendSettingsApi.scala | 1 +
 .../execution/datasources/GlutenWriterColumnarRules.scala | 3 ++-
 4 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 3293abe3e..9d252149d 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -255,6 +255,17 @@ object BackendSettings extends BackendSettingsApi {
     }
   }
 
+  override def supportNativeWrite(fields: Array[StructField]): Boolean = {
+    fields.map {
+      field =>
+        field.dataType match {
+          case _: TimestampType | _: StructType | _: ArrayType | _: MapType => return false
+          case _ =>
+        }
+    }
+    true
+  }
+
   override def supportNativeMetadataColumns(): Boolean = true
 
   override def supportExpandExec(): Boolean = true
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index dc30f0559..6f938b7b9 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -37,6 +37,20 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
     super.sparkConf.set("spark.gluten.sql.native.writer.enabled", "true")
   }
 
+  test("test Array(Struct) fallback") {
+    withTempPath {
+      f =>
+        val path = f.getCanonicalPath
+        val testAppender = new LogAppender("native write tracker")
+        withLogAppender(testAppender) {
+          spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
+        }
+        assert(
+          testAppender.loggingEvents.exists(
+            _.getMessage.toString.contains("Use Gluten parquet write for hive")) == false)
+    }
+  }
+
   test("test write parquet with compression codec") {
     // compression codec details see `VeloxParquetDatasource.cc`
     Seq("snappy", "gzip", "zstd", "lz4", "none", "uncompressed")
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
index 25d71f0fc..950eed2eb 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
@@ -41,6 +41,7 @@ trait BackendSettingsApi {
       fields: Array[StructField],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String]): ValidationResult = ValidationResult.ok
+  def supportNativeWrite(fields: Array[StructField]): Boolean = true
   def supportNativeMetadataColumns(): Boolean = false
   def supportExpandExec(): Boolean = false
   def supportSortExec(): Boolean = false
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index d2c010cc4..80ef67ad6 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -156,7 +156,8 @@ object GlutenWriterColumnarRules {
           if write.getClass.getName == NOOP_WRITE &&
             BackendsApiManager.getSettings.enableNativeWriteFiles() =>
         injectFakeRowAdaptor(rc, rc.child)
-      case rc @ DataWritingCommandExec(cmd, child) =>
+      case rc @ DataWritingCommandExec(cmd, child)
+          if BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields) =>
         val format = getNativeFormat(cmd)
         session.sparkContext.setLocalProperty(
           "staticPartitionWriteOnly",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org