Github user akonopko commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166605578
  
    --- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Long = 
context.sparkContext.getConf.getLong(
           "spark.streaming.kafka.maxRatePerPartition", 0)
     
    +  private val initialRate = context.sparkContext.getConf.getLong(
    +    "spark.streaming.backpressure.initialRate", 0)
    +
       protected[streaming] def maxMessagesPerPartition(
           offsets: Map[TopicAndPartition, Long]): 
Option[Map[TopicAndPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +
    +    val estimatedRateLimit = rateController.map(x => {
    --- End diff --
    
    Fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to