Hi,

Could someone explain the behavior of the spark.streaming.kafka.maxRatePerPartition parameter? The docs say: "An important (configuration) is spark.streaming.kafka.maxRatePerPartition, which is the maximum rate at which each Kafka partition will be read by (the) direct API."
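For context, here is a minimal sketch of the kind of setup we are testing with (the app name, master, batch interval, and rate value are illustrative, not our actual job):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Cap each Kafka partition at 1000 messages/sec for the direct API.
    // Equivalent to -Dspark.streaming.kafka.maxRatePerPartition=1000 in
    // --driver-java-options, or --conf on spark-submit.
    val conf = new SparkConf()
      .setMaster("local[2]") // illustrative; any master works
      .setAppName("direct-kafka-rate-test")
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")

    // With a 2-second batch interval, the cap should work out to
    // 2 * 1000 = 2000 messages per partition per batch.
    val ssc = new StreamingContext(conf, Seconds(2))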
What is the default behavior for this parameter? From some testing, it appears that when it is not set, the RDD sizes tend to be quite low; with it set, the consumer picks items off the topic much more actively, e.g. with -Dspark.streaming.kafka.maxRatePerPartition=1000 in --driver-java-options. Does leaving the parameter unset cap the RDD size at some low value? It seems to default to 0... but what is the effect of that?

Here is the relevant code:

    protected val maxMessagesPerPartition: Option[Long] = {
      val ratePerSec = context.sparkContext.getConf.getInt(
        "spark.streaming.kafka.maxRatePerPartition", 0)
      if (ratePerSec > 0) {
        val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
        Some((secsPerBatch * ratePerSec).toLong)
      } else {
        None
      }
    }

    // limits the maximum number of messages per partition
    protected def clamp(
        leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
      maxMessagesPerPartition.map { mmp =>
        leaderOffsets.map { case (tp, lo) =>
          tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
        }
      }.getOrElse(leaderOffsets)
    }

What would we limit by default? And once Spark Streaming does pick up messages, does it always read at the maximum rate, or can a batch fall below the max even when the topic holds max or more messages?

Thanks.
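P.S. To check my own reading of the clamp logic, I put together a standalone toy version (plain Scala, no Spark dependencies; the names and types are simplified stand-ins for TopicAndPartition and LeaderOffset, and the numbers are made up):

    object ClampToy {
      // a topic-partition is just a string here; LeaderOffset mimics the Spark case class
      type TP = String
      case class LeaderOffset(offset: Long)

      def clamp(currentOffsets: Map[TP, Long],
                leaderOffsets: Map[TP, LeaderOffset],
                maxMessagesPerPartition: Option[Long]): Map[TP, LeaderOffset] =
        maxMessagesPerPartition.map { mmp =>
          leaderOffsets.map { case (tp, lo) =>
            tp -> lo.copy(offset = math.min(currentOffsets(tp) + mmp, lo.offset))
          }
        }.getOrElse(leaderOffsets)

      def main(args: Array[String]): Unit = {
        val current = Map("t-0" -> 100L, "t-1" -> 100L)
        val leaders = Map("t-0" -> LeaderOffset(5000L),  // deep backlog
                          "t-1" -> LeaderOffset(1500L))  // shallow backlog
        // 1000 msgs/sec * 2 sec batch => mmp = 2000
        println(clamp(current, leaders, Some(2000L)))
        // t-0 is clamped to offset 2100 (100 + 2000, i.e. 2000 messages read);
        // t-1 stays at 1500 (only 1400 messages available, below the cap)
        println(clamp(current, leaders, None))
        // with the default rate of 0, maxMessagesPerPartition is None and
        // both partitions read all the way up to the leader offsets
      }
    }

If my reading is right, the default (rate 0) means no clamping at all, which would not explain the small RDDs we see when the parameter is unset.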