Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166605547
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    Latest rate means the rate of the previous batch. Is it possible that a live system processed 0 events? Yes, whenever there is no backlog and no new events arrived during the last batch. Completely possible.
    
    This also happens during the first run, and this parameter is meant to limit the rate during the first run. Quote from the docs:
    
    `This is the initial maximum receiving rate at which each receiver will 
receive data for the
        first batch when the backpressure mechanism is enabled.`
    
    If it happens while the system is running, for example when there is no backlog and no new events arrive, we still need to limit the system rate: with a latest rate of 0 the computation imposes no limit at all, risking overflowing the system.
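    The fallback in the diff can be sketched standalone (a minimal sketch; `effectiveRate` is a hypothetical helper for illustration, not the actual method in the patch):
    
    ```scala
    // Hypothetical standalone version of the fallback in the diff: if the
    // controller's latest rate is 0 (nothing processed last batch, or the
    // very first batch), fall back to the configured initial rate so the
    // next batch is still bounded instead of unlimited.
    def effectiveRate(latestRate: Long, initialRate: Long): Long =
      if (latestRate > 0) latestRate else initialRate
    ```
    
    So with no positive estimate yet the initial cap applies, and once the controller reports a positive rate, that estimate takes over.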


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
