The default of 0 means no limit.  Each batch will grab as much as is
available, i.e., a range of offsets spanning from the end of the previous
batch to the highest available offsets on the leader.

If you set spark.streaming.kafka.maxRatePerPartition > 0, the number you
set is the maximum number of messages per partition per second.

If you have a reproducible case that behaves differently, please share it.
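To make the arithmetic concrete, here is a standalone sketch of the same
logic as the code quoted below (this is not the Spark API itself; the
object and method names are made up for illustration):

```scala
// Sketch of how spark.streaming.kafka.maxRatePerPartition turns into a
// per-batch, per-partition message cap, mirroring the quoted Spark code.
object RateLimitSketch {
  // ratePerSec <= 0 means "no limit": the batch reads up to the leader's
  // highest available offset.
  def maxMessagesPerPartition(ratePerSec: Int, batchDurationMs: Long): Option[Long] =
    if (ratePerSec > 0)
      Some((batchDurationMs.toDouble / 1000 * ratePerSec).toLong)
    else
      None

  // Clamp the target offset: never read past currentOffset + cap.
  def clamp(currentOffset: Long, leaderOffset: Long, cap: Option[Long]): Long =
    cap.map(mmp => math.min(currentOffset + mmp, leaderOffset))
      .getOrElse(leaderOffset)

  def main(args: Array[String]): Unit = {
    // maxRatePerPartition=1000 with a 2-second batch interval caps each
    // partition at 2000 messages per batch.
    assert(maxMessagesPerPartition(1000, 2000).contains(2000L))
    // With 5000 messages available on the leader, the batch is clamped.
    assert(clamp(0L, 5000L, maxMessagesPerPartition(1000, 2000)) == 2000L)
    // With the rate unset (0), the batch grabs everything available.
    assert(clamp(0L, 5000L, maxMessagesPerPartition(0, 2000)) == 5000L)
  }
}
```

So with the setting unset, a batch can be arbitrarily large (everything
between the last consumed offset and the leader's head), which is why the
first batch after a backlog can look huge rather than small.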

On Tue, Jun 2, 2015 at 5:28 PM, dgoldenberg <dgoldenberg...@gmail.com>
wrote:

> Hi,
>
> Could someone explain the behavior of the
> spark.streaming.kafka.maxRatePerPartition parameter? The doc says "An
> important (configuration) is spark.streaming.kafka.maxRatePerPartition
> which
> is the maximum rate at which each Kafka partition will be read by (the)
> direct API."
>
> What is the default behavior for this parameter? From some testing it
> appears that with it not being set, the RDD size tends to be quite low.
> With it set, we're seeing the consumer pick up items off the topic much
> more actively, e.g. -Dspark.streaming.kafka.maxRatePerPartition=1000 in
> --driver-java-options.
>
> Does this parameter set the RDD size to a very low value?
>
> It seems to be defaulting to 0... but what's the effect of that?
>   protected val maxMessagesPerPartition: Option[Long] = {
>     val ratePerSec = context.sparkContext.getConf.getInt(
>       "spark.streaming.kafka.maxRatePerPartition", 0)
>     if (ratePerSec > 0) {
>       val secsPerBatch =
>         context.graph.batchDuration.milliseconds.toDouble / 1000
>       Some((secsPerBatch * ratePerSec).toLong)
>     } else {
>       None
>     }
>   }
>   // limits the maximum number of messages per partition
>   protected def clamp(
>       leaderOffsets: Map[TopicAndPartition, LeaderOffset])
>     : Map[TopicAndPartition, LeaderOffset] = {
>     maxMessagesPerPartition.map { mmp =>
>       leaderOffsets.map { case (tp, lo) =>
>         tp -> lo.copy(offset =
>           Math.min(currentOffsets(tp) + mmp, lo.offset))
>       }
>     }.getOrElse(leaderOffsets)
>   }
>
> What would we limit to by default? And once Spark Streaming does pick up
> messages, would it read at the maximum rate? Does it ever fall below the
> max even if there are max or more than max messages in the topic? Thanks.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Behavior-of-the-spark-streaming-kafka-maxRatePerPartition-config-param-tp23117.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
