This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b27ad5  [SPARK-28102][CORE] Avoid performance problems when lz4-java 
JNI libraries fail to initialize
6b27ad5 is described below

commit 6b27ad5ea11297c39ac216054f061af334387a59
Author: Josh Rosen <rosenvi...@gmail.com>
AuthorDate: Wed Jun 19 15:26:26 2019 -0700

    [SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries 
fail to initialize
    
    ## What changes were proposed in this pull request?
    
    This PR fixes a performance problem in environments where `lz4-java`'s 
native JNI libraries fail to initialize.
    
    Spark uses `lz4-java` for LZ4 compression. Under the hood, the 
`LZ4BlockInputStream` and `LZ4BlockOutputStream` constructors call 
`LZ4Factory.fastestInstance()`, which attempts to load JNI libraries and falls 
back on Java implementations in case the JNI library cannot be loaded or 
initialized.
    
    If the LZ4 JNI libraries are present on the library load path 
(`Native.isLoaded()`) but cannot be initialized (e.g. due to breakage caused by 
shading) then an exception will be thrown and caught, triggering fallback to 
`fastestJavaInstance()` (a non-JNI implementation).
    
    Unfortunately, the LZ4 library does not cache the fact that the JNI library 
failed during initialization, so every call to `LZ4Factory.fastestInstance()` 
re-attempts (and fails) to initialize the native code. These initialization 
attempts are performed in a `static synchronized` method, so exceptions from 
failures are thrown while holding shared monitors and this causes 
monitor-contention performance issues. Here's an example stack trace showing 
the problem:
    
    ```java
    
    java.lang.Throwable.fillInStackTrace(Native Method)
    java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding 
Monitor(java.lang.NoClassDefFoundError@441628568})
    java.lang.Throwable.<init>(Throwable.java:265)
    java.lang.Error.<init>(Error.java:70)
    java.lang.LinkageError.<init>(LinkageError.java:55)
    java.lang.NoClassDefFoundError.<init>(NoClassDefFoundError.java:59)
    shaded.net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:36)
    shaded.net.jpountz.lz4.LZ4Factory.<init>(LZ4Factory.java:200)
    shaded.net.jpountz.lz4.LZ4Factory.instance(LZ4Factory.java:51)
    shaded.net.jpountz.lz4.LZ4Factory.nativeInstance(LZ4Factory.java:84) => 
holding Monitor(java.lang.Class@1475983836})
    shaded.net.jpountz.lz4.LZ4Factory.fastestInstance(LZ4Factory.java:157)
    
shaded.net.jpountz.lz4.LZ4BlockOutputStream.<init>(LZ4BlockOutputStream.java:135)
    
org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:122)
    
org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:156)
    
org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:131)
    
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:120)
    
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
    
org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:211)
    [...]
    ```
    
    To avoid this problem, this PR modifies Spark's `LZ4CompressionCodec` to 
call `fastestInstance()` itself and cache the result (which is safe because 
these factories [are thread-safe](https://github.com/lz4/lz4-java/issues/82)).
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #24905 from JoshRosen/lz4-factory-flags.
    
    Lead-authored-by: Josh Rosen <rosenvi...@gmail.com>
    Co-authored-by: Josh Rosen <joshro...@stripe.com>
    Signed-off-by: Josh Rosen <rosenvi...@gmail.com>
---
 .../org/apache/spark/io/CompressionCodec.scala     | 28 +++++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 065f05e..adbd59c 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,7 +22,8 @@ import java.util.Locale
 
 import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
+import net.jpountz.xxhash.XXHashFactory
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -118,14 +119,35 @@ private[spark] object CompressionCodec {
 @DeveloperApi
 class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
 
+  // SPARK-28102: if the LZ4 JNI libraries fail to initialize then 
`fastestInstance()` calls fall
+  // back to non-JNI implementations but do not remember the fact that JNI 
failed to load, so
+  // repeated calls to `fastestInstance()` will cause performance problems 
because the JNI load
+  // will be repeatedly re-attempted and that path is slow because it throws 
exceptions from a
+  // static synchronized method (causing lock contention). To avoid this 
problem, we cache the
+  // result of the `fastestInstance()` calls ourselves (both factories are 
thread-safe).
+  @transient private[this] lazy val lz4Factory: LZ4Factory = 
LZ4Factory.fastestInstance()
+  @transient private[this] lazy val xxHashFactory: XXHashFactory = 
XXHashFactory.fastestInstance()
+
+  private[this] val defaultSeed: Int = 0x9747b28c // 
LZ4BlockOutputStream.DEFAULT_SEED
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
-    new LZ4BlockOutputStream(s, blockSize)
+    val syncFlush = false
+    new LZ4BlockOutputStream(
+      s,
+      blockSize,
+      lz4Factory.fastCompressor(),
+      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
+      syncFlush)
   }
 
   override def compressedInputStream(s: InputStream): InputStream = {
     val disableConcatenationOfByteStream = false
-    new LZ4BlockInputStream(s, disableConcatenationOfByteStream)
+    new LZ4BlockInputStream(
+      s,
+      lz4Factory.fastDecompressor(),
+      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
+      disableConcatenationOfByteStream)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to