Repository: spark
Updated Branches:
  refs/heads/branch-1.2 5505a0d07 -> 6c41e1cb9


[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent 
and to guard against write-after-`close()` bugs. This is a workaround for 
https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent 
`close()` method can lead to stream corruption. We can remove this workaround 
if we upgrade to a snappy-java version that contains my fix for this bug, but 
in the meantime this patch offers a backportable Spark fix.
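
To make the failure mode concrete, here is a minimal sketch (not part of this
patch; the object name is illustrative) of how a non-idempotent close() can
corrupt output when using the stock SnappyOutputStream from snappy-java:

    import java.io.ByteArrayOutputStream
    import org.xerial.snappy.SnappyOutputStream

    object DoubleCloseRepro {
      def main(args: Array[String]): Unit = {
        val sink = new ByteArrayOutputStream()
        val snappyOut = new SnappyOutputStream(sink)
        snappyOut.write("hello".getBytes("UTF-8"))
        snappyOut.close()
        val sizeAfterFirstClose = sink.size()
        // In affected snappy-java versions, a second close() re-flushes the
        // internal buffer, appending duplicate compressed blocks to `sink`.
        snappyOut.close()
        if (sink.size() != sizeAfterFirstClose) {
          println(s"corruption: ${sink.size() - sizeAfterFirstClose} extra bytes")
        }
      }
    }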

Author: Josh Rosen <joshro...@databricks.com>

Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following 
commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660

(cherry picked from commit f2cc6b5bccc3a70fd7d69183b1a068800831fe19)
Signed-off-by: Josh Rosen <joshro...@databricks.com>

Conflicts:
        core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
        core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c41e1cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c41e1cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c41e1cb

Branch: refs/heads/branch-1.2
Commit: 6c41e1cb9c4913d9d539cf7a5b4fe6cb2c075032
Parents: 5505a0d
Author: Josh Rosen <joshro...@databricks.com>
Authored: Sun May 17 09:30:49 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Sun May 17 09:41:08 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/io/CompressionCodec.scala  | 49 +++++++++++++++++++-
 1 file changed, 47 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c41e1cb/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 1ac7f4e..2343e69 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -122,8 +122,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
+ * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.flush()
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      os.close()
+    }
+  }
+}
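
For reference, a minimal sketch of the contract the wrapper enforces, assuming
it is exercised through SnappyCompressionCodec (the wrapper class itself is
private to this file; the object name below is illustrative):

    import java.io.{ByteArrayOutputStream, IOException}
    import org.apache.spark.SparkConf
    import org.apache.spark.io.SnappyCompressionCodec

    object WrapperContractDemo {
      def main(args: Array[String]): Unit = {
        val codec = new SnappyCompressionCodec(new SparkConf())
        val out = codec.compressedOutputStream(new ByteArrayOutputStream())
        out.write(Array[Byte](1, 2, 3))
        out.close()
        out.close()  // second close() is a silent no-op rather than a re-flush
        try {
          out.write(4)  // write-after-close is rejected instead of corrupting data
        } catch {
          case e: IOException => println(e.getMessage)  // "Stream is closed"
        }
      }
    }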


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
