This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fc867266f08 [SPARK-45758][SQL] Introduce a mapper for hadoop compression codecs
fc867266f08 is described below

commit fc867266f0898866ab5ff7ed82b0c7c5fbaccefc
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Mon Nov 6 18:01:11 2023 +0800

    [SPARK-45758][SQL] Introduce a mapper for hadoop compression codecs
    
    ### What changes were proposed in this pull request?
    Currently, Spark supports only a subset of the Hadoop compression codecs, and the codecs supported by Hadoop and by Spark do not map one-to-one, because Spark introduces two fake compression codecs: none and uncompressed.
    In addition, there are many magic strings copied from the Hadoop compression codec names, which developers have to keep consistent by hand. This is error-prone and reduces development efficiency.
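
    As a minimal sketch of the intended usage (based on the CompressionCodecs.scala change in the diff below), the short codec name to Hadoop codec class mapping can now be derived from the enum instead of hand-maintained magic strings:

        import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec

        // Build the short codec name -> Hadoop codec class name map from the enum,
        // rather than keeping the magic strings in sync by hand.
        val shortCompressionCodecNames: Map[String, String] =
          HadoopCompressionCodec.values().map { codec =>
            codec.lowerCaseName() ->
              Option(codec.getCompressionCodec).map(_.getClass.getName).orNull
          }.toMap

        shortCompressionCodecNames("gzip")  // org.apache.hadoop.io.compress.GzipCodec
        shortCompressionCodecNames("none")  // null (fake codec with no Hadoop counterpart)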
    
    ### Why are the changes needed?
    Make it easier for developers to use the Hadoop compression codecs.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    It only introduces a new class.
    
    ### How was this patch tested?
    Existing test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43620 from beliefer/SPARK-45758.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
---
 .../sql/catalyst/util/HadoopCompressionCodec.java  | 63 ++++++++++++++++++++++
 .../sql/catalyst/util/CompressionCodecs.scala      | 12 ++---
 .../org/apache/spark/sql/DataFrameSuite.scala      |  4 +-
 .../benchmark/DataSourceReadBenchmark.scala        |  8 ++-
 .../sql/execution/datasources/csv/CSVSuite.scala   |  4 +-
 .../sql/execution/datasources/json/JsonSuite.scala |  4 +-
 .../sql/execution/datasources/text/TextSuite.scala | 10 ++--
 .../datasources/text/WholeTextFileSuite.scala      |  3 +-
 8 files changed, 87 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
new file mode 100644
index 00000000000..ee4cb4da322
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+/**
+ * A mapper class from Spark supported hadoop compression codecs to hadoop compression codecs.
+ */
+public enum HadoopCompressionCodec {
+  NONE(null),
+  UNCOMPRESSED(null),
+  BZIP2(new BZip2Codec()),
+  DEFLATE(new DeflateCodec()),
+  GZIP(new GzipCodec()),
+  LZ4(new Lz4Codec()),
+  SNAPPY(new SnappyCodec());
+
+  // TODO supports ZStandardCodec
+
+  private final CompressionCodec compressionCodec;
+
+  HadoopCompressionCodec(CompressionCodec compressionCodec) {
+    this.compressionCodec = compressionCodec;
+  }
+
+  public CompressionCodec getCompressionCodec() {
+    return this.compressionCodec;
+  }
+
+  private static final Map<String, String> codecNameMap =
+    Arrays.stream(HadoopCompressionCodec.values()).collect(
+      Collectors.toMap(Enum::name, codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+  public String lowerCaseName() {
+    return codecNameMap.get(this.name());
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
index 1377a03d93b..a1d6446cc10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -21,19 +21,13 @@ import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress._
 
 import org.apache.spark.util.Utils
 
 object CompressionCodecs {
-  private val shortCompressionCodecNames = Map(
-    "none" -> null,
-    "uncompressed" -> null,
-    "bzip2" -> classOf[BZip2Codec].getName,
-    "deflate" -> classOf[DeflateCodec].getName,
-    "gzip" -> classOf[GzipCodec].getName,
-    "lz4" -> classOf[Lz4Codec].getName,
-    "snappy" -> classOf[SnappyCodec].getName)
+  private val shortCompressionCodecNames = HadoopCompressionCodec.values().map { codec =>
+    codec.lowerCaseName() -> Option(codec.getCompressionCodec).map(_.getClass.getName).orNull
+  }.toMap
 
   /**
    * Return the full version of the given codec class.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b0a0b189cb7..d3271283baa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
 import org.apache.spark.sql.connector.FakeV2Provider
 import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -2766,7 +2767,8 @@ class DataFrameSuite extends QueryTest
         // The data set has 2 partitions, so Spark will write at least 2 json files.
         // Use a non-splittable compression (gzip), to make sure the json scan RDD has at least 2
         // partitions.
-        .write.partitionBy("p").option("compression", "gzip").json(path.getCanonicalPath)
+        .write.partitionBy("p")
+        .option("compression", GZIP.lowerCaseName()).json(path.getCanonicalPath)
 
       val numJobs = new AtomicLong(0)
       sparkContext.addSparkListener(new SparkListener {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 74043bac49a..ea90cd9cd09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
 import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
 import org.apache.spark.sql.internal.SQLConf
@@ -91,12 +92,15 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
   }
 
   private def saveAsCsvTable(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", "gzip").option("header", 
true).csv(dir)
+    df.mode("overwrite")
+      .option("compression", GZIP.lowerCaseName())
+      .option("header", true)
+      .csv(dir)
     spark.read.option("header", true).csv(dir).createOrReplaceTempView("csvTable")
   }
 
   private def saveAsJsonTable(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", "gzip").json(dir)
+    df.mode("overwrite").option("compression", GZIP.lowerCaseName()).json(dir)
     spark.read.json(dir).createOrReplaceTempView("jsonTable")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index a84aea27868..a2ce9b5db2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -39,7 +39,7 @@ import org.apache.logging.log4j.Level
 import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -874,7 +874,7 @@ abstract class CSVSuite
       cars.coalesce(1).write
         .format("csv")
         .option("header", "true")
-        .option("compression", "none")
+        .option("compression", HadoopCompressionCodec.NONE.lowerCaseName())
         .options(extraOptions)
         .save(csvDir)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 2f8b0a323dc..d906ae80a80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException,
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType
 import org.apache.spark.sql.execution.ExternalRDD
 import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache}
@@ -1689,7 +1689,7 @@ abstract class JsonSuite
       val jsonDir = new File(dir, "json").getCanonicalPath
       jsonDF.coalesce(1).write
         .format("json")
-        .option("compression", "none")
+        .option("compression", HadoopCompressionCodec.NONE.lowerCaseName())
         .options(extraOptions)
         .save(jsonDir)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index ff6b9aadf7c..6e3210f8c17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.compress.GzipCodec
 
 import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.{BZIP2, DEFLATE, GZIP, NONE}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -92,7 +93,8 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
 
   test("SPARK-13503 Support to specify the option for compression codec for 
TEXT") {
     val testDf = spark.read.text(testFile)
-    val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz")
+    val extensionNameMap = Seq(BZIP2, DEFLATE, GZIP)
+      .map(codec => codec.lowerCaseName() -> codec.getCompressionCodec.getDefaultExtension)
     extensionNameMap.foreach {
       case (codecName, extension) =>
         val tempDir = Utils.createTempDir()
@@ -122,7 +124,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
     withTempDir { dir =>
       val testDf = spark.read.text(testFile)
       val tempDirPath = dir.getAbsolutePath
-      testDf.write.option("compression", "none")
+      testDf.write.option("compression", NONE.lowerCaseName())
         .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
       val compressedFiles = new File(tempDirPath).listFiles()
       assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
@@ -141,7 +143,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
     withTempDir { dir =>
       val testDf = spark.read.text(testFile)
       val tempDirPath = dir.getAbsolutePath
-      testDf.write.option("CoMpReSsIoN", "none")
+      testDf.write.option("CoMpReSsIoN", NONE.lowerCaseName())
         .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
       val compressedFiles = new File(tempDirPath).listFiles()
       assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
@@ -166,7 +168,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s")
-      df1.write.option("compression", "gzip").mode("overwrite").text(path)
+      df1.write.option("compression", 
GZIP.lowerCaseName()).mode("overwrite").text(path)
 
       val expected = df1.collect()
       Seq(10, 100, 1000).foreach { bytes =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
index f4812844cba..57e08c55874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -90,7 +91,7 @@ abstract class WholeTextFileSuite extends QueryTest with SharedSparkSession {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1)
-      df1.write.option("compression", "gzip").mode("overwrite").text(path)
+      df1.write.option("compression", 
GZIP.lowerCaseName()).mode("overwrite").text(path)
       // On reading through wholetext mode, one file will be read as a single row, i.e. not
       // delimited by "next line" character.
       val expected = Row(df1.collect().map(_.getString(0)).mkString("", "\n", "\n"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
