Repository: spark Updated Branches: refs/heads/master 2fbe294cf -> d20c10fdf
[SPARK-24952][SQL] Support LZMA2 compression by Avro datasource ## What changes were proposed in this pull request? In this PR, I propose to support the `LZMA2` (`XZ`) and `BZIP2` compression codecs in the `AVRO` datasource for writing, since these codecs may have better characteristics, such as compression ratio and speed, compared to the already supported `snappy` and `deflate` codecs. ## How was this patch tested? It was tested manually and by an existing test, which was extended to check the `xz` and `bzip2` compressions. Author: Maxim Gekk <maxim.g...@databricks.com> Closes #21902 from MaxGekk/avro-xz-bzip2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d20c10fd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d20c10fd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d20c10fd Branch: refs/heads/master Commit: d20c10fdf382acf43a7e6a541923bd078e19ca75 Parents: 2fbe294 Author: Maxim Gekk <maxim.g...@databricks.com> Authored: Tue Jul 31 09:12:57 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Tue Jul 31 09:12:57 2018 +0800 ---------------------------------------------------------------------- .../apache/spark/sql/avro/AvroFileFormat.scala | 40 +++++++++----------- .../org/apache/spark/sql/avro/AvroOptions.scala | 2 +- .../org/apache/spark/sql/avro/AvroSuite.scala | 14 ++++++- .../org/apache/spark/sql/internal/SQLConf.scala | 6 ++- 4 files changed, 36 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index e0159b9..7db452b 100755 --- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -23,7 +23,8 @@ import java.net.URI import scala.util.control.NonFatal import org.apache.avro.Schema -import org.apache.avro.file.{DataFileConstants, DataFileReader} +import org.apache.avro.file.DataFileConstants._ +import org.apache.avro.file.DataFileReader import org.apache.avro.generic.{GenericDatumReader, GenericRecord} import org.apache.avro.mapred.{AvroOutputFormat, FsInput} import org.apache.avro.mapreduce.AvroJob @@ -116,27 +117,22 @@ private[avro] class AvroFileFormat extends FileFormat dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace) AvroJob.setOutputKeySchema(job, outputAvroSchema) - val COMPRESS_KEY = "mapred.output.compress" - - parsedOptions.compression match { - case "uncompressed" => - logInfo("writing uncompressed Avro records") - job.getConfiguration.setBoolean(COMPRESS_KEY, false) - - case "snappy" => - logInfo("compressing Avro output using Snappy") - job.getConfiguration.setBoolean(COMPRESS_KEY, true) - job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC) - - case "deflate" => - val deflateLevel = spark.sessionState.conf.avroDeflateLevel - logInfo(s"compressing Avro output using deflate (level=$deflateLevel)") - job.getConfiguration.setBoolean(COMPRESS_KEY, true) - job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC) - job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel) - - case unknown: String => - logError(s"unsupported compression codec $unknown") + + if (parsedOptions.compression == "uncompressed") { + job.getConfiguration.setBoolean("mapred.output.compress", false) + } else { + job.getConfiguration.setBoolean("mapred.output.compress", true) + logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec") + val codec = 
parsedOptions.compression match { + case DEFLATE_CODEC => + val deflateLevel = spark.sessionState.conf.avroDeflateLevel + logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.") + job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel) + DEFLATE_CODEC + case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec + case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown") + } + job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec) } new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString) http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 0f59007..67f5634 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -72,7 +72,7 @@ class AvroOptions( /** * The `compression` option allows to specify a compression codec used in write. - * Currently supported codecs are `uncompressed`, `snappy` and `deflate`. + * Currently supported codecs are `uncompressed`, `snappy`, `deflate`, `bzip2` and `xz`. * If the option is not set, the `spark.sql.avro.compression.codec` config is taken into * account. If the former one is not set too, the `snappy` codec is used by default. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index f59c2cc..c221c4f 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.avro import java.io._ import java.net.URL -import java.nio.file.{Files, Path, Paths} +import java.nio.file.{Files, Paths} import java.sql.{Date, Timestamp} import java.util.{TimeZone, UUID} @@ -368,12 +368,18 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { test("write with compression - sql configs") { withTempPath { dir => val uncompressDir = s"$dir/uncompress" + val bzip2Dir = s"$dir/bzip2" + val xzDir = s"$dir/xz" val deflateDir = s"$dir/deflate" val snappyDir = s"$dir/snappy" val df = spark.read.format("avro").load(testAvro) spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "uncompressed") df.write.format("avro").save(uncompressDir) + spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "bzip2") + df.write.format("avro").save(bzip2Dir) + spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "xz") + df.write.format("avro").save(xzDir) spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "deflate") spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9") df.write.format("avro").save(deflateDir) @@ -381,11 +387,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { df.write.format("avro").save(snappyDir) val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir)) + val bzip2Size = FileUtils.sizeOfDirectory(new File(bzip2Dir)) + val xzSize = FileUtils.sizeOfDirectory(new File(xzDir)) val deflateSize = 
FileUtils.sizeOfDirectory(new File(deflateDir)) val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir)) assert(uncompressSize > deflateSize) assert(snappySize > deflateSize) + assert(snappySize > bzip2Size) + assert(bzip2Size > xzSize) } } @@ -921,6 +931,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { checkCodec(df, path, "uncompressed") checkCodec(df, path, "deflate") checkCodec(df, path, "snappy") + checkCodec(df, path, "bzip2") + checkCodec(df, path, "xz") } } } http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a269e21..edc1a48 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -27,6 +27,7 @@ import scala.collection.immutable import scala.util.matching.Regex import org.apache.hadoop.fs.Path +import org.tukaani.xz.LZMA2Options import org.apache.spark.{SparkContext, TaskContext} import org.apache.spark.internal.Logging @@ -1437,9 +1438,10 @@ object SQLConf { .createWithDefault(20) val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec") - .doc("Compression codec used in writing of AVRO files. Default codec is snappy.") + .doc("Compression codec used in writing of AVRO files. Supported codecs: " + + "uncompressed, deflate, snappy, bzip2 and xz. 
Default codec is snappy.") .stringConf - .checkValues(Set("uncompressed", "deflate", "snappy")) + .checkValues(Set("uncompressed", "deflate", "snappy", "bzip2", "xz")) .createWithDefault("snappy") val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org