This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 47f54b1  [SPARK-28118][CORE] Add `spark.eventLog.compression.codec` configuration
47f54b1 is described below

commit 47f54b1ec717d0d744bf3ad46bb1ed3542b667c8
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Fri Jun 21 00:43:38 2019 +0000

    [SPARK-28118][CORE] Add `spark.eventLog.compression.codec` configuration

    ## What changes were proposed in this pull request?

    Event logs have a different lifetime from the other data Spark compresses, so it is
    useful to control their compression with a dedicated configuration,
    `spark.eventLog.compression.codec`. This PR adds it as an optional configuration:
    if `spark.eventLog.compression.codec` is not given, `spark.io.compression.codec`
    will be used.

    ## How was this patch tested?

    Passed Jenkins with the newly added test case.

    Closes #24921 from dongjoon-hyun/SPARK-28118.

    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
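In practice this lets a job keep a fast general-purpose I/O codec while choosing a
higher-compression codec for event logs alone. A minimal user-side sketch (the codec
choices here are illustrative, not defaults mandated by the patch):

    import org.apache.spark.SparkConf

    // Sketch: compress event logs with zstd while shuffle/broadcast I/O
    // keeps using lz4 via the general spark.io.compression.codec.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.compress", "true")
      .set("spark.io.compression.codec", "lz4")        // general I/O codec
      .set("spark.eventLog.compression.codec", "zstd") // event logs only

    // If the last .set(...) is omitted, event logs fall back to lz4,
    // the value of spark.io.compression.codec.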
---
 .../scala/org/apache/spark/internal/config/package.scala |  7 +++++++
 .../apache/spark/scheduler/EventLoggingListener.scala    |  6 ++++--
 .../spark/scheduler/EventLoggingListenerSuite.scala      | 16 +++++++++++++++-
 docs/configuration.md                                    |  9 ++++++++-
 4 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c2dab68..8098542 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1180,6 +1180,13 @@ package object config {
     .intConf
     .createWithDefault(1)
 
+  private[spark] val EVENT_LOG_COMPRESSION_CODEC =
+    ConfigBuilder("spark.eventLog.compression.codec")
+      .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
+        "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " +
+        "the codec. If this is not given, spark.io.compression.codec will be used.")
+      .fallbackConf(IO_COMPRESSION_CODEC)
+
   private[spark] val BUFFER_SIZE = ConfigBuilder("spark.buffer.size")
     .intConf

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 5ce09c4..20c74b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -44,6 +44,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  *   spark.eventLog.enabled - Whether event logging is enabled.
  *   spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
  *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
  *   spark.eventLog.overwrite - Whether to overwrite any existing files.
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
@@ -73,11 +74,12 @@ private[spark] class EventLoggingListener(
 
   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
-      Some(CompressionCodec.createCodec(sparkConf))
+      Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
     } else {
       None
     }
-  private val compressionCodecName = compressionCodec.map { c =>
+  // Visible for tests only.
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
     CompressionCodec.getShortName(c.getClass.getName)
   }

diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 350fc2a..5b8d254 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -87,6 +87,20 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     testEventLogging()
   }
 
+  test("spark.eventLog.compression.codec overrides spark.io.compression.codec") {
+    val conf = new SparkConf
+    conf.set(EVENT_LOG_COMPRESS, true)
+
+    // The default value is `spark.io.compression.codec`.
+    val e = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    assert(e.compressionCodecName.contains("lz4"))
+
+    // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`.
+    conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+    val e2 = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    assert(e2.compressionCodecName.contains("zstd"))
+  }
+
   test("Basic event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
       testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
@@ -535,7 +549,7 @@ object EventLoggingListenerSuite {
     conf.set(EVENT_LOG_DIR, logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set(EVENT_LOG_COMPRESS, true)
-      conf.set(IO_COMPRESSION_CODEC, codec)
+      conf.set(EVENT_LOG_COMPRESSION_CODEC, codec)
     }
     conf.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
     conf

diff --git a/docs/configuration.md b/docs/configuration.md
index 67e0cef..211dfbb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -880,7 +880,14 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether to compress logged events, if <code>spark.eventLog.enabled</code> is true.
-    Compression will use <code>spark.io.compression.codec</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.eventLog.compression.codec</code></td>
+  <td></td>
+  <td>
+    The codec to compress logged events. If this is not given,
+    <code>spark.io.compression.codec</code> will be used.
+  </td>
+</tr>
 <tr>
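For readers who haven't used `.fallbackConf`: the resolution order that the new
`EventLoggingListenerSuite` test pins down can be modeled in a few lines of self-contained
Scala. This is an illustrative sketch only; `Entry` and `resolve` are hypothetical
stand-ins, not Spark's real `ConfigBuilder`/`ConfigEntry` machinery:

    // Hypothetical model of fallback resolution; Spark's real implementation
    // lives in org.apache.spark.internal.config.
    final case class Entry(
        key: String,
        fallback: Option[Entry] = None,
        default: Option[String] = None)

    def resolve(settings: Map[String, String], entry: Entry): Option[String] =
      settings.get(entry.key)                                  // explicit setting wins
        .orElse(entry.fallback.flatMap(resolve(settings, _)))  // else consult fallback
        .orElse(entry.default)                                 // else built-in default

    val io = Entry("spark.io.compression.codec", default = Some("lz4"))
    val eventLog = Entry("spark.eventLog.compression.codec", fallback = Some(io))

    // Unset: event log codec falls back to spark.io.compression.codec (lz4).
    assert(resolve(Map.empty, eventLog).contains("lz4"))
    // Set: it overrides the fallback, as the new test asserts with zstd.
    assert(resolve(Map("spark.eventLog.compression.codec" -> "zstd"), eventLog).contains("zstd"))

Because the fallback is declared on the config entry itself rather than hard-coded at each
read site, every consumer of EVENT_LOG_COMPRESSION_CODEC sees the same default behavior as
the single `sparkConf.get(...)` call in EventLoggingListener above.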