GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20620#discussion_r168911639
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
           getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
         }
     
    -    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
    -    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
    +    // Insert the recovered block-to-batch allocations and removes them from queue of
    +    // received blocks.
         def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
           logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
             s"${allocatedBlocks.streamIdToAllocatedBlocks}")
    -      streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
    +      allocatedBlocks.streamIdToAllocatedBlocks.foreach {
    +        case (streamId, allocatedBlocks) =>
    --- End diff --
    
    nit: Can we use a name other than `allocatedBlocks` for the pattern variable in the `case`? It shadows the method parameter of the same name, which is confusing.
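
To illustrate the nit, here is a minimal sketch of the same shape with the inner binding renamed. It is not the PR's code: `BlockInfo`, `Allocated`, `queueFor`, and `blocksInStream` are hypothetical stand-ins for Spark's real types and helpers, and using `dequeueAll` to remove the blocks is only one plausible way to do what the new comment describes.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins; these are not Spark's real classes.
final case class BlockInfo(id: String)
final case class Allocated(streamIdToAllocatedBlocks: Map[Int, Seq[BlockInfo]])

object ShadowingSketch {
  // Per-stream queues of blocks that have not been allocated to a batch yet.
  private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  def queueFor(streamId: Int): mutable.Queue[BlockInfo] =
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

  def insertAllocatedBatch(allocatedBlocks: Allocated): Unit = {
    allocatedBlocks.streamIdToAllocatedBlocks.foreach {
      // `blocksInStream` is a distinct name, so the outer `allocatedBlocks`
      // parameter is not shadowed inside this case body.
      case (streamId, blocksInStream) =>
        val toRemove = blocksInStream.toSet
        // A Set[BlockInfo] is usable as a BlockInfo => Boolean predicate.
        queueFor(streamId).dequeueAll(toRemove)
    }
  }
}
```

With the rename, a reader can tell at a glance whether a reference inside the `case` body means the whole batch allocation or the blocks of a single stream.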

