Repository: spark
Updated Branches:
  refs/heads/master d655d37dd -> 29cecd4a4


[SPARK-12388] change default compression to lz4

According to the benchmark [1], LZ4-java could be 80% (or 30%) faster than Snappy.

After changing the compressor to LZ4, I saw a 20% improvement in end-to-end time
for a TPCDS query (Q4).

[1] https://github.com/ning/jvm-compressor-benchmark/wiki

cc rxin

Author: Davies Liu <dav...@databricks.com>

Closes #10342 from davies/lz4.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29cecd4a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29cecd4a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29cecd4a

Branch: refs/heads/master
Commit: 29cecd4a42f6969613e5b2a40f2724f99e7eec01
Parents: d655d37
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Dec 21 14:21:43 2015 -0800
Committer: Davies Liu <davies....@gmail.com>
Committed: Mon Dec 21 14:21:43 2015 -0800

----------------------------------------------------------------------
 .rat-excludes                                   |   1 +
 .../org/apache/spark/io/CompressionCodec.scala  |  12 +-
 .../apache/spark/io/LZ4BlockInputStream.java    | 263 +++++++++++++++++++
 .../apache/spark/io/CompressionCodecSuite.scala |   8 +-
 docs/configuration.md                           |   2 +-
 .../execution/ExchangeCoordinatorSuite.scala    |   4 +-
 6 files changed, 276 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 7262c96..3544c0f 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -84,3 +84,4 @@ gen-java.*
 org.apache.spark.sql.sources.DataSourceRegister
 org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet
+LZ4BlockInputStream.java

http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index ca74eed..7178046 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.io
 
-import java.io.{IOException, InputStream, OutputStream}
+import java.io._
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.LZ4BlockOutputStream
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -49,7 +49,8 @@ private[spark] object CompressionCodec {
   private val configKey = "spark.io.compression.codec"
 
   private[spark] def supportsConcatenationOfSerializedStreams(codec: 
CompressionCodec): Boolean = {
-    codec.isInstanceOf[SnappyCompressionCodec] || 
codec.isInstanceOf[LZFCompressionCodec]
+    (codec.isInstanceOf[SnappyCompressionCodec] || 
codec.isInstanceOf[LZFCompressionCodec]
+      || codec.isInstanceOf[LZ4CompressionCodec])
   }
 
   private val shortCompressionCodecNames = Map(
@@ -92,12 +93,11 @@ private[spark] object CompressionCodec {
     }
   }
 
-  val FALLBACK_COMPRESSION_CODEC = "lzf"
-  val DEFAULT_COMPRESSION_CODEC = "snappy"
+  val FALLBACK_COMPRESSION_CODEC = "snappy"
+  val DEFAULT_COMPRESSION_CODEC = "lz4"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
 
-
 /**
  * :: DeveloperApi ::
  * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].

http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java 
b/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java
new file mode 100644
index 0000000..27b6f0d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java
@@ -0,0 +1,263 @@
+package org.apache.spark.io;
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Checksum;
+
+import net.jpountz.lz4.LZ4BlockOutputStream;
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.util.SafeUtils;
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * {@link InputStream} implementation to decode data written with
+ * {@link LZ4BlockOutputStream}. This class is not thread-safe and does not
+ * support {@link #mark(int)}/{@link #reset()}.
+ * @see LZ4BlockOutputStream
+ *
+ * This is based on net.jpountz.lz4.LZ4BlockInputStream
+ *
+ * changes: 
https://github.com/davies/lz4-java/commit/cc1fa940ac57cc66a0b937300f805d37e2bf8411
+ *
+ * TODO: merge this into upstream
+ */
+public final class LZ4BlockInputStream extends FilterInputStream {
+
+  // Copied from net.jpountz.lz4.LZ4BlockOutputStream
+  static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 
'k' };
+  static final int MAGIC_LENGTH = MAGIC.length;
+
+  static final int HEADER_LENGTH =
+    MAGIC_LENGTH // magic bytes
+      + 1          // token
+      + 4          // compressed length
+      + 4          // decompressed length
+      + 4;         // checksum
+
+  static final int COMPRESSION_LEVEL_BASE = 10;
+
+  static final int COMPRESSION_METHOD_RAW = 0x10;
+  static final int COMPRESSION_METHOD_LZ4 = 0x20;
+
+  static final int DEFAULT_SEED = 0x9747b28c;
+
+  private final LZ4FastDecompressor decompressor;
+  private final Checksum checksum;
+  private byte[] buffer;
+  private byte[] compressedBuffer;
+  private int originalLen;
+  private int o;
+  private boolean finished;
+
+  /**
+   * Create a new {@link InputStream}.
+   *
+   * @param in            the {@link InputStream} to poll
+   * @param decompressor  the {@link LZ4FastDecompressor decompressor} 
instance to
+   *                      use
+   * @param checksum      the {@link Checksum} instance to use, must be
+   *                      equivalent to the instance which has been used to
+   *                      write the stream
+   */
+  public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, 
Checksum checksum) {
+    super(in);
+    this.decompressor = decompressor;
+    this.checksum = checksum;
+    this.buffer = new byte[0];
+    this.compressedBuffer = new byte[HEADER_LENGTH];
+    o = originalLen = 0;
+    finished = false;
+  }
+
+  /**
+   * Create a new instance using {@link XXHash32} for checksuming.
+   * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum)
+   * @see StreamingXXHash32#asChecksum()
+   */
+  public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) 
{
+    this(in, decompressor, 
XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
+  }
+
+  /**
+   * Create a new instance which uses the fastest {@link LZ4FastDecompressor} 
available.
+   * @see LZ4Factory#fastestInstance()
+   * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor)
+   */
+  public LZ4BlockInputStream(InputStream in) {
+    this(in, LZ4Factory.fastestInstance().fastDecompressor());
+  }
+
+  @Override
+  public int available() throws IOException {
+    refill();
+    return originalLen - o;
+  }
+
+  @Override
+  public int read() throws IOException {
+    refill();
+    if (finished) {
+      return -1;
+    }
+    return buffer[o++] & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    SafeUtils.checkRange(b, off, len);
+    refill();
+    if (finished) {
+      return -1;
+    }
+    len = Math.min(len, originalLen - o);
+    System.arraycopy(buffer, o, b, off, len);
+    o += len;
+    return len;
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    refill();
+    if (finished) {
+      return -1;
+    }
+    final int skipped = (int) Math.min(n, originalLen - o);
+    o += skipped;
+    return skipped;
+  }
+
+  private void refill() throws IOException {
+    if (finished || o < originalLen) {
+      return;
+    }
+    try {
+      readFully(compressedBuffer, HEADER_LENGTH);
+    } catch (EOFException e) {
+      finished = true;
+      return;
+    }
+    for (int i = 0; i < MAGIC_LENGTH; ++i) {
+      if (compressedBuffer[i] != MAGIC[i]) {
+        throw new IOException("Stream is corrupted");
+      }
+    }
+    final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF;
+    final int compressionMethod = token & 0xF0;
+    final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F);
+    if (compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != 
COMPRESSION_METHOD_LZ4)
+    {
+      throw new IOException("Stream is corrupted");
+    }
+    final int compressedLen = SafeUtils.readIntLE(compressedBuffer, 
MAGIC_LENGTH + 1);
+    originalLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5);
+    final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9);
+    assert HEADER_LENGTH == MAGIC_LENGTH + 13;
+    if (originalLen > 1 << compressionLevel
+      || originalLen < 0
+      || compressedLen < 0
+      || (originalLen == 0 && compressedLen != 0)
+      || (originalLen != 0 && compressedLen == 0)
+      || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != 
compressedLen)) {
+      throw new IOException("Stream is corrupted");
+    }
+    if (originalLen == 0 && compressedLen == 0) {
+      if (check != 0) {
+        throw new IOException("Stream is corrupted");
+      }
+      refill();
+      return;
+    }
+    if (buffer.length < originalLen) {
+      buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)];
+    }
+    switch (compressionMethod) {
+      case COMPRESSION_METHOD_RAW:
+        readFully(buffer, originalLen);
+        break;
+      case COMPRESSION_METHOD_LZ4:
+        if (compressedBuffer.length < originalLen) {
+          compressedBuffer = new byte[Math.max(compressedLen, 
compressedBuffer.length * 3 / 2)];
+        }
+        readFully(compressedBuffer, compressedLen);
+        try {
+          final int compressedLen2 =
+            decompressor.decompress(compressedBuffer, 0, buffer, 0, 
originalLen);
+          if (compressedLen != compressedLen2) {
+            throw new IOException("Stream is corrupted");
+          }
+        } catch (LZ4Exception e) {
+          throw new IOException("Stream is corrupted", e);
+        }
+        break;
+      default:
+        throw new AssertionError();
+    }
+    checksum.reset();
+    checksum.update(buffer, 0, originalLen);
+    if ((int) checksum.getValue() != check) {
+      throw new IOException("Stream is corrupted");
+    }
+    o = 0;
+  }
+
+  private void readFully(byte[] b, int len) throws IOException {
+    int read = 0;
+    while (read < len) {
+      final int r = in.read(b, read, len - read);
+      if (r < 0) {
+        throw new EOFException("Stream ended prematurely");
+      }
+      read += r;
+    }
+    assert len == read;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  @SuppressWarnings("sync-override")
+  @Override
+  public void mark(int readlimit) {
+    // unsupported
+  }
+
+  @SuppressWarnings("sync-override")
+  @Override
+  public void reset() throws IOException {
+    throw new IOException("mark/reset not supported");
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(in=" + in
+      + ", decompressor=" + decompressor + ", checksum=" + checksum + ")";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala 
b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 1553ab6..9e9c2b0 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -46,7 +46,7 @@ class CompressionCodecSuite extends SparkFunSuite {
 
   test("default compression codec") {
     val codec = CompressionCodec.createCodec(conf)
-    assert(codec.getClass === classOf[SnappyCompressionCodec])
+    assert(codec.getClass === classOf[LZ4CompressionCodec])
     testCodec(codec)
   }
 
@@ -62,12 +62,10 @@ class CompressionCodecSuite extends SparkFunSuite {
     testCodec(codec)
   }
 
-  test("lz4 does not support concatenation of serialized streams") {
+  test("lz4 supports concatenation of serialized streams") {
     val codec = CompressionCodec.createCodec(conf, 
classOf[LZ4CompressionCodec].getName)
     assert(codec.getClass === classOf[LZ4CompressionCodec])
-    intercept[Exception] {
-      testConcatenationOfSerializedStreams(codec)
-    }
+    testConcatenationOfSerializedStreams(codec)
   }
 
   test("lzf compression codec") {

http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 85e7d12..a9ef37a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -595,7 +595,7 @@ Apart from these, the following properties are also 
available, and may be useful
 </tr>
 <tr>
   <td><code>spark.io.compression.codec</code></td>
-  <td>snappy</td>
+  <td>lz4</td>
   <td>
     The codec used to compress internal data such as RDD partitions, broadcast 
variables and
     shuffle outputs. By default, Spark provides three codecs: 
<code>lz4</code>, <code>lzf</code>,

http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index 101cf50..2715179 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -319,7 +319,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with 
BeforeAndAfterAll {
         }
       }
 
-      withSQLContext(test, 1536, minNumPostShufflePartitions)
+      withSQLContext(test, 2000, minNumPostShufflePartitions)
     }
 
     test(s"determining the number of reducers: join operator$testNameNote") {
@@ -422,7 +422,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with 
BeforeAndAfterAll {
         }
       }
 
-      withSQLContext(test, 6144, minNumPostShufflePartitions)
+      withSQLContext(test, 6644, minNumPostShufflePartitions)
     }
 
     test(s"determining the number of reducers: complex query 2$testNameNote") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to