This is an automated email from the ASF dual-hosted git repository. beliefer pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new b2103731bcf [SPARK-45484][SQL][3.5] Deprecated the incorrect parquet compression codec lz4raw b2103731bcf is described below commit b2103731bcfe7e0bee3b1302c773e46f80badcc9 Author: Jiaan Geng <belie...@163.com> AuthorDate: Tue Oct 17 09:50:39 2023 +0800 [SPARK-45484][SQL][3.5] Deprecated the incorrect parquet compression codec lz4raw ### What changes were proposed in this pull request? According to the discussion at https://github.com/apache/spark/pull/43310#issuecomment-1757139681, this PR deprecates the incorrect parquet compression codec `lz4raw` at Spark 3.5.1 and adds a warning log. The warning log prompts users that `lz4raw` will be removed at Apache Spark 4.0.0. ### Why are the changes needed? Deprecate the incorrect parquet compression codec `lz4raw`. ### Does this PR introduce _any_ user-facing change? 'Yes'. Users will see the warning log below. `Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'` ### How was this patch tested? Existing test cases and new test cases. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #43330 from beliefer/SPARK-45484_3.5. 
Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Jiaan Geng <belie...@163.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 14 ++++++++++-- .../datasources/parquet/ParquetOptions.scala | 8 ++++++- .../datasources/FileSourceCodecSuite.scala | 2 +- .../ParquetCompressionCodecPrecedenceSuite.scala | 25 ++++++++++++++++++---- 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 73d3756ef6b..427d0480190 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -995,12 +995,22 @@ object SQLConf { "`parquet.compression` is specified in the table-specific options/properties, the " + "precedence would be `compression`, `parquet.compression`, " + "`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " + - "snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.") + "snappy, gzip, lzo, brotli, lz4, lz4raw, lz4_raw, zstd.") .version("1.1.1") .stringConf .transform(_.toLowerCase(Locale.ROOT)) .checkValues( - Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd")) + Set( + "none", + "uncompressed", + "snappy", + "gzip", + "lzo", + "brotli", + "lz4", + "lz4raw", + "lz4_raw", + "zstd")) .createWithDefault("snappy") val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index 023d2460959..95869b6fbb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ 
-22,6 +22,7 @@ import java.util.Locale import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.internal.SQLConf @@ -32,7 +33,7 @@ import org.apache.spark.sql.internal.SQLConf class ParquetOptions( @transient private val parameters: CaseInsensitiveMap[String], @transient private val sqlConf: SQLConf) - extends FileSourceOptions(parameters) { + extends FileSourceOptions(parameters) with Logging { import ParquetOptions._ @@ -59,6 +60,9 @@ class ParquetOptions( throw new IllegalArgumentException(s"Codec [$codecName] " + s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.") } + if (codecName == "lz4raw") { + log.warn("Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'") + } shortParquetCompressionCodecNames(codecName).name() } @@ -96,7 +100,9 @@ object ParquetOptions extends DataSourceOptions { "lzo" -> CompressionCodecName.LZO, "brotli" -> CompressionCodecName.BROTLI, "lz4" -> CompressionCodecName.LZ4, + // Deprecated, to be removed at Spark 4.0.0, please use 'lz4_raw' instead. 
"lz4raw" -> CompressionCodecName.LZ4_RAW, + "lz4_raw" -> CompressionCodecName.LZ4_RAW, "zstd" -> CompressionCodecName.ZSTD) def getParquetCompressionCodecName(name: String): String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala index 09a348cd294..9f3d6ff48d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala @@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite { // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available // on Maven Central. override protected def availableCodecs: Seq[String] = { - Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw") + Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw", "lz4_raw") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala index ac0aad16f1e..27e2816ce9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala @@ -29,9 +29,23 @@ import org.apache.spark.sql.test.SharedSparkSession class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession { test("Test `spark.sql.parquet.compression.codec` config") { - Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "BROTLI", "ZSTD").foreach { c => + Seq( + "NONE", + "UNCOMPRESSED", + "SNAPPY", + "GZIP", + "LZO", + "LZ4", + "BROTLI", + "ZSTD", + "LZ4RAW", 
+ "LZ4_RAW").foreach { c => withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { - val expected = if (c == "NONE") "UNCOMPRESSED" else c + val expected = c match { + case "NONE" => "UNCOMPRESSED" + case "LZ4RAW" => "LZ4_RAW" + case other => other + } val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) assert(option.compressionCodecClassName == expected) } @@ -97,7 +111,10 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir) val partitionPath = if (isPartitioned) "p=2" else "" val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath" - val realCompressionCodecs = getTableCompressionCodec(path) + val realCompressionCodecs = getTableCompressionCodec(path).map { + case "LZ4_RAW" if compressionCodec == "LZ4RAW" => "LZ4RAW" + case other => other + } assert(realCompressionCodecs.forall(_ == compressionCodec)) } } @@ -105,7 +122,7 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar test("Create parquet table with compression") { Seq(true, false).foreach { isPartitioned => - val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4") + val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4RAW", "LZ4_RAW") codecs.foreach { compressionCodec => checkCompressionCodec(compressionCodec, isPartitioned) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org