This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new fc867266f08  [SPARK-45758][SQL] Introduce a mapper for hadoop compression codecs
fc867266f08 is described below

commit fc867266f0898866ab5ff7ed82b0c7c5fbaccefc
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Mon Nov 6 18:01:11 2023 +0800

    [SPARK-45758][SQL] Introduce a mapper for hadoop compression codecs

    ### What changes were proposed in this pull request?
    Currently, Spark supports only a subset of the Hadoop compression codecs, and the Spark-supported codecs do not map one-to-one onto the Hadoop ones because Spark introduces two fake compression codecs, none and uncompressed. There are a lot of magic strings copied from the Hadoop compression codec names, so developers have to keep them consistent by hand, which is error-prone and reduces development efficiency.

    ### Why are the changes needed?
    Make it easier for developers to use Hadoop compression codecs.

    ### Does this PR introduce _any_ user-facing change?
    'No'.
    Introduce a new class.

    ### How was this patch tested?
    Existing test cases.

    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.

    Closes #43620 from beliefer/SPARK-45758.

    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
---
 .../sql/catalyst/util/HadoopCompressionCodec.java  | 63 ++++++++++++++++++++++
 .../sql/catalyst/util/CompressionCodecs.scala      | 12 ++---
 .../org/apache/spark/sql/DataFrameSuite.scala      |  4 +-
 .../benchmark/DataSourceReadBenchmark.scala        |  8 ++-
 .../sql/execution/datasources/csv/CSVSuite.scala   |  4 +-
 .../sql/execution/datasources/json/JsonSuite.scala |  4 +-
 .../sql/execution/datasources/text/TextSuite.scala | 10 ++--
 .../datasources/text/WholeTextFileSuite.scala      |  3 +-
 8 files changed, 87 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
new file mode 100644
index 00000000000..ee4cb4da322
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+/**
+ * A mapper class from Spark supported hadoop compression codecs to hadoop compression codecs.
+ */
+public enum HadoopCompressionCodec {
+  NONE(null),
+  UNCOMPRESSED(null),
+  BZIP2(new BZip2Codec()),
+  DEFLATE(new DeflateCodec()),
+  GZIP(new GzipCodec()),
+  LZ4(new Lz4Codec()),
+  SNAPPY(new SnappyCodec());
+
+  // TODO supports ZStandardCodec
+
+  private final CompressionCodec compressionCodec;
+
+  HadoopCompressionCodec(CompressionCodec compressionCodec) {
+    this.compressionCodec = compressionCodec;
+  }
+
+  public CompressionCodec getCompressionCodec() {
+    return this.compressionCodec;
+  }
+
+  private static final Map<String, String> codecNameMap =
+    Arrays.stream(HadoopCompressionCodec.values()).collect(
+      Collectors.toMap(Enum::name, codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+  public String lowerCaseName() {
+    return codecNameMap.get(this.name());
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
index 1377a03d93b..a1d6446cc10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -21,19 +21,13 @@ import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress._
 
 import org.apache.spark.util.Utils
 
 object CompressionCodecs {
-  private val shortCompressionCodecNames = Map(
-    "none" -> null,
-    "uncompressed" -> null,
-    "bzip2" -> classOf[BZip2Codec].getName,
-    "deflate" -> classOf[DeflateCodec].getName,
-    "gzip" -> classOf[GzipCodec].getName,
-    "lz4" -> classOf[Lz4Codec].getName,
-    "snappy" -> classOf[SnappyCodec].getName)
+  private val shortCompressionCodecNames = HadoopCompressionCodec.values().map { codec =>
+    codec.lowerCaseName() -> Option(codec.getCompressionCodec).map(_.getClass.getName).orNull
+  }.toMap
 
   /**
    * Return the full version of the given codec class.
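For illustration, here is a minimal, self-contained Scala sketch of how the new HadoopCompressionCodec enum is used in place of the magic codec strings (the test changes below apply the same pattern). The local SparkSession setup and the /tmp/text-gzip-example output path are assumptions made for this sketch, not part of the patch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP

object HadoopCompressionCodecExample {
  def main(args: Array[String]): Unit = {
    // Assumed local session, just for the sketch.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("HadoopCompressionCodecExample")
      .getOrCreate()

    val df = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s")

    // GZIP.lowerCaseName() resolves to "gzip" through the enum's codecNameMap,
    // so the writer option no longer relies on a hand-written magic string.
    df.write
      .option("compression", GZIP.lowerCaseName())
      .mode("overwrite")
      .text("/tmp/text-gzip-example") // hypothetical output path

    spark.stop()
  }
}

Deriving both the writer option and CompressionCodecs.shortCompressionCodecNames from the same enum keeps the codec names defined in one place instead of scattering string literals across the code base.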
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b0a0b189cb7..d3271283baa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
 import org.apache.spark.sql.connector.FakeV2Provider
 import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -2766,7 +2767,8 @@ class DataFrameSuite extends QueryTest
       // The data set has 2 partitions, so Spark will write at least 2 json files.
       // Use a non-splittable compression (gzip), to make sure the json scan RDD has at least 2
       // partitions.
-      .write.partitionBy("p").option("compression", "gzip").json(path.getCanonicalPath)
+      .write.partitionBy("p")
+      .option("compression", GZIP.lowerCaseName()).json(path.getCanonicalPath)
 
     val numJobs = new AtomicLong(0)
     sparkContext.addSparkListener(new SparkListener {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 74043bac49a..ea90cd9cd09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
 import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
 import org.apache.spark.sql.internal.SQLConf
@@ -91,12 +92,15 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
   }
 
   private def saveAsCsvTable(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", "gzip").option("header", true).csv(dir)
+    df.mode("overwrite")
+      .option("compression", GZIP.lowerCaseName())
+      .option("header", true)
+      .csv(dir)
     spark.read.option("header", true).csv(dir).createOrReplaceTempView("csvTable")
   }
 
   private def saveAsJsonTable(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", "gzip").json(dir)
+    df.mode("overwrite").option("compression", GZIP.lowerCaseName()).json(dir)
     spark.read.json(dir).createOrReplaceTempView("jsonTable")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index a84aea27868..a2ce9b5db2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -39,7 +39,7 @@ import org.apache.logging.log4j.Level
 import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -874,7 +874,7 @@ abstract class CSVSuite
       cars.coalesce(1).write
         .format("csv")
         .option("header", "true")
-        .option("compression", "none")
+        .option("compression", HadoopCompressionCodec.NONE.lowerCaseName())
         .options(extraOptions)
         .save(csvDir)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 2f8b0a323dc..d906ae80a80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException,
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType
 import org.apache.spark.sql.execution.ExternalRDD
 import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache}
@@ -1689,7 +1689,7 @@ abstract class JsonSuite
       val jsonDir = new File(dir, "json").getCanonicalPath
       jsonDF.coalesce(1).write
         .format("json")
-        .option("compression", "none")
+        .option("compression", HadoopCompressionCodec.NONE.lowerCaseName())
         .options(extraOptions)
         .save(jsonDir)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index ff6b9aadf7c..6e3210f8c17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.compress.GzipCodec
 
 import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.{BZIP2, DEFLATE, GZIP, NONE}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -92,7 +93,8 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
     val testDf = spark.read.text(testFile)
 
-    val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz")
+    val extensionNameMap = Seq(BZIP2, DEFLATE, GZIP)
+      .map(codec => codec.lowerCaseName() -> codec.getCompressionCodec.getDefaultExtension)
     extensionNameMap.foreach {
       case (codecName, extension) =>
         val tempDir = Utils.createTempDir()
@@ -122,7 +124,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
     withTempDir { dir =>
       val testDf = spark.read.text(testFile)
       val tempDirPath = dir.getAbsolutePath
-      testDf.write.option("compression", "none")
+      testDf.write.option("compression", NONE.lowerCaseName())
         .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
       val compressedFiles = new File(tempDirPath).listFiles()
       assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
@@ -141,7 +143,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
     withTempDir { dir =>
      val testDf = spark.read.text(testFile)
       val tempDirPath = dir.getAbsolutePath
-      testDf.write.option("CoMpReSsIoN", "none")
+      testDf.write.option("CoMpReSsIoN", NONE.lowerCaseName())
        .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
      val compressedFiles = new File(tempDirPath).listFiles()
      assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
@@ -166,7 +168,7 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s")
-      df1.write.option("compression", "gzip").mode("overwrite").text(path)
+      df1.write.option("compression", GZIP.lowerCaseName()).mode("overwrite").text(path)
 
       val expected = df1.collect()
       Seq(10, 100, 1000).foreach { bytes =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
index f4812844cba..57e08c55874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/WholeTextFileSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -90,7 +91,7 @@ abstract class WholeTextFileSuite extends QueryTest with SharedSparkSession {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s").repartition(1)
-      df1.write.option("compression", "gzip").mode("overwrite").text(path)
+      df1.write.option("compression", GZIP.lowerCaseName()).mode("overwrite").text(path)
       // On reading through wholetext mode, one file will be read as a single row, i.e. not
       // delimited by "next line" character.
       val expected = Row(df1.collect().map(_.getString(0)).mkString("", "\n", "\n"))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org