Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21430#discussion_r191130186
  
    --- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 ---
    @@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite
         tracker2.stop()
       }
     
    +  test("block allocation to batch should not lose blocks from received queue") {
    +    val tracker1 = createTracker(createSpyTracker = true)
    +    tracker1.isWriteAheadLogEnabled should be (true)
    +    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
    +
    +    // Add blocks
    +    val blockInfos = generateBlockInfos()
    +    blockInfos.map(tracker1.addBlock)
    +    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
    +
    +    // Try to allocate the blocks to a batch and verify that the allocation fails.
    +    // The blocks should stay in the received queue when the WAL write fails.
    +    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
    +      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
    +    try {
    --- End diff --
    
    Changed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to