Repository: spark
Updated Branches:
  refs/heads/master a40bca011 -> ad0badba1


[SPARK-7777][Streaming] Handle the case when there is no block in a batch

In the old implementation, if a batch has no block, 
`areWALRecordHandlesPresent` will be `true` and it will return 
`WriteAheadLogBackedBlockRDD`.

This PR handles this case by returning `WriteAheadLogBackedBlockRDD` or 
`BlockRDD` according to the configuration.

Author: zsxwing <zsxw...@gmail.com>

Closes #6372 from zsxwing/SPARK-7777 and squashes the following commits:

788f895 [zsxwing] Handle the case when there is no block in a batch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad0badba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad0badba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad0badba

Branch: refs/heads/master
Commit: ad0badba1450295982738934da2cc121cde18213
Parents: a40bca0
Author: zsxwing <zsxw...@gmail.com>
Authored: Sat May 23 02:11:17 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sat May 23 02:11:17 2015 -0700

----------------------------------------------------------------------
 .../dstream/ReceiverInputDStream.scala          | 47 ++++++++++++--------
 .../spark/streaming/InputStreamsSuite.scala     | 31 +++++++++++++
 2 files changed, 60 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad0badba/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 5cfe43a..e4ff05e 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -73,27 +73,38 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient 
ssc_ : StreamingCont
         val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
         ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
-        // Are WAL record handles present with all the blocks
-        val areWALRecordHandlesPresent = blockInfos.forall { 
_.walRecordHandleOption.nonEmpty }
+        if (blockInfos.nonEmpty) {
+          // Are WAL record handles present with all the blocks
+          val areWALRecordHandlesPresent = blockInfos.forall { 
_.walRecordHandleOption.nonEmpty }
 
-        if (areWALRecordHandlesPresent) {
-          // If all the blocks have WAL record handle, then create a 
WALBackedBlockRDD
-          val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
-          val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get 
}.toArray
-          new WriteAheadLogBackedBlockRDD[T](
-            ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
-        } else {
-          // Else, create a BlockRDD. However, if there are some blocks with 
WAL info but not others
-          // then that is unexpected and log a warning accordingly.
-          if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
-            if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
-              logError("Some blocks do not have Write Ahead Log information; " 
+
-                "this is unexpected and data may not be recoverable after 
driver failures")
-            } else {
-              logWarning("Some blocks have Write Ahead Log information; this 
is unexpected")
+          if (areWALRecordHandlesPresent) {
+            // If all the blocks have WAL record handle, then create a 
WALBackedBlockRDD
+            val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+            val walRecordHandles = blockInfos.map { 
_.walRecordHandleOption.get }.toArray
+            new WriteAheadLogBackedBlockRDD[T](
+              ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
+          } else {
+            // Else, create a BlockRDD. However, if there are some blocks with 
WAL info but not
+            // others then that is unexpected and log a warning accordingly.
+            if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
+              if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+                logError("Some blocks do not have Write Ahead Log information; 
" +
+                  "this is unexpected and data may not be recoverable after 
driver failures")
+              } else {
+                logWarning("Some blocks have Write Ahead Log information; this 
is unexpected")
+              }
             }
+            new BlockRDD[T](ssc.sc, blockIds)
+          }
+        } else {
+          // If no block is ready now, creating WriteAheadLogBackedBlockRDD or 
BlockRDD
+          // according to the configuration
+          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+            new WriteAheadLogBackedBlockRDD[T](
+              ssc.sparkContext, Array.empty, Array.empty, Array.empty)
+          } else {
+            new BlockRDD[T](ssc.sc, Array.empty)
           }
-          new BlockRDD[T](ssc.sc, blockIds)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad0badba/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 93e6b0c..0122514 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, 
StreamingListener}
 import org.apache.spark.util.{ManualClock, Utils}
 import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
+import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -105,6 +106,36 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
     }
   }
 
+  test("socket input stream - no block in a batch") {
+    withTestServer(new TestServer()) { testServer =>
+      testServer.start()
+
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        ssc.addStreamingListener(ssc.progressListener)
+
+        val batchCounter = new BatchCounter(ssc)
+        val networkStream = ssc.socketTextStream(
+          "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+        val outputBuffer = new ArrayBuffer[Seq[String]] with 
SynchronizedBuffer[Seq[String]]
+        val outputStream = new TestOutputStream(networkStream, outputBuffer)
+        outputStream.register()
+        ssc.start()
+
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        clock.advance(batchDuration.milliseconds)
+
+        // Make sure the first batch is finished
+        if (!batchCounter.waitUntilBatchesCompleted(1, 30000)) {
+          fail("Timeout: cannot finish all batches in 30 seconds")
+        }
+
+        networkStream.generatedRDDs.foreach { case (_, rdd) =>
+          assert(!rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
+        }
+      }
+    }
+  }
+
   test("binary records stream") {
     val testDir: File = null
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to