This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new b2103731bcf [SPARK-45484][SQL][3.5] Deprecated the incorrect parquet compression codec lz4raw
b2103731bcf is described below

commit b2103731bcfe7e0bee3b1302c773e46f80badcc9
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Tue Oct 17 09:50:39 2023 +0800

    [SPARK-45484][SQL][3.5] Deprecated the incorrect parquet compression codec lz4raw
    
    ### What changes were proposed in this pull request?
    According to the discussion at https://github.com/apache/spark/pull/43310#issuecomment-1757139681, this PR deprecates the incorrect parquet compression codec `lz4raw` in Spark 3.5.1 and adds a warning log.
    
    The warning log tells users that `lz4raw` will be removed in Apache Spark 4.0.0.
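
    A minimal, hypothetical spark-shell sketch (the output paths and `spark.range` data are illustrative, not part of this patch): both spellings select Parquet's LZ4_RAW codec, but only the deprecated one logs the warning.

        // Deprecated spelling: still writes LZ4_RAW files, now logs a warning.
        spark.conf.set("spark.sql.parquet.compression.codec", "lz4raw")
        spark.range(10).write.parquet("/tmp/parquet_lz4raw")

        // Preferred spelling: same LZ4_RAW files, no warning.
        spark.conf.set("spark.sql.parquet.compression.codec", "lz4_raw")
        spark.range(10).write.parquet("/tmp/parquet_lz4_raw")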
    
    ### Why are the changes needed?
    Deprecates the incorrect parquet compression codec `lz4raw` in favor of the correctly named `lz4_raw`, which matches Parquet's `LZ4_RAW` codec.
    
    ### Does this PR introduce _any_ user-facing change?
    'Yes'.
    Users will see the warning log below.
    `Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'`
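
    For illustration only (the path is made up), the same warning is triggered through the per-write `compression` data source option, which takes precedence over the session conf:

        spark.range(10).write
          .option("compression", "lz4raw")  // warns; prefer "lz4_raw"
          .parquet("/tmp/parquet_option_lz4raw")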
    
    ### How was this patch tested?
    Existing test cases and new test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43330 from beliefer/SPARK-45484_3.5.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 14 ++++++++++--
 .../datasources/parquet/ParquetOptions.scala       |  8 ++++++-
 .../datasources/FileSourceCodecSuite.scala         |  2 +-
 .../ParquetCompressionCodecPrecedenceSuite.scala   | 25 ++++++++++++++++++----
 4 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 73d3756ef6b..427d0480190 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -995,12 +995,22 @@ object SQLConf {
       "`parquet.compression` is specified in the table-specific options/properties, the " +
       "precedence would be `compression`, `parquet.compression`, " +
       "`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
-      "snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
+      "snappy, gzip, lzo, brotli, lz4, lz4raw, lz4_raw, zstd.")
     .version("1.1.1")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValues(
-      Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"))
+      Set(
+        "none",
+        "uncompressed",
+        "snappy",
+        "gzip",
+        "lzo",
+        "brotli",
+        "lz4",
+        "lz4raw",
+        "lz4_raw",
+        "zstd"))
     .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 023d2460959..95869b6fbb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
@@ -32,7 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
 class ParquetOptions(
     @transient private val parameters: CaseInsensitiveMap[String],
     @transient private val sqlConf: SQLConf)
-  extends FileSourceOptions(parameters) {
+  extends FileSourceOptions(parameters) with Logging {
 
   import ParquetOptions._
 
@@ -59,6 +60,9 @@ class ParquetOptions(
       throw new IllegalArgumentException(s"Codec [$codecName] " +
         s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
     }
+    if (codecName == "lz4raw") {
+      log.warn("Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'")
+    }
     shortParquetCompressionCodecNames(codecName).name()
   }
 
@@ -96,7 +100,9 @@ object ParquetOptions extends DataSourceOptions {
     "lzo" -> CompressionCodecName.LZO,
     "brotli" -> CompressionCodecName.BROTLI,
     "lz4" -> CompressionCodecName.LZ4,
+    // Deprecated, to be removed at Spark 4.0.0, please use 'lz4_raw' instead.
     "lz4raw" -> CompressionCodecName.LZ4_RAW,
+    "lz4_raw" -> CompressionCodecName.LZ4_RAW,
     "zstd" -> CompressionCodecName.ZSTD)
 
   def getParquetCompressionCodecName(name: String): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 09a348cd294..9f3d6ff48d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
   // on Maven Central.
   override protected def availableCodecs: Seq[String] = {
-    Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw")
+    Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw", "lz4_raw")
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index ac0aad16f1e..27e2816ce9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -29,9 +29,23 @@ import org.apache.spark.sql.test.SharedSparkSession
 
 class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
   test("Test `spark.sql.parquet.compression.codec` config") {
-    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "ZSTD").foreach { c =>
+    Seq(
+      "NONE",
+      "UNCOMPRESSED",
+      "SNAPPY",
+      "GZIP",
+      "LZO",
+      "LZ4",
+      "BROTLI",
+      "ZSTD",
+      "LZ4RAW",
+      "LZ4_RAW").foreach { c =>
       withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
-        val expected = if (c == "NONE") "UNCOMPRESSED" else c
+        val expected = c match {
+          case "NONE" => "UNCOMPRESSED"
+          case "LZ4RAW" => "LZ4_RAW"
+          case other => other
+        }
         val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
         assert(option.compressionCodecClassName == expected)
       }
@@ -97,7 +111,10 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
         createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
         val partitionPath = if (isPartitioned) "p=2" else ""
         val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
-        val realCompressionCodecs = getTableCompressionCodec(path)
+        val realCompressionCodecs = getTableCompressionCodec(path).map {
+          case "LZ4_RAW" if compressionCodec == "LZ4RAW" => "LZ4RAW"
+          case other => other
+        }
         assert(realCompressionCodecs.forall(_ == compressionCodec))
       }
     }
@@ -105,7 +122,7 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
 
   test("Create parquet table with compression") {
     Seq(true, false).foreach { isPartitioned =>
-      val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4")
+      val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4RAW", "LZ4_RAW")
       codecs.foreach { compressionCodec =>
         checkCompressionCodec(compressionCodec, isPartitioned)
       }
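
Beyond the patch itself, a hedged sketch of how one could confirm on disk that both spellings produce LZ4_RAW column chunks, using Parquet's public footer API (the part-file path is illustrative; real part-file names contain task UUIDs):

    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    // Open one written part file and collect the codec of every column chunk.
    val input = HadoopInputFile.fromPath(
      new Path("/tmp/parquet_lz4_raw/part-00000.parquet"), new Configuration())
    val reader = ParquetFileReader.open(input)
    val codecs = reader.getFooter.getBlocks.asScala
      .flatMap(_.getColumns.asScala)
      .map(_.getCodec.name())
      .toSet
    reader.close()
    println(codecs)  // expected: Set(LZ4_RAW)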

