This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 47f54b1  [SPARK-28118][CORE] Add `spark.eventLog.compression.codec` configuration
47f54b1 is described below

commit 47f54b1ec717d0d744bf3ad46bb1ed3542b667c8
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Fri Jun 21 00:43:38 2019 +0000

    [SPARK-28118][CORE] Add `spark.eventLog.compression.codec` configuration
    
    ## What changes were proposed in this pull request?
    
    Event logs differ from other data in terms of their lifetime, so it would be useful to have a dedicated configuration for Spark event log compression, such as `spark.eventLog.compression.codec`.
    This PR adds the new configuration as an optional one: if `spark.eventLog.compression.codec` is not given, `spark.io.compression.codec` will be used.
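
    As a usage sketch (the values are illustrative; the keys are the ones this patch adds or documents), opting the event log into its own codec could look like:

        import org.apache.spark.SparkConf

        // Illustrative only: compress the event log with zstd while other I/O
        // (shuffle, spill) keeps spark.io.compression.codec, lz4 by default.
        val conf = new SparkConf()
          .set("spark.eventLog.enabled", "true")
          .set("spark.eventLog.compress", "true")
          .set("spark.eventLog.compression.codec", "zstd")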
    
    ## How was this patch tested?
    
    Passed Jenkins with the newly added test case.
    
    Closes #24921 from dongjoon-hyun/SPARK-28118.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../scala/org/apache/spark/internal/config/package.scala |  7 +++++++
 .../apache/spark/scheduler/EventLoggingListener.scala    |  6 ++++--
 .../spark/scheduler/EventLoggingListenerSuite.scala      | 16 +++++++++++++++-
 docs/configuration.md                                    |  9 ++++++++-
 4 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c2dab68..8098542 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1180,6 +1180,13 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val EVENT_LOG_COMPRESSION_CODEC =
+    ConfigBuilder("spark.eventLog.compression.codec")
+      .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
+        "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " +
+        "the codec. If this is not given, spark.io.compression.codec will be used.")
+      .fallbackConf(IO_COMPRESSION_CODEC)
+
   private[spark] val BUFFER_SIZE =
     ConfigBuilder("spark.buffer.size")
       .intConf
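
A minimal sketch of the fallback semantics declared above (reading a ConfigEntry
through SparkConf.get is Spark-internal API; the assertions mirror the new test
further below):

    val conf = new SparkConf().set("spark.io.compression.codec", "snappy")

    // EVENT_LOG_COMPRESSION_CODEC is built with .fallbackConf(IO_COMPRESSION_CODEC),
    // so reading it without an explicit value falls through to the io codec.
    assert(conf.get(EVENT_LOG_COMPRESSION_CODEC) == "snappy")

    // An explicit value takes precedence over the fallback.
    conf.set(EVENT_LOG_COMPRESSION_CODEC.key, "zstd")
    assert(conf.get(EVENT_LOG_COMPRESSION_CODEC) == "zstd")
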
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 5ce09c4..20c74b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -44,6 +44,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  *   spark.eventLog.enabled - Whether event logging is enabled.
  *   spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
  *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
  *   spark.eventLog.overwrite - Whether to overwrite any existing files.
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
@@ -73,11 +74,12 @@ private[spark] class EventLoggingListener(
   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
-      Some(CompressionCodec.createCodec(sparkConf))
+      Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
     } else {
       None
     }
-  private val compressionCodecName = compressionCodec.map { c =>
+  // Visible for tests only.
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
     CompressionCodec.getShortName(c.getClass.getName)
   }
 
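A short sketch of the resolution path above (createCodec accepts a short name
such as "zstd" or a fully qualified class name; sparkConf here is the
listener's conf, as in the patch):

    import org.apache.spark.io.CompressionCodec

    val codecName = sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)    // e.g. "zstd"
    val codec = CompressionCodec.createCodec(sparkConf, codecName)
    // getShortName maps the codec class back to its short name for reporting.
    val shortName = CompressionCodec.getShortName(codec.getClass.getName)
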
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 350fc2a..5b8d254 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -87,6 +87,20 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     testEventLogging()
   }
 
+  test("spark.eventLog.compression.codec overrides spark.io.compression.codec") {
+    val conf = new SparkConf
+    conf.set(EVENT_LOG_COMPRESS, true)
+
+    // The default value is `spark.io.compression.codec`.
+    val e = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    assert(e.compressionCodecName.contains("lz4"))
+
+    // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`.
+    conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+    val e2 = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    assert(e2.compressionCodecName.contains("zstd"))
+  }
+
   test("Basic event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
+      testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
@@ -535,7 +549,7 @@ object EventLoggingListenerSuite {
     conf.set(EVENT_LOG_DIR, logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set(EVENT_LOG_COMPRESS, true)
-      conf.set(IO_COMPRESSION_CODEC, codec)
+      conf.set(EVENT_LOG_COMPRESSION_CODEC, codec)
     }
     conf.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
     conf
diff --git a/docs/configuration.md b/docs/configuration.md
index 67e0cef..211dfbb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -880,7 +880,14 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether to compress logged events, if <code>spark.eventLog.enabled</code> is true.
-    Compression will use <code>spark.io.compression.codec</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.eventLog.compression.codec</code></td>
+  <td></td>
+  <td>
+    The codec to compress logged events. If this is not given,
+    <code>spark.io.compression.codec</code> will be used.
   </td>
 </tr>
 <tr>
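
As a usage note (the values are illustrative), the same setting can also go in
conf/spark-defaults.conf next to the other event log options:

    spark.eventLog.enabled              true
    spark.eventLog.compress             true
    spark.eventLog.compression.codec    zstd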

