So I see in the logs that PIDRateEstimator is choosing a new rate, and the rate it's choosing is 100.
That happens to be the default minimum of an (apparently undocumented) setting, spark.streaming.backpressure.pid.minRate. Try setting that to 1 and see if there's different behavior.

BTW, how many Kafka partitions are you using, and how many actually have data for a given batch?
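A minimal sketch of that suggestion, assuming the same SparkConf-based setup Samy shows further down the thread (spark.streaming.backpressure.pid.minRate is the undocumented setting named above; 100 is its default, which is the rate showing up in the logs, and the app name here is just a placeholder):

    import org.apache.spark.SparkConf

    val conf = new SparkConf().setAppName("backpressure-test")
    // Settings already used in the thread below.
    conf.set("spark.streaming.kafka.consumer.poll.ms", "30000")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "100000")
    conf.set("spark.streaming.backpressure.enabled", "true")
    // Lower the floor the PID rate estimator is allowed to pick (default: 100).
    conf.set("spark.streaming.backpressure.pid.minRate", "1")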
On Thu, Oct 13, 2016 at 4:33 AM, Samy Dindane <s...@dindane.com> wrote:
> Hey Cody,
>
> Thanks for the reply. Really helpful.
>
> Following your suggestion, I set spark.streaming.backpressure.enabled to
> true and maxRatePerPartition to 100000.
> I know I can handle 100k records at the same time, but definitely not in
> 1 second (the batchDuration), so I expect the backpressure to lower that
> number.
>
> Unfortunately the backpressure doesn't work and I keep getting 100k
> records per batch.
>
> Here is my output log:
> https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba
> And this is my conf:
>
> conf.set("spark.streaming.kafka.consumer.poll.ms", "30000")
> conf.set("spark.streaming.kafka.maxRatePerPartition", "100000")
> conf.set("spark.streaming.backpressure.enabled", "true")
>
> That's not normal, is it? Do you notice anything odd in my logs?
>
> Thanks a lot.
>
> On 10/12/2016 07:31 PM, Cody Koeninger wrote:
>> Cool, just wanted to make sure.
>>
>> To answer your question about
>>
>>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>>
>> that configuration was added well after the integration of the direct
>> stream with the backpressure code, and was added only to the receiver
>> code, which the direct stream doesn't share since it isn't a receiver.
>> Not making excuses about it being confusing, just explaining how
>> things ended up that way :( So yeah, maxRatePerPartition is the
>> closest thing you have on the direct stream side to being able to
>> limit before the backpressure estimator has something to work with.
>>
>> So to try and debug what you're seeing, if you add a line like this to
>> your log4j.properties
>>
>> log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE
>>
>> you should start seeing log lines like
>>
>> 16/10/12 12:18:01 TRACE PIDRateEstimator:
>> time = 1476292681092, # records = 20, processing time = 20949,
>> scheduling delay = 6
>> 16/10/12 12:18:01 TRACE PIDRateEstimator:
>> latestRate = -1.0, error = -1.9546995083297531
>> latestError = -1.0, historicalError = 0.001145639409995704
>> delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10
>>
>> and then once it updates, lines like
>>
>> 16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0
>>
>> For a really artificially constrained example where
>> maxRatePerPartition is set such that it limits to 20 per batch but the
>> system can really only handle 5 per batch, the streaming UI will look
>> something like this:
>>
>> https://i.imgsafe.org/e730492453.png
>>
>> Notice the cutover point.
>>
>> On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane <s...@dindane.com> wrote:
>>> I am 100% sure.
>>>
>>> println(conf.get("spark.streaming.backpressure.enabled")) prints true.
>>>
>>> On 10/12/2016 05:48 PM, Cody Koeninger wrote:
>>>> Just to make 100% sure, did you set
>>>>
>>>> spark.streaming.backpressure.enabled
>>>>
>>>> to true?
>>>>
>>>> On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane <s...@dindane.com> wrote:
>>>>> On 10/12/2016 04:40 PM, Cody Koeninger wrote:
>>>>>> How would backpressure know anything about the capacity of your
>>>>>> system on the very first batch?
>>>>>
>>>>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>>>>>
>>>>>> You should be able to set maxRatePerPartition at a value that makes
>>>>>> sure your first batch doesn't blow things up, and let backpressure
>>>>>> scale from there.
>>>>>
>>>>> Backpressure doesn't scale even when using maxRatePerPartition: when I
>>>>> enable backpressure and set maxRatePerPartition to n, I always get n
>>>>> records, even if my batch takes longer than batchDuration to finish.
>>>>>
>>>>> Example:
>>>>> * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
>>>>>   Durations.seconds(1))`
>>>>> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000
>>>>>   and enable backpressure
>>>>> * Since I can't handle 100,000 records in 1 second, I expect the
>>>>>   backpressure to kick in in the second batch, and get less than
>>>>>   100,000; but this does not happen
>>>>>
>>>>> What am I missing here?
>>>>>
>>>>>> On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane <s...@dindane.com>
>>>>>> wrote:
>>>>>>> That's what I was looking for, thank you.
>>>>>>>
>>>>>>> Unfortunately, neither
>>>>>>>
>>>>>>> * spark.streaming.backpressure.initialRate
>>>>>>> * spark.streaming.backpressure.enabled
>>>>>>> * spark.streaming.receiver.maxRate
>>>>>>> * spark.streaming.receiver.initialRate
>>>>>>>
>>>>>>> change how many records I get (I tried many different combinations).
>>>>>>>
>>>>>>> The only configuration that works is
>>>>>>> "spark.streaming.kafka.maxRatePerPartition".
>>>>>>> That's better than nothing, but it'd be useful to have backpressure
>>>>>>> enabled for automatic scaling.
>>>>>>>
>>>>>>> Do you have any idea why backpressure isn't working? How can I debug
>>>>>>> this?
>>>>>>>
>>>>>>> On 10/11/2016 06:08 PM, Cody Koeninger wrote:
>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>
>>>>>>>> "This rate is upper bounded by the values
>>>>>>>> spark.streaming.receiver.maxRate and
>>>>>>>> spark.streaming.kafka.maxRatePerPartition if they are set (see
>>>>>>>> below)."
>>>>>>>>
>>>>>>>> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane <s...@dindane.com>
>>>>>>>> wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Is it possible to limit the size of the batches returned by the
>>>>>>>>> Kafka consumer for Spark Streaming?
>>>>>>>>> I am asking because the first batch I get has hundreds of millions
>>>>>>>>> of records and it takes ages to process and checkpoint them.
>>>>>>>>>
>>>>>>>>> Thank you.
>>>>>>>>>
>>>>>>>>> Samy
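To tie the thread together, here is a sketch of what the direct-stream setup under discussion might look like with the settings above combined. It is only an illustration, not code from the thread: the app name, topic, bootstrap servers, and group id are placeholders, and the rate numbers are simply the ones quoted above.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Durations, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    val conf = new SparkConf().setAppName("kafka-backpressure-test")
    // Cap the very first batch, since backpressure has no history to work with yet.
    conf.set("spark.streaming.kafka.maxRatePerPartition", "100000")
    // Let the PID estimator adjust the rate once it has a completed batch to measure.
    conf.set("spark.streaming.backpressure.enabled", "true")

    // 1-second batches, as in the example above.
    val ssc = new StreamingContext(conf, Durations.seconds(1))

    // Placeholder Kafka parameters; adjust for the actual cluster.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "backpressure-test",
      "auto.offset.reset" -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

    // Print the per-batch record count so the backpressure behavior is visible.
    stream.foreachRDD(rdd => println(s"Records in batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()

With maxRatePerPartition capping the first batch and backpressure enabled, the PID estimator should start adjusting the per-partition rate from the second completed batch onwards, subject to the pid.minRate floor discussed at the top of the thread.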