So I see in the logs that PIDRateEstimator is choosing a new rate, and
that rate is 100.

That happens to be the default for an (apparently undocumented) minimum-rate setting,

spark.streaming.backpressure.pid.minRate

Try setting that to 1 and see if there's different behavior.
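
In case it helps, here is a minimal sketch of that change. It assumes you
build the SparkConf in code, as in the snippet from your mail; the app name
is just a placeholder, and the first three settings are copied from your
conf.

```scala
import org.apache.spark.SparkConf

// Hypothetical app name; the next three settings mirror the ones quoted below.
val conf = new SparkConf()
  .setAppName("backpressure-test")
  .set("spark.streaming.kafka.consumer.poll.ms", "30000")
  .set("spark.streaming.kafka.maxRatePerPartition", "100000")
  .set("spark.streaming.backpressure.enabled", "true")
  // Lower the floor that the PID estimator clamps its new rate to
  // (the default appears to be 100, which matches the rate in your logs).
  .set("spark.streaming.backpressure.pid.minRate", "1")
```

If you'd rather not rebuild the job, the same key can be passed to
spark-submit as --conf spark.streaming.backpressure.pid.minRate=1.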

BTW, how many kafka partitions are you using, and how many actually
have data for a given batch?


On Thu, Oct 13, 2016 at 4:33 AM, Samy Dindane <s...@dindane.com> wrote:
> Hey Cody,
>
> Thanks for the reply. Really helpful.
>
> Following your suggestion, I set spark.streaming.backpressure.enabled to
> true and maxRatePerPartition to 100000.
> I know I can handle 100k records at the same time, but definitely not in 1
> second (the batchDuration), so I expect the backpressure to lower that
> number.
>
> Unfortunately the backpressure doesn't work and I keep getting 100k records
> per batch.
>
> Here is my output log:
> https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba
> And this is my conf:
>
>     conf.set("spark.streaming.kafka.consumer.poll.ms", "30000")
>     conf.set("spark.streaming.kafka.maxRatePerPartition", "100000")
>     conf.set("spark.streaming.backpressure.enabled", "true")
>
> That's not normal, is it? Do you notice anything odd in my logs?
>
> Thanks a lot.
>
>
>
> On 10/12/2016 07:31 PM, Cody Koeninger wrote:
>>
>> Cool, just wanted to make sure.
>>
>> To answer your question about
>>
>>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>>
>>
>> that configuration was added well after the integration of the direct
>> stream with the backpressure code, and was added only to the receiver
>> code, which the direct stream doesn't share since it isn't a receiver.
>> Not making excuses about it being confusing, just explaining how
>> things ended up that way :(  So yeah, maxRatePerPartition is the
>> closest thing you have on the direct stream side to being able to
>> limit before the backpressure estimator has something to work with.
>>
>> So to try and debug what you're seeing, if you add a line like this to
>> your log4j.properties
>>
>> log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE
>>
>> you should start seeing log lines like
>>
>> 16/10/12 12:18:01 TRACE PIDRateEstimator:
>> time = 1476292681092, # records = 20, processing time = 20949,
>> scheduling delay = 6
>> 16/10/12 12:18:01 TRACE PIDRateEstimator:
>> latestRate = -1.0, error = -1.9546995083297531
>> latestError = -1.0, historicalError = 0.001145639409995704
>> delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10
>>
>> and then once it updates, lines like
>>
>> 16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0
>>
>> For a really artificially constrained example where
>> maxRatePerPartition is set such that it limits to 20 per batch but the
>> system can really only handle 5 per batch, the streaming UI will look
>> something like this:
>>
>> https://i.imgsafe.org/e730492453.png
>>
>> notice the cutover point
>>
>>
>> On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane <s...@dindane.com> wrote:
>>>
>>> I am 100% sure.
>>>
>>> println(conf.get("spark.streaming.backpressure.enabled")) prints true.
>>>
>>>
>>> On 10/12/2016 05:48 PM, Cody Koeninger wrote:
>>>>
>>>>
>>>> Just to make 100% sure, did you set
>>>>
>>>> spark.streaming.backpressure.enabled
>>>>
>>>> to true?
>>>>
>>>> On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane <s...@dindane.com> wrote:
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 10/12/2016 04:40 PM, Cody Koeninger wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> How would backpressure know anything about the capacity of your system
>>>>>> on the very first batch?
>>>>>
>>>>>
>>>>>
>>>>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> You should be able to set maxRatePerPartition at a value that makes
>>>>>> sure your first batch doesn't blow things up, and let backpressure
>>>>>> scale from there.
>>>>>
>>>>>
>>>>>
>>>>> Backpressure doesn't scale even when using maxRatePerPartition: when I
>>>>> enable backpressure and set maxRatePerPartition to n, I always get n
>>>>> records, even if my batch takes longer than batchDuration to finish.
>>>>>
>>>>> Example:
>>>>> * I set batchDuration to 1 sec:
>>>>>   `val ssc = new StreamingContext(conf, Durations.seconds(1))`
>>>>> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000
>>>>>   and enable backpressure
>>>>> * Since I can't handle 100,000 records in 1 second, I expect the
>>>>>   backpressure to kick in on the second batch and give me fewer than
>>>>>   100,000 records, but this does not happen
>>>>>
>>>>> What am I missing here?
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane <s...@dindane.com> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> That's what I was looking for, thank you.
>>>>>>>
>>>>>>> Unfortunately, none of
>>>>>>>
>>>>>>> * spark.streaming.backpressure.initialRate
>>>>>>> * spark.streaming.backpressure.enabled
>>>>>>> * spark.streaming.receiver.maxRate
>>>>>>> * spark.streaming.receiver.initialRate
>>>>>>>
>>>>>>> change how many records I get (I tried many different combinations).
>>>>>>>
>>>>>>> The only configuration that works is
>>>>>>> "spark.streaming.kafka.maxRatePerPartition".
>>>>>>> That's better than nothing, but it'd be useful to have backpressure
>>>>>>> enabled for automatic scaling.
>>>>>>>
>>>>>>> Do you have any idea why backpressure isn't working, or how to debug
>>>>>>> this?
>>>>>>>
>>>>>>>
>>>>>>> On 10/11/2016 06:08 PM, Cody Koeninger wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>
>>>>>>>> "This rate is upper bounded by the values
>>>>>>>> spark.streaming.receiver.maxRate and
>>>>>>>> spark.streaming.kafka.maxRatePerPartition if they are set (see
>>>>>>>> below)."
>>>>>>>>
>>>>>>>> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane <s...@dindane.com> wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Is it possible to limit the size of the batches returned by the
>>>>>>>>> Kafka consumer for Spark Streaming?
>>>>>>>>> I am asking because the first batch I get has hundreds of millions
>>>>>>>>> of records and it takes ages to process and checkpoint them.
>>>>>>>>>
>>>>>>>>> Thank you.
>>>>>>>>>
>>>>>>>>> Samy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>
>>>>>>>
>>>>>
>>>
>
