Thanks for the detailed explanation. Just tested it, worked like a charm.

On Mon, Jun 20, 2016 at 1:02 PM, N B <nb.nos...@gmail.com> wrote:

> It's actually necessary to retire keys that become "zero" or "empty", so to
> speak. In your case, the key is "imageURL" and the value is a dictionary, one
> of whose fields is the "count" that you are maintaining. For simplicity and
> illustration's sake, I will assume imageURL to be a string like "abc". Your
> slide duration is 60 seconds and your window duration is 1800 seconds.
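>
> As a rough illustration (the function and field names here are mine, not
> necessarily what you have), the reduce and inverse-reduce functions for such
> dictionary values would be along the lines of:
>
>     # combine two values for the same key when a batch enters the window
>     def reduce_func(a, b):
>         return {"count": a["count"] + b["count"]}
>
>     # undo a batch's contribution when it slides out of the window
>     def inv_reduce_func(a, b):
>         return {"count": a["count"] - b["count"]}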
>
> Now consider the following chain of events in your stream.
>
> batch 1 : "abc"
> batch 2 : "xyz"
> batch 3 : "abc"
>
> and now for the rest of the stream, the keys "abc" or "xyz" never occur.
>
> At the end of the third batch, the generated window RDD has
> { "abc" -> count = 2, "xyz" -> count = 1 }.
> When the first batch falls off after 1800 seconds, it will become
> { "abc -> count = 1, "xyz" -> count = 1 }.
> 60 seconds later, it will become
> { "abc" -> count = 1, "xyz" -> count = 0 }
> and a further 60 seconds later, the 3rd batch is removed from the window
> and the new window RDD becomes
> { "abc" -> count = 0, "xyz" -> count = 0 }.
>
> I hope you can see what is wrong with this. These keys will be perpetually
> held in memory even though there is no need for them to be there. This
> growth in the size of the generated window RDD is what's giving rise to the
> deteriorating processing time in your case.
>
> A filter function that is the equivalent of "count != 0" will suffice to
> retain only those keys that have not become "zero".
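>
> Concretely, something along these lines (a sketch against the PySpark
> reduceByKeyAndWindow signature, where "pairs" stands for your DStream of
> (imageURL, dictionary) pairs; adapt the value handling to your actual
> structure):
>
>     windowed = pairs.reduceByKeyAndWindow(
>         reduce_func,       # add counts for batches entering the window
>         inv_reduce_func,   # subtract counts for batches leaving the window
>         1800,              # window duration in seconds
>         60,                # slide duration in seconds
>         filterFunc=lambda kv: kv[1]["count"] != 0)  # retire "zero" keys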
>
> HTH,
> NB
>
>
>
> On Thu, Jun 16, 2016 at 8:12 PM, Roshan Singh <singh.rosha...@gmail.com>
> wrote:
>
>> Hi,
>> According to the docs (
>> https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow),
>> filterFunc can be used to retain expiring keys. I do not want to retain any
>> expiring keys, so I do not understand how this can help me stabilize it.
>> Please correct me if this is not the case.
>>
>> I am also specifying both reduceFunc and invReduceFunc. Could you share a
>> sample of the code you are using?
>>
>> Thanks.
>>
>> On Fri, Jun 17, 2016 at 3:43 AM, N B <nb.nos...@gmail.com> wrote:
>>
>>> We had this same issue with the reduceByKeyAndWindow API that you are
>>> using. To fix it, you have to use a different flavor of that API,
>>> specifically the two versions that let you pass a filter function to them.
>>> Putting in the filter functions helped stabilize our application too.
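>>>
>>> In the PySpark API this is the optional filterFunc argument of
>>> reduceByKeyAndWindow. Roughly (names are placeholders; the predicate sees
>>> each (key, value) pair of the windowed RDD, and pairs for which it returns
>>> False are dropped):
>>>
>>>     stream.reduceByKeyAndWindow(add_func, subtract_func,
>>>                                 windowDuration=1800, slideDuration=60,
>>>                                 filterFunc=lambda kv: kv[1] != 0)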
>>>
>>> HTH
>>> NB
>>>
>>>
>>> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh <singh.rosha...@gmail.com
>>> > wrote:
>>>
>>>> Hi all,
>>>> I have a Python streaming job which is supposed to run 24x7, and I am
>>>> unable to stabilize it. The job just counts the number of links shared in a
>>>> 30 minute sliding window. I am using the reduceByKeyAndWindow operation with
>>>> a batch interval of 30 seconds and a slide interval of 60 seconds.
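>>>>
>>>> In outline, the relevant part looks roughly like this (a simplified sketch,
>>>> not the exact code; the full version is in the pastebin link below, and
>>>> "sc" and the "links" DStream come from the Kafka setup I have omitted):
>>>>
>>>>     from pyspark.streaming import StreamingContext
>>>>
>>>>     ssc = StreamingContext(sc, 30)        # 30 second batches
>>>>     ssc.checkpoint("/tmp/checkpoints")    # placeholder path; needed for the inverse reduce
>>>>     counts = links.map(lambda url: (url, 1)) \
>>>>                   .reduceByKeyAndWindow(lambda a, b: a + b,  # add new batches
>>>>                                         lambda a, b: a - b,  # subtract expired ones
>>>>                                         1800, 60)            # 30 min window, 60 s slide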
>>>>
>>>> The Kafka queue has a rate of nearly 2200 messages/second, which can
>>>> increase to 3000, but the mean is 2200.
>>>>
>>>> I have played around with the batch size and the slide interval, and with
>>>> increasing parallelism, but with no fruitful result. These only delay the
>>>> destabilization.
>>>>
>>>> GC time is usually between 60-100 ms.
>>>>
>>>> I also noticed in the Spark UI that the jobs were not being distributed to
>>>> other nodes, so I configured spark.locality.wait to 100ms, after which the
>>>> jobs are being distributed properly.
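>>>>
>>>> (Just for reference, that setting amounts to the following; it is shown
>>>> here illustratively, and the real configuration is in the pastebin link
>>>> below.)
>>>>
>>>>     from pyspark import SparkConf
>>>>
>>>>     conf = SparkConf().set("spark.locality.wait", "100ms")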
>>>>
>>>> I have a cluster of 6 slaves and one master, each with 16 cores and 15 GB
>>>> of RAM.
>>>>
>>>> Code and configuration: http://pastebin.com/93DMSiji
>>>>
>>>> Streaming screenshot: http://imgur.com/psNfjwJ
>>>>
>>>> I need help in debugging the issue. Any help will be appreciated.
>>>>
>>>> --
>>>> Roshan Singh
>>>>
>>>>
>>>
>>
>>
>> --
>> Roshan Singh
>> http://roshansingh.in
>>
>
>


-- 
Roshan Singh
http://roshansingh.in
