Thanks for the detailed explanation. Just tested it, worked like a charm.

On Mon, Jun 20, 2016 at 1:02 PM, N B <nb.nos...@gmail.com> wrote:
> It's actually necessary to retire keys that become "Zero" or "Empty", so to speak. In your case, the key is "imageURL" and the value is a dictionary, one of whose fields is the "count" that you are maintaining. For simplicity and illustration's sake, I will assume imageURL to be a string like "abc". Your slide duration is 60 seconds and your window duration is 1800 seconds.
>
> Now consider the following chain of events in your stream:
>
> batch 1 : "abc"
> batch 2 : "xyz"
> batch 3 : "abc"
>
> and now, for the rest of the stream, the keys "abc" and "xyz" never occur.
>
> At the end of the third batch, the generated window RDD has
> { "abc" -> count = 2, "xyz" -> count = 1 }.
> When the first batch falls off after 1800 seconds, it will become
> { "abc" -> count = 1, "xyz" -> count = 1 }.
> 60 seconds later, it will become
> { "abc" -> count = 1, "xyz" -> count = 0 }
> and a further 60 seconds later, the 3rd batch is removed from the window and the new window RDD becomes
> { "abc" -> count = 0, "xyz" -> count = 0 }.
>
> I hope you can see what is wrong with this. These keys will be perpetually held in memory even though there is no need for them to be there. This growth in the size of the generated window RDD is what's giving rise to the deteriorating processing time in your case.
>
> A filter function equivalent to "count != 0" will suffice to retain only those keys that have not become "Zero".
>
> HTH,
> NB
>
> On Thu, Jun 16, 2016 at 8:12 PM, Roshan Singh <singh.rosha...@gmail.com> wrote:
>
>> Hi,
>> According to the docs (https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream.reduceByKeyAndWindow), filterFunc can be used to retain expiring keys. I do not want to retain any expiring keys, so I do not understand how this can help me stabilize the job. Please correct me if this is not the case.
>>
>> I am also specifying both reduceFunc and invReduceFunc. Can you share a sample of the code you are using?
>>
>> Thanks.
>>
>> On Fri, Jun 17, 2016 at 3:43 AM, N B <nb.nos...@gmail.com> wrote:
>>
>>> We had this same issue with the reduceByKeyAndWindow API that you are using. To fix it, you have to use a different flavor of that API, specifically the two versions that allow you to pass a 'filter function' to them. Putting in the filter function helped stabilize our application too.
>>>
>>> HTH
>>> NB
>>>
>>> On Sun, Jun 12, 2016 at 11:19 PM, Roshan Singh <singh.rosha...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>> I have a Python streaming job which is supposed to run 24x7, and I am unable to stabilize it. The job just counts the number of links shared in a 30-minute sliding window. I am using the reduceByKeyAndWindow operation with a batch interval of 30 seconds and a slide interval of 60 seconds.
>>>>
>>>> The Kafka queue has a rate of nearly 2200 messages/second, which can increase to 3000, but the mean is 2200.
>>>>
>>>> I have played around with the batch size, the slide interval, and increased parallelism, with no fruitful result; these just delay the destabilization.
>>>>
>>>> GC time is usually between 60 and 100 ms.
>>>>
>>>> I also noticed in the Spark UI that the jobs were not being distributed to other nodes, so I configured spark.locality.wait to 100 ms, after which the job is distributed properly.
>>>>
>>>> I have a cluster of 6 slaves and one master, each with 16 cores and 15 GB of RAM.
>>>>
>>>> Code and configuration: http://pastebin.com/93DMSiji
>>>>
>>>> Streaming screenshot: http://imgur.com/psNfjwJ
>>>>
>>>> I need help in debugging the issue. Any help will be appreciated.
>>>>
>>>> --
>>>> Roshan Singh
>>
>> --
>> Roshan Singh
>> http://roshansingh.in

--
Roshan Singh
http://roshansingh.in
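For concreteness, here is a minimal PySpark sketch of the filter-function approach N B describes above. It is illustrative only, not the code from the pastebin link: the socket source (standing in for Kafka), the checkpoint path, and the use of a plain integer count per URL instead of a per-key dictionary are all assumptions. The essential part is the reduceByKeyAndWindow call that passes reduceFunc, invReduceFunc, and a filterFunc that drops keys whose windowed count has fallen to zero.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="link-count-window")
ssc = StreamingContext(sc, 30)                # 30-second batches
ssc.checkpoint("/tmp/link-count-checkpoint")  # checkpointing is required when invReduceFunc is used (path is a placeholder)

# Placeholder source for illustration; the original job reads from Kafka.
lines = ssc.socketTextStream("localhost", 9999)

# Assume each record is an image URL; count occurrences per URL.
pairs = lines.map(lambda url: (url, 1))

counts = pairs.reduceByKeyAndWindow(
    lambda a, b: a + b,                 # reduceFunc: add counts entering the window
    lambda a, b: a - b,                 # invReduceFunc: subtract counts leaving the window
    windowDuration=1800,                # 30-minute window
    slideDuration=60,                   # 60-second slide
    filterFunc=lambda kv: kv[1] != 0,   # retire keys whose windowed count has reached zero
)

counts.pprint()
ssc.start()
ssc.awaitTermination()

Without the filterFunc, keys whose windowed count has dropped to zero stay in the window RDD forever, which is the unbounded growth described in the explanation above.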