[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21430


---




[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-28 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21430#discussion_r191130186
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite
 tracker2.stop()
   }
 
+  test("block allocation to batch should not loose blocks from received 
queue") {
+val tracker1 = createTracker(createSpyTracker = true)
+tracker1.isWriteAheadLogEnabled should be (true)
+tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+// Add blocks
+val blockInfos = generateBlockInfos()
+blockInfos.map(tracker1.addBlock)
+tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+// Try to allocate the blocks to a batch and verify that it fails.
+// The blocks should stay in the received queue when the WAL write fails.
+doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+  .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+try {
--- End diff --

Changed.


---




[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-28 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21430#discussion_r191130212
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -308,12 +354,16 @@ class ReceivedBlockTrackerSuite
* want to control time by manually incrementing it to test log clean.
*/
   def createTracker(
+  createSpyTracker: Boolean = false,
   setCheckpointDir: Boolean = true,
   recoverFromWriteAheadLog: Boolean = false,
   clock: Clock = new SystemClock): ReceivedBlockTracker = {
 val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-val tracker = new ReceivedBlockTracker(
+var tracker = new ReceivedBlockTracker(
   conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
+if (createSpyTracker) {
+  tracker = spy(tracker)
--- End diff --

Changed.


---




[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-28 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21430#discussion_r191130178
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
@@ -112,10 +112,13 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
 if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
   val streamIdToBlocks = streamIds.map { streamId =>
-  (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+(streamId, getReceivedBlockQueue(streamId).clone())
   }.toMap
   val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
   if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+streamIds.foreach {
--- End diff --

Changed.


---




[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21430#discussion_r190992559
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
@@ -112,10 +112,13 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
 if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
   val streamIdToBlocks = streamIds.map { streamId =>
-  (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+(streamId, getReceivedBlockQueue(streamId).clone())
   }.toMap
   val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
   if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+streamIds.foreach {
--- End diff --

nit: use parentheses when using placeholders (`_`), braces otherwise.
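
For example (a sketch applied to the `foreach` in this diff; its body is elided in the excerpt, so `clear()` on the received queue is an assumption consistent with the fix):

```scala
// Placeholder syntax: parentheses.
streamIds.foreach(getReceivedBlockQueue(_).clear())

// Named parameter or multi-line body: braces.
streamIds.foreach { streamId =>
  getReceivedBlockQueue(streamId).clear()
}
```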


---




[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21430#discussion_r190992915
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite
 tracker2.stop()
   }
 
+  test("block allocation to batch should not loose blocks from received 
queue") {
+val tracker1 = createTracker(createSpyTracker = true)
+tracker1.isWriteAheadLogEnabled should be (true)
+tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+// Add blocks
+val blockInfos = generateBlockInfos()
+blockInfos.map(tracker1.addBlock)
+tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+// Try to allocate the blocks to a batch and verify that it fails.
+// The blocks should stay in the received queue when the WAL write fails.
+doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+  .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+try {
--- End diff --

`intercept[RuntimeException] { ... }`
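
A minimal sketch of the suggested rewrite (assuming the suite's ScalaTest Matchers are in scope, and using `Time(1)` as a stand-in for whatever batch time the test passes):

```scala
// intercept fails the test if no exception is thrown and returns the
// caught exception for further assertions.
val exception = intercept[RuntimeException] {
  tracker1.allocateBlocksToBatch(Time(1))
}
exception.getMessage should include ("Not able to write BatchAllocationEvent")

// The failed WAL write must not have drained the received queue.
tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
```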


---




[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21430#discussion_r190993327
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
@@ -308,12 +354,16 @@ class ReceivedBlockTrackerSuite
* want to control time by manually incrementing it to test log clean.
*/
   def createTracker(
+  createSpyTracker: Boolean = false,
   setCheckpointDir: Boolean = true,
   recoverFromWriteAheadLog: Boolean = false,
   clock: Clock = new SystemClock): ReceivedBlockTracker = {
 val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-val tracker = new ReceivedBlockTracker(
+var tracker = new ReceivedBlockTracker(
   conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
+if (createSpyTracker) {
+  tracker = spy(tracker)
--- End diff --

Why not do this in the caller? Less code and cleaner.
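
i.e., something along these lines (a sketch; `spy` is Mockito's `org.mockito.Mockito.spy`, and the stubbing is taken from the test above). `createTracker` then stays unchanged:

```scala
// Wrap the tracker in a spy at the call site instead of inside createTracker.
val tracker1 = spy(createTracker())
doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
  .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
```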


---




[GitHub] spark pull request #21430: [SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

2018-05-25 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/21430

[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

## What changes were proposed in this pull request?

When blocks are allocated to a batch and the WAL write fails, the blocks are still removed from the received block queue. This produces data loss, because the next allocation will not find those blocks in the queue.

In this PR, blocks are removed from the received queue only if the WAL write succeeded.
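
A sketch of the fixed allocation logic, reconstructed from the review diffs (the body of the success branch is elided in the excerpts, so clearing the per-stream queues is an assumption consistent with the description above; other bookkeeping is omitted):

```scala
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // Snapshot the per-stream queues instead of draining them up front.
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).clone())
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      // Dequeue only after the WAL write succeeded; a failed write leaves
      // the blocks in place for the next allocation attempt.
      streamIds.foreach(getReceivedBlockQueue(_).clear())
      lastAllocatedBatchTime = batchTime
    }
  }
}
```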

## How was this patch tested?

Additional unit test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-23991

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21430.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21430


commit 2d35dfacd54d747e6a4167d46234d4b3ce87529b
Author: Gabor Somogyi 
Date:   2018-05-25T12:52:36Z

[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch




---
