This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f12bc05e440 [SPARK-45664][SQL] Introduce a mapper for orc compression codecs
f12bc05e440 is described below

commit f12bc05e44099f24c470466bf777473744ab893d
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Sun Oct 29 22:22:07 2023 -0700

    [SPARK-45664][SQL] Introduce a mapper for orc compression codecs

    ### What changes were proposed in this pull request?
    Currently, Spark supports all the ORC compression codecs, but the codecs ORC supports and the ones Spark supports do not map one-to-one, because Spark introduces the two additional codec names `NONE` and `UNCOMPRESSED`. On top of that, many magic strings are copied from the ORC compression codec names, so developers have to keep them consistent by hand. This is error-prone and reduces development efficiency. This PR introduces `OrcCompressionCodec`, a mapper from the Spark-supported codec names to the ORC compression kinds.

    ### Why are the changes needed?
    Make it easier for developers to work with ORC compression codecs.

    ### Does this PR introduce _any_ user-facing change?
    'No'. It only introduces a new class.

    ### How was this patch tested?
    Existing test cases.

    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.

    Closes #43528 from beliefer/SPARK-45664.

    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../datasources/orc/OrcCompressionCodec.java       | 56 ++++++++++++++++++++++
 .../sql/execution/datasources/orc/OrcOptions.scala | 11 ++---
 .../sql/execution/datasources/orc/OrcUtils.scala   | 12 ++---
 .../BuiltInDataSourceWriteBenchmark.scala          |  4 +-
 .../benchmark/DataSourceReadBenchmark.scala        |  4 +-
 .../benchmark/FilterPushdownBenchmark.scala        |  3 +-
 .../datasources/FileSourceCodecSuite.scala         |  5 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  | 26 ++++++++-----
 .../execution/datasources/orc/OrcSourceSuite.scala | 29 +++++++----
 .../spark/sql/hive/CompressionCodecSuite.scala     | 23 ++++++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  7 +--
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala    |  6 +--
 .../spark/sql/hive/orc/OrcReadBenchmark.scala      |  5 +-
 13 files changed, 134 insertions(+), 57 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java
new file mode 100644
index 00000000000..c8e57969068
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.orc.CompressionKind;
+
+/**
+ * A mapper class from Spark supported orc compression codecs to orc compression codecs.
+ */
+public enum OrcCompressionCodec {
+  NONE(CompressionKind.NONE),
+  UNCOMPRESSED(CompressionKind.NONE),
+  ZLIB(CompressionKind.ZLIB),
+  SNAPPY(CompressionKind.SNAPPY),
+  LZO(CompressionKind.LZO),
+  LZ4(CompressionKind.LZ4),
+  ZSTD(CompressionKind.ZSTD);
+
+  private final CompressionKind compressionKind;
+
+  OrcCompressionCodec(CompressionKind compressionKind) {
+    this.compressionKind = compressionKind;
+  }
+
+  public CompressionKind getCompressionKind() {
+    return this.compressionKind;
+  }
+
+  public static final Map<String, String> codecNameMap =
+    Arrays.stream(OrcCompressionCodec.values()).collect(
+      Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+  public String lowerCaseName() {
+    return codecNameMap.get(this.name());
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
index 1c819f07038..4bed600fa4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
@@ -75,14 +75,9 @@ object OrcOptions extends DataSourceOptions {
   val COMPRESSION = newOption("compression")
 
   // The ORC compression short names
-  private val shortOrcCompressionCodecNames = Map(
-    "none" -> "NONE",
-    "uncompressed" -> "NONE",
-    "snappy" -> "SNAPPY",
-    "zlib" -> "ZLIB",
-    "lzo" -> "LZO",
-    "lz4" -> "LZ4",
-    "zstd" -> "ZSTD")
+  private val shortOrcCompressionCodecNames = OrcCompressionCodec.values().map {
+    mapper => mapper.lowerCaseName() -> mapper.getCompressionKind.name()
+  }.toMap
 
   def getORCCompressionCodecName(name: String): String = shortOrcCompressionCodecNames(name)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index d6d42e74da1..c4490b95e3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -49,12 +49,12 @@ object OrcUtils extends Logging {
 
   // The extensions for ORC compression codecs
   val extensionsForCompressionCodecNames = Map(
-    "NONE" -> "",
-    "SNAPPY" -> ".snappy",
-    "ZLIB" -> ".zlib",
-    "ZSTD" -> ".zstd",
-    "LZ4" -> ".lz4",
-    "LZO" -> ".lzo")
+    OrcCompressionCodec.NONE.name() -> "",
+    OrcCompressionCodec.SNAPPY.name() -> ".snappy",
+    OrcCompressionCodec.ZLIB.name() -> ".zlib",
+    OrcCompressionCodec.ZSTD.name() -> ".zstd",
+    OrcCompressionCodec.LZ4.name() -> ".lz4",
+    OrcCompressionCodec.LZO.name() -> ".lzo")
 
   val CATALYST_TYPE_ATTRIBUTE_NAME = "spark.sql.catalyst.type"
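For illustration, a minimal sketch (not part of the patch) of how the new enum replaces the old magic strings when resolving a user-supplied codec name; `resolve` and `shortNames` are hypothetical helpers, and the mapping mirrors what `OrcOptions` now derives from the enum:

    import java.util.Locale

    import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec

    // Build the short-name table once from the enum instead of hand-written strings.
    val shortNames: Map[String, String] = OrcCompressionCodec.values().map { codec =>
      codec.lowerCaseName() -> codec.getCompressionKind.name()
    }.toMap

    // A hypothetical resolver: option values are matched case-insensitively.
    def resolve(name: String): String = shortNames(name.toLowerCase(Locale.ROOT))

    assert(resolve("zstd") == "ZSTD")
    assert(resolve("UNCOMPRESSED") == "NONE") // Spark-only alias collapses to ORC's NONE
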
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index ba3228878ec..a8b448c8dc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 
@@ -56,7 +57,8 @@
     spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
       ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
-    spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
+    spark.conf.set(SQLConf.ORC_COMPRESSION.key,
+      OrcCompressionCodec.SNAPPY.lowerCaseName())
 
     formats.foreach { format =>
       runBenchmark(s"$format writer benchmark") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index a8736c04151..bc271235f9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -116,7 +117,8 @@
   }
 
   private def saveAsOrcTable(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", "snappy").orc(dir)
+    df.mode("overwrite").option("compression",
+      OrcCompressionCodec.SNAPPY.lowerCaseName()).orc(dir)
     spark.read.orc(dir).createOrReplaceTempView("orcTable")
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 10781ec90fa..1a788bf5f2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -25,6 +25,7 @@ import scala.util.Random
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
 import org.apache.spark.sql.internal.SQLConf
@@ -51,7 +52,7 @@
       .set("spark.master", "local[1]")
       .setIfMissing("spark.driver.memory", "3g")
       .setIfMissing("spark.executor.memory", "3g")
-      .setIfMissing("orc.compression", "snappy")
+      .setIfMissing("orc.compression", OrcCompressionCodec.SNAPPY.lowerCaseName())
       .setIfMissing("spark.sql.parquet.compression.codec",
         ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
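The same pattern applies outside the benchmarks. A sketch (assuming a running `SparkSession` named `spark`; not part of the patch) of configuring the session-level ORC codec through the enum rather than a string literal:

    import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
    import org.apache.spark.sql.internal.SQLConf

    // Same effect as spark.conf.set("spark.sql.orc.compression.codec", "zstd"),
    // but the codec name is checked at compile time.
    spark.conf.set(SQLConf.ORC_COMPRESSION.key, OrcCompressionCodec.ZSTD.lowerCaseName())
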
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 1f1805a02d7..e4d9e13c2b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -73,6 +74,6 @@ class OrcCodecSuite extends FileSourceCodecSuite {
 
   override def format: String = "orc"
   override val codecConfigName: String = SQLConf.ORC_COMPRESSION.key
-  override protected def availableCodecs = Seq("none", "uncompressed", "snappy",
-    "zlib", "zstd", "lz4", "lzo")
+  override protected def availableCodecs =
+    OrcCompressionCodec.values().map(_.lowerCaseName()).toSeq
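Deriving `availableCodecs` from the enum keeps the suite in sync automatically; evaluated against the enum's declaration order above, the derived sequence would be (a sketch, not from the patch):

    // Declaration order of the enum drives the resulting sequence.
    OrcCompressionCodec.values().map(_.lowerCaseName()).toSeq
    // => Seq("none", "uncompressed", "zlib", "snappy", "lzo", "lz4", "zstd")
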
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 39447ed71a8..7d666729bb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -190,7 +190,7 @@ abstract class OrcQueryTest extends OrcTest {
     // Respect `orc.compress` (i.e., OrcConf.COMPRESS).
     withTempPath { file =>
       spark.range(0, 10).write
-        .option(COMPRESS.getAttribute, "ZLIB")
+        .option(COMPRESS.getAttribute, OrcCompressionCodec.ZLIB.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -199,15 +199,15 @@
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("ZLIB" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
       }
     }
 
     // `compression` overrides `orc.compress`.
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "ZLIB")
-        .option(COMPRESS.getAttribute, "SNAPPY")
+        .option("compression", OrcCompressionCodec.ZLIB.name())
+        .option(COMPRESS.getAttribute, OrcCompressionCodec.SNAPPY.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -216,7 +216,7 @@
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("ZLIB" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
       }
     }
   }
@@ -224,7 +224,7 @@
   test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "ZLIB")
+        .option("compression", OrcCompressionCodec.ZLIB.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -233,13 +233,13 @@
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("ZLIB" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
       }
     }
 
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "SNAPPY")
+        .option("compression", OrcCompressionCodec.SNAPPY.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".snappy.orc"))
@@ -248,13 +248,13 @@
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("SNAPPY" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.SNAPPY.name() === reader.getCompressionKind.name)
       }
     }
 
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "NONE")
+        .option("compression", OrcCompressionCodec.NONE.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".orc"))
@@ -263,7 +263,7 @@
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("NONE" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.NONE.name() === reader.getCompressionKind.name)
       }
     }
   }
@@ -647,7 +647,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
   test("LZO compression options for writing to an ORC file") {
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "LZO")
+        .option("compression", OrcCompressionCodec.LZO.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".lzo.orc"))
@@ -656,7 +656,7 @@
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("LZO" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.LZO.name() === reader.getCompressionKind.name)
       }
     }
   }
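The precedence these tests exercise is worth calling out. A minimal sketch (assuming a DataFrame `df` and an output path `path`; not part of the patch) of the writer-option behavior:

    import org.apache.orc.OrcConf.COMPRESS
    import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec

    // `compression` wins over `orc.compress` when both are supplied,
    // so this writes *.zlib.orc files even though SNAPPY is also requested.
    df.write
      .option("compression", OrcCompressionCodec.ZLIB.name())
      .option(COMPRESS.getAttribute, OrcCompressionCodec.SNAPPY.name())
      .orc(path)
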
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 2ed24943908..4abcb4a7ef1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -324,29 +324,38 @@ abstract class OrcSuite
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val conf = spark.sessionState.conf
-    val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
-    assert(option.compressionCodec == "NONE")
+    val option = new OrcOptions(
+      Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> OrcCompressionCodec.NONE.name()), conf)
+    assert(option.compressionCodec == OrcCompressionCodec.NONE.name())
   }
 
   test("SPARK-21839: Add SQL config for ORC compression") {
     val conf = spark.sessionState.conf
     // Test if the default of spark.sql.orc.compression.codec is snappy
-    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+    assert(new OrcOptions(
+      Map.empty[String, String], conf).compressionCodec == OrcCompressionCodec.SNAPPY.name())
 
     // OrcOptions's parameters have a higher priority than SQL configuration.
     // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
     withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
-      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
-      val map1 = Map(COMPRESS.getAttribute -> "zlib")
-      val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
-      assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
-      assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+      assert(new OrcOptions(
+        Map.empty[String, String], conf).compressionCodec == OrcCompressionCodec.NONE.name())
+      val zlibCodec = OrcCompressionCodec.ZLIB.lowerCaseName()
+      val lzoCodec = OrcCompressionCodec.LZO.lowerCaseName()
+      val map1 = Map(COMPRESS.getAttribute -> zlibCodec)
+      val map2 = Map(COMPRESS.getAttribute -> zlibCodec, "compression" -> lzoCodec)
+      assert(new OrcOptions(map1, conf).compressionCodec == OrcCompressionCodec.ZLIB.name())
+      assert(new OrcOptions(map2, conf).compressionCodec == OrcCompressionCodec.LZO.name())
     }
 
     // Test all the valid options of spark.sql.orc.compression.codec
-    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO", "ZSTD", "LZ4").foreach { c =>
+    OrcCompressionCodec.values().map(_.name()).foreach { c =>
       withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
-        val expected = if (c == "UNCOMPRESSED") "NONE" else c
+        val expected = if (c == OrcCompressionCodec.UNCOMPRESSED.name()) {
+          OrcCompressionCodec.NONE.name()
+        } else {
+          c
+        }
         assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
       }
     }
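One subtlety the loop above encodes: `UNCOMPRESSED` exists only on the Spark side. A short sketch (not from the patch) of the mapping it relies on:

    import org.apache.orc.CompressionKind
    import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec

    // Both Spark-side names collapse to ORC's single NONE kind.
    assert(OrcCompressionCodec.NONE.getCompressionKind == CompressionKind.NONE)
    assert(OrcCompressionCodec.UNCOMPRESSED.getCompressionKind == CompressionKind.NONE)
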
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
index df28e7b4485..52138ae4558 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
@@ -23,12 +23,11 @@ import java.util.Locale
 import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.apache.orc.OrcConf.COMPRESS
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.orc.{OrcCompressionCodec, OrcOptions}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetOptions, ParquetTest}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -299,9 +298,15 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
         ParquetCompressionCodec.SNAPPY.name))
     checkForTableWithCompressProp("orc",
       tableCompressCodecs =
-        List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name),
+        List(
+          OrcCompressionCodec.NONE.name,
+          OrcCompressionCodec.SNAPPY.name,
+          OrcCompressionCodec.ZLIB.name),
       sessionCompressCodecs =
-        List(CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name, CompressionKind.SNAPPY.name))
+        List(
+          OrcCompressionCodec.SNAPPY.name,
+          OrcCompressionCodec.ZLIB.name,
+          OrcCompressionCodec.SNAPPY.name))
   }
 
   test("table-level compression is not set but session-level compressions is set ") {
@@ -314,7 +319,10 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
     checkForTableWithCompressProp("orc",
       tableCompressCodecs = List.empty,
       sessionCompressCodecs =
-        List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name))
+        List(
+          OrcCompressionCodec.NONE.name,
+          OrcCompressionCodec.SNAPPY.name,
+          OrcCompressionCodec.ZLIB.name))
   }
 
   def checkTableWriteWithCompressionCodecs(format: String, compressCodecs: List[String]): Unit = {
@@ -355,6 +363,9 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
         ParquetCompressionCodec.GZIP.name))
     checkTableWriteWithCompressionCodecs(
       "orc",
-      List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name))
+      List(
+        OrcCompressionCodec.NONE.name,
+        OrcCompressionCodec.SNAPPY.name,
+        OrcCompressionCodec.ZLIB.name))
   }
 }
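Since the suites mix `name` and `lowerCaseName`, the relationship between the two accessors is worth pinning down; a sketch (not from the patch):

    import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec

    val codec = OrcCompressionCodec.ZSTD
    assert(codec.name() == "ZSTD")           // canonical, upper-case ORC name
    assert(codec.lowerCaseName() == "zstd")  // form accepted by Spark options
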
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 78365d25c89..55cbf591303 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -2710,7 +2711,7 @@ class HiveDDLSuite
   }
 
   Seq(
-    ("orc", "ZLIB"),
+    ("orc", OrcCompressionCodec.ZLIB.name()),
     ("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case (fileFormat, compression) =>
     test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
       withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
@@ -2768,7 +2769,7 @@ class HiveDDLSuite
           assert(DDLUtils.isHiveTable(table))
           assert(table.storage.serde.get.contains("orc"))
           val properties = table.properties
-          assert(properties.get("orc.compress") == Some("ZLIB"))
+          assert(properties.get("orc.compress") == Some(OrcCompressionCodec.ZLIB.name()))
           assert(properties.get("orc.compress.size") == Some("1001"))
           assert(properties.get("orc.row.index.stride") == Some("2002"))
           assert(properties.get("hive.exec.orc.default.block.size") == Some("3003"))
@@ -2780,7 +2781,7 @@ class HiveDDLSuite
           val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
 
           Utils.tryWithResource(getReader(maybeFile.head.getCanonicalPath)) { reader =>
-            assert(reader.getCompressionKind.name === "ZLIB")
+            assert(reader.getCompressionKind.name === OrcCompressionCodec.ZLIB.name())
             assert(reader.getCompressionSize == 1001)
             assert(reader.getRowIndexStride == 2002)
           }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index e9b6bd28823..aa2f110ceac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.orc
 import java.io.File
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.HadoopFsRelationTest
 import org.apache.spark.sql.types._
@@ -99,7 +99,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         val orcFilePath = maybeOrcFile.get.toPath.toString
         val expectedCompressionKind =
           OrcFileOperator.getFileReader(orcFilePath).get.getCompression
-        assert(CompressionKind.ZLIB.name() === expectedCompressionKind.name())
+        assert(OrcCompressionCodec.ZLIB.name() === expectedCompressionKind.name())
 
         val copyDf = spark
           .read
@@ -114,7 +114,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
           .orc(file.getCanonicalPath)
         val expectedCompressionKind =
           OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
-        assert(CompressionKind.SNAPPY.name() === expectedCompressionKind.name())
+        assert(OrcCompressionCodec.SNAPPY.name() === expectedCompressionKind.name())
       }
     }
   }
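For reference, the footer-inspection pattern these suites repeat; a sketch (assuming `orcFilePath` points at a file written with ZLIB; not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.orc.OrcFile
    import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec

    // Read the codec recorded in the ORC file footer and compare it to the enum name.
    val reader = OrcFile.createReader(
      new Path(orcFilePath), OrcFile.readerOptions(new Configuration()))
    assert(reader.getCompressionKind.name == OrcCompressionCodec.ZLIB.name())
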
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
index 0330ce51a2a..c6ff7931410 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -21,12 +21,11 @@ import java.io.File
 
 import scala.util.Random
 
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
-
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -48,7 +47,7 @@ object OrcReadBenchmark extends SqlBasedBenchmark {
 
   override def getSparkSession: SparkSession = {
     val conf = new SparkConf()
-    conf.set("orc.compression", CompressionKind.SNAPPY.name())
+    conf.set("orc.compression", OrcCompressionCodec.SNAPPY.name())
 
     val sparkSession = SparkSession.builder()
       .master("local[1]")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org