Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22223#discussion_r213825892
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
         if (effectiveRateLimitPerPartition.values.sum > 0) {
           val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
           Some(effectiveRateLimitPerPartition.map {
    -        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
    +        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
    +          Math.max(ppc.minRatePerPartition(tp), 1L))
    --- End diff --
    
    Is the second Math.max actually necessary?
    The default implementation of minRatePerPartition will return 1 anyway.
    If someone makes a custom implementation that, e.g., returns zero, should they get what they asked for?
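
To make the question concrete, here is a minimal standalone sketch of the two clamping variants. The object and method names are hypothetical and are not part of Spark; the `Double` type for `limit` and the `Long` type for the minimum rate are assumptions made for illustration only.

```scala
// Hypothetical stand-in for the clamping logic in the diff; not Spark code.
object MinRateSketch {
  // As written in the diff: the floor is never allowed to drop below 1.
  def nestedMax(secsPerBatch: Double, limit: Double, minRate: Long): Long =
    Math.max((secsPerBatch * limit).toLong, Math.max(minRate, 1L))

  // With the inner Math.max dropped: the floor is whatever minRate returns.
  def singleMax(secsPerBatch: Double, limit: Double, minRate: Long): Long =
    Math.max((secsPerBatch * limit).toLong, minRate)

  def main(args: Array[String]): Unit = {
    // Minimum of 1 (the default described above): both variants agree.
    println(nestedMax(0.5, 1.0, 1L) == singleMax(0.5, 1.0, 1L))
    // Custom minimum of 0 with a limit that truncates to 0: the variants differ.
    println(nestedMax(0.5, 1.0, 0L)) // floor forced up to 1
    println(singleMax(0.5, 1.0, 0L)) // caller's 0 is honored
  }
}
```

Under these assumptions the two variants only diverge when a custom minimum is below 1 and the computed per-batch limit truncates below 1 as well, which is exactly the case the question is about.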

