This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6b27ad5  [SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize
6b27ad5 is described below

commit 6b27ad5ea11297c39ac216054f061af334387a59
Author: Josh Rosen <rosenvi...@gmail.com>
AuthorDate: Wed Jun 19 15:26:26 2019 -0700

    [SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize

## What changes were proposed in this pull request?

This PR fixes a performance problem in environments where `lz4-java`'s native JNI libraries fail to initialize.

Spark uses `lz4-java` for LZ4 compression. Under the hood, the `LZ4BlockInputStream` and `LZ4BlockOutputStream` constructors call `LZ4Factory.fastestInstance()`, which attempts to load JNI libraries and falls back on Java implementations if the JNI library cannot be loaded or initialized.

If the LZ4 JNI libraries are present on the library load path (`Native.isLoaded()`) but cannot be initialized (e.g. due to breakage caused by shading), then an exception is thrown and caught, triggering fallback to `fastestJavaInstance()` (a non-JNI implementation). Unfortunately, the LZ4 library does not cache the fact that JNI initialization failed, so every call to `LZ4Factory.fastestInstance()` re-attempts (and fails) to initialize the native code. These initialization attempts happen in a `static synchronized` method, so the resulting exceptions are thrown while holding shared monitors, causing monitor-contention performance problems.
Here's an example stack trace showing the problem:

```java
java.lang.Throwable.fillInStackTrace(Native Method)
java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding Monitor(java.lang.NoClassDefFoundError@441628568})
java.lang.Throwable.<init>(Throwable.java:265)
java.lang.Error.<init>(Error.java:70)
java.lang.LinkageError.<init>(LinkageError.java:55)
java.lang.NoClassDefFoundError.<init>(NoClassDefFoundError.java:59)
shaded.net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:36)
shaded.net.jpountz.lz4.LZ4Factory.<init>(LZ4Factory.java:200)
shaded.net.jpountz.lz4.LZ4Factory.instance(LZ4Factory.java:51)
shaded.net.jpountz.lz4.LZ4Factory.nativeInstance(LZ4Factory.java:84) => holding Monitor(java.lang.Class@1475983836})
shaded.net.jpountz.lz4.LZ4Factory.fastestInstance(LZ4Factory.java:157)
shaded.net.jpountz.lz4.LZ4BlockOutputStream.<init>(LZ4BlockOutputStream.java:135)
org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:122)
org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:156)
org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:131)
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:120)
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:211)
[...]
```

To avoid this problem, this PR modifies Spark's `LZ4CompressionCodec` to call `fastestInstance()` itself and cache the result (which is safe because these factories [are thread-safe](https://github.com/lz4/lz4-java/issues/82)).

## How was this patch tested?

Existing unit tests.

Closes #24905 from JoshRosen/lz4-factory-flags.
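The cost of the missing cache, and the effect of caching the factory once, can be illustrated with a small self-contained Java sketch. `SlowFactory` and `CachingCodec` are hypothetical stand-ins for lz4-java's `LZ4Factory` and Spark's codec, not the real classes; the sketch only models the pattern (an exception constructed on every `static synchronized` call versus a one-time cached lookup):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for lz4-java's LZ4Factory: fastestInstance() is
// static synchronized, and when native init keeps failing, the slow
// throw-and-catch path runs on every call while holding the class monitor.
class SlowFactory {
    static final AtomicInteger initAttempts = new AtomicInteger();

    static synchronized SlowFactory fastestInstance() {
        initAttempts.incrementAndGet();
        try {
            // Simulate the failed JNI initialization: an exception is
            // constructed (stack trace and all) under the shared monitor.
            throw new UnsatisfiedLinkError("native lz4 init failed");
        } catch (UnsatisfiedLinkError e) {
            // Fall back to the pure-Java implementation -- but the failure
            // is not remembered, so the next call pays the same cost again.
            return new SlowFactory();
        }
    }
}

// Mirrors the shape of the fix: resolve the factory once and reuse it,
// instead of calling fastestInstance() for every stream. (A Scala
// `lazy val` gives a similar thread-safe once-only initialization.)
class CachingCodec {
    private static final SlowFactory FACTORY = SlowFactory.fastestInstance();

    static SlowFactory factory() {
        return FACTORY;
    }
}

public class Main {
    public static void main(String[] args) {
        // Without caching: every "stream constructor" re-runs the slow path.
        for (int i = 0; i < 1000; i++) {
            SlowFactory.fastestInstance();
        }
        int uncached = SlowFactory.initAttempts.getAndSet(0);

        // With caching: the slow path runs once, no matter how many streams.
        for (int i = 0; i < 1000; i++) {
            CachingCodec.factory();
        }
        int cached = SlowFactory.initAttempts.get();

        System.out.println("uncached attempts: " + uncached); // 1000
        System.out.println("cached attempts:   " + cached);   // 1
    }
}
```

Under contention the difference is larger than the call counts suggest, since each uncached call also serializes on the class monitor while filling in an exception stack trace.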
Lead-authored-by: Josh Rosen <rosenvi...@gmail.com>
Co-authored-by: Josh Rosen <joshro...@stripe.com>
Signed-off-by: Josh Rosen <rosenvi...@gmail.com>
---
 .../org/apache/spark/io/CompressionCodec.scala | 28 +++++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 065f05e..adbd59c 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,7 +22,8 @@ import java.util.Locale
 
 import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
+import net.jpountz.xxhash.XXHashFactory
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -118,14 +119,35 @@ private[spark] object CompressionCodec {
 @DeveloperApi
 class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
 
+  // SPARK-28102: if the LZ4 JNI libraries fail to initialize then `fastestInstance()` calls fall
+  // back to non-JNI implementations but do not remember the fact that JNI failed to load, so
+  // repeated calls to `fastestInstance()` will cause performance problems because the JNI load
+  // will be repeatedly re-attempted and that path is slow because it throws exceptions from a
+  // static synchronized method (causing lock contention). To avoid this problem, we cache the
+  // result of the `fastestInstance()` calls ourselves (both factories are thread-safe).
+  @transient private[this] lazy val lz4Factory: LZ4Factory = LZ4Factory.fastestInstance()
+  @transient private[this] lazy val xxHashFactory: XXHashFactory = XXHashFactory.fastestInstance()
+
+  private[this] val defaultSeed: Int = 0x9747b28c // LZ4BlockOutputStream.DEFAULT_SEED
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
-    new LZ4BlockOutputStream(s, blockSize)
+    val syncFlush = false
+    new LZ4BlockOutputStream(
+      s,
+      blockSize,
+      lz4Factory.fastCompressor(),
+      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
+      syncFlush)
   }
 
   override def compressedInputStream(s: InputStream): InputStream = {
     val disableConcatenationOfByteStream = false
-    new LZ4BlockInputStream(s, disableConcatenationOfByteStream)
+    new LZ4BlockInputStream(
+      s,
+      lz4Factory.fastDecompressor(),
+      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
+      disableConcatenationOfByteStream)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org