Repository: spark
Updated Branches:
  refs/heads/master 5e92583d3 -> ed2de0299


[SPARK-14719] WriteAheadLogBasedBlockHandler should ignore BlockManager put errors

WriteAheadLogBasedBlockHandler currently throws exceptions when its BlockManager 
`put()` calls fail, even though those calls are performed only as a performance 
optimization: the write-ahead log, not the BlockManager, is the source of truth 
for received data. Instead, the handler should log and ignore any exceptions 
thrown during that `put()`.
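
To illustrate, here is a minimal, self-contained sketch of the intended 
log-and-ignore pattern. It is hypothetical: the `writeToWal` and `cacheBlock` 
closures and the `println` logging stand in for the real WAL writer, 
`BlockManager.putBytes`, and Spark's `Logging` trait.

    import scala.util.control.NonFatal

    // Sketch only: the WAL write is the source of truth, so its failures still
    // propagate; the BlockManager put is best-effort and merely logged on failure.
    def storeWithBestEffortCache(writeToWal: () => Unit, cacheBlock: () => Boolean): Unit = {
      writeToWal()  // failures here must still fail the whole store operation
      try {
        if (!cacheBlock()) {
          println("WARN: could not cache block in the BlockManager")
        }
      } catch {
        case NonFatal(t) =>
          println(s"ERROR: could not cache block in the BlockManager: $t")
      }
    }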

This is a longstanding issue that was masked by an incorrect test case. I think 
we haven't noticed it in production because

1. most people probably use a `MEMORY_AND_DISK` storage level, and
2. individual blocks are typically small enough relative to the total storage 
memory that they can evict blocks from previous batches, so `put()` failures 
here may be rare in practice.

This patch fixes the bug and corrects the faulty test, which wrongly asserted 
that the WAL-based handler throws a `SparkException` when a block is too large 
for the BlockManager to cache.

/cc tdas

Author: Josh Rosen <joshro...@databricks.com>

Closes #12484 from JoshRosen/received-block-hadndler-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed2de029
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed2de029
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed2de029

Branch: refs/heads/master
Commit: ed2de0299a5a54b566b91ae9f47b6626c484c1d3
Parents: 5e92583
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Apr 18 19:36:40 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Apr 18 19:36:40 2016 -0700

----------------------------------------------------------------------
 .../receiver/ReceivedBlockHandler.scala         | 22 +++++----
 .../streaming/ReceivedBlockHandlerSuite.scala   | 49 +++++++++++---------
 2 files changed, 40 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ed2de029/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 7aea1c9..f381fa4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.receiver
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.language.{existentials, postfixOps}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -189,14 +190,19 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
-      val putSucceeded = blockManager.putBytes(
-        blockId,
-        serializedBlock,
-        effectiveStorageLevel,
-        tellMaster = true)
-      if (!putSucceeded) {
-        throw new SparkException(
-          s"Could not store $blockId to block manager with storage level 
$storageLevel")
+      try {
+        val putSucceeded = blockManager.putBytes(
+          blockId,
+          serializedBlock,
+          effectiveStorageLevel,
+          tellMaster = true)
+        if (!putSucceeded) {
+          logWarning(
+            s"Could not store $blockId to block manager with storage level 
$storageLevel")
+        }
+      } catch {
+        case NonFatal(t) =>
+          logError(s"Could not store $blockId to block manager with storage 
level $storageLevel", t)
       }
     }
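
A note on the `NonFatal` guard introduced above: `scala.util.control.NonFatal` 
matches ordinary exceptions but deliberately excludes fatal JVM throwables such 
as `VirtualMachineError` (which covers `OutOfMemoryError` and 
`StackOverflowError`), `InterruptedException`, `LinkageError`, and 
`ControlThrowable`, so those still propagate out of the Future. A small 
illustrative snippet (the `handledByCatch` helper is hypothetical):

    import scala.util.control.NonFatal

    // Classifies throwables the way the catch clause above would see them.
    def handledByCatch(t: Throwable): Boolean = t match {
      case NonFatal(_) => true   // e.g. a failed put: logged and swallowed
      case _           => false  // e.g. OutOfMemoryError: propagates
    }

    println(handledByCatch(new RuntimeException("put failed")))      // true
    println(handledByCatch(new OutOfMemoryError("Java heap space"))) // false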
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ed2de029/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 4be4882..ea87b0d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -127,7 +127,17 @@ class ReceivedBlockHandlerSuite
 
   test("BlockManagerBasedBlockHandler - handle errors in storing block") {
     withBlockManagerBasedBlockHandler { handler =>
-      testErrorHandling(handler)
+      // Handle error in iterator (e.g. divide-by-zero error)
+      intercept[Exception] {
+        val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+        handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+      }
+
+      // Handle error in block manager storing (e.g. too big block)
+      intercept[SparkException] {
+        val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+        handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+      }
     }
   }
 
@@ -167,7 +177,15 @@ class ReceivedBlockHandlerSuite
 
   test("WriteAheadLogBasedBlockHandler - handle errors in storing block") {
     withWriteAheadLogBasedBlockHandler { handler =>
-      testErrorHandling(handler)
+      // Handle error in iterator (e.g. divide-by-zero error)
+      intercept[Exception] {
+        val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+        handler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+      }
+
+      // Throws no errors when storing blocks that are too large to be cached
+      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+      handler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
     }
   }
 
@@ -204,26 +222,26 @@ class ReceivedBlockHandlerSuite
     sparkConf.set("spark.storage.unrollFraction", "0.4")
     // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
     blockManager = createBlockManager(12000, sparkConf)
+    // This block is way too large to possibly be cached in memory:
+    def hugeBlock: IteratorBlock = IteratorBlock(List.fill(100)(new Array[Byte](1000)).iterator)
 
     // there is not enough space to store this block in MEMORY,
     // But BlockManager will be able to serialize this block to WAL
     // and hence count returns correct value.
-     testRecordcount(false, StorageLevel.MEMORY_ONLY,
-      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+    testRecordcount(false, StorageLevel.MEMORY_ONLY, hugeBlock, blockManager, Some(100))
 
     // there is not enough space to store this block in MEMORY,
     // But BlockManager will be able to serialize this block to DISK
     // and hence count returns correct value.
-    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
-      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK, hugeBlock, blockManager, Some(100))
 
     // there is not enough space to store this block With MEMORY_ONLY StorageLevel.
     // BlockManager will not be able to unroll this block
     // and hence it will not tryToPut this block, resulting the SparkException
     storageLevel = StorageLevel.MEMORY_ONLY
     withBlockManagerBasedBlockHandler { handler =>
-      val thrown = intercept[SparkException] {
-        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      intercept[SparkException] {
+        storeSingleBlock(handler, hugeBlock)
       }
     }
   }
@@ -346,21 +364,6 @@ class ReceivedBlockHandlerSuite
     storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
   }
 
-  /** Test error handling when blocks that cannot be stored */
-  private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) {
-    // Handle error in iterator (e.g. divide-by-zero error)
-    intercept[Exception] {
-      val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
-      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
-    }
-
-    // Handler error in block manager storing (e.g. too big block)
-    intercept[SparkException] {
-      val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
-      receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
-    }
-  }
-
   /** Instantiate a BlockManagerBasedBlockHandler and run a code with it */
  private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) {
     body(new BlockManagerBasedBlockHandler(blockManager, storageLevel))
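
For context on the `hugeBlock` fixture introduced in the test changes above, 
the sizes work out so that the block can never fit in the suite's BlockManager. 
A standalone sketch of the arithmetic (the 12000-byte store and the 0.4 unroll 
fraction come from the test setup; the snippet itself is illustrative only):

    // 100 records of 1000 bytes each is ~100 KB of payload, while the test
    // BlockManager holds 12000 bytes in total, of which 12000 * 0.4 = 4800
    // bytes are reserved for unrolling blocks into memory.
    val records     = List.fill(100)(new Array[Byte](1000))
    val payloadSize = records.map(_.length).sum   // 100000 bytes
    val unrollRoom  = (12000 * 0.4).toInt         // 4800 bytes
    assert(payloadSize > unrollRoom)              // so a MEMORY_ONLY put must fail
    println(s"payload=$payloadSize B, unroll room=$unrollRoom B, records=${records.size}")

Hence the BlockManager-based handler must surface a SparkException for this 
block under MEMORY_ONLY, while the WAL-based handler now stores it successfully 
and only logs the failed cache attempt.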

