Repository: spark Updated Branches: refs/heads/branch-1.3 91442fdfc -> 0a6310373
[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent and to guard against write-after-`close()` bugs. This is a workaround for https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent `close()` method can lead to stream corruption. We can remove this workaround if we upgrade to a snappy-java version that contains my fix for this bug, but in the meantime this patch offers a backportable Spark fix. Author: Josh Rosen <joshro...@databricks.com> Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following commits: 8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660 (cherry picked from commit f2cc6b5bccc3a70fd7d69183b1a068800831fe19) Signed-off-by: Josh Rosen <joshro...@databricks.com> Conflicts: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a631037 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a631037 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a631037 Branch: refs/heads/branch-1.3 Commit: 0a63103739db029c1e871ed0f6c36aa43e326121 Parents: 91442fd Author: Josh Rosen <joshro...@databricks.com> Authored: Sun May 17 09:30:49 2015 -0700 Committer: Josh Rosen <joshro...@databricks.com> Committed: Sun May 17 09:38:31 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/io/CompressionCodec.scala | 49 +++++++++++++++++++- 1 file changed, 47 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0a631037/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 0709b6d..2fcb53d 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -17,7 +17,7 @@ package org.apache.spark.io -import java.io.{InputStream, OutputStream} +import java.io.{IOException, InputStream, OutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} @@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedOutputStream(s: OutputStream): OutputStream = { val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768) - new SnappyOutputStream(s, blockSize) + new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize)) } override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) } + +/** + * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close + * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version + * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107. + */ +private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream { + + private[this] var closed: Boolean = false + + override def write(b: Int): Unit = { + if (closed) { + throw new IOException("Stream is closed") + } + os.write(b) + } + + override def write(b: Array[Byte]): Unit = { + if (closed) { + throw new IOException("Stream is closed") + } + os.write(b) + } + + override def write(b: Array[Byte], off: Int, len: Int): Unit = { + if (closed) { + throw new IOException("Stream is closed") + } + os.write(b, off, len) + } + + override def flush(): Unit = { + if (closed) { + throw new IOException("Stream is closed") + } + os.flush() + } + + override def close(): Unit = { + if (!closed) { + closed = true + os.close() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org