Hello Yakubovich, I have been looking into a similar problem. @Lars please note that he wants to maintain the top N products over a sliding window, whereas the Count-Min Sketch algorithm is useful if we want to maintain a global top N products list. Please correct me if I am wrong here.
I tried using Count-Min Sketch and realized that it doesn't suit my use case, as I also wanted to maintain the top N over the last H hours. Count-Min Sketch has no notion of time, so in my understanding you cannot use it here. Yakubovich, you can try doing something like this:

val stream = <DStream from your source>

// I am assuming that each entry is a comma-separated list of product IDs
// and that a product ID is a string (it doesn't really matter, though).
stream
  .flatMap(record => record.split(","))
  .map(pid => (pid, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
  .foreachRDD(rdd => {
    // `rdd` here is of type RDD[(String, Long)] and holds the frequency of
    // each PID over a sliding window of S1 seconds which slides by S2
    // seconds each time. Note that compare must return Int (not Boolean);
    // ordering ascending by count makes rdd.top(N) return the N largest.
    implicit val order = new scala.Ordering[(String, Long)] {
      override def compare(a1: (String, Long), a2: (String, Long)): Int =
        a1._2.compareTo(a2._2)
    }
    val topNPidTuples = rdd.top(N)
    // do whatever you want here.
  })

(One caveat: the inverse-reduce form of reduceByKeyAndWindow requires checkpointing to be enabled on the streaming context.)

--
Thanks
Jatin

On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra <rmis...@snappydata.io> wrote:
> Hi Alexy,
> We are also trying to solve similar problems using approximation. Would
> like to hear more about your usage. We can discuss this offline without
> boring others. :)
>
> Regards,
> Rishitesh Mishra,
> SnappyData (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson <la...@mapflat.com> wrote:
>
>> Hi,
>>
>> If you can accept approximate top N results, there is a neat solution
>> for this problem: use an approximate Map<K, V> structure called
>> Count-Min Sketch, in combination with a list of the M top items, where
>> M > N. When you encounter an item not in the top M, you look up its
>> count in the Count-Min Sketch to determine whether it qualifies.
>>
>> You will need to break down your event stream into time windows with a
>> certain time unit, e.g. minutes or hours, and keep one Count-Min
>> Sketch for each unit.
>> The CMSs can be added together, so you aggregate them to form your
>> sliding windows. You also keep a top M (aka "heavy hitters") list for
>> each window.
>>
>> The data structures required are surprisingly small, and will likely
>> fit in memory on a single machine if it can handle the traffic
>> volume, so you might not need Spark at all. If you choose to use Spark
>> in order to benefit from windowing, be aware that Spark lumps events
>> into micro-batches based on processing time, not event time.
>>
>> I made a presentation on approximate counting a couple of years ago.
>> Slides and video here:
>> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
>> You can also search for presentations by Ted Dunning and Mikio Braun,
>> who have given good talks on the subject.
>>
>> There are AFAIK two open source implementations of Count-Min Sketch,
>> one of them in Algebird.
>>
>> Let me know if anything is unclear.
>>
>> Good luck, and let us know how it goes.
>>
>> Regards,
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> +46 70 7687109
>>
>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>> <alexey.yakubov...@searshc.com> wrote:
>> > Good day,
>> >
>> > I have the following task: a stream of "page views" coming to a Kafka
>> > topic. Each view contains a list of product IDs from a visited page.
>> > The task: to have the top N products in "real time".
>> >
>> > I am interested in a solution that would require minimum intermediate
>> > writes. So I need to build a sliding window for the top N products,
>> > where the product counters change dynamically and the window presents
>> > the top products for the specified period of time.
>> >
>> > I believe there is no way to avoid maintaining all product counters
>> > in memory/storage. But at least I would like to do all logic, all
>> > calculation on the fly, in memory, not spilling multiple RDDs from
>> > memory to disk.
>> >
>> > So I see one way of doing it:
>> >   1. Take messages from Kafka and line up all elementary actions
>> >      (increase by 1 the counter for the product PID).
>> >   2. Implement each action as a call to HTable.increment() (or,
>> >      easier, incrementColumnValue()).
>> >   3. After each increment, apply my own "offer" operation, which
>> >      ensures that only the top N products with their counters are
>> >      kept in another HBase table (also with atomic operations).
>> > But there is another stream of events: decreasing product counters
>> > when a view expires past the length of the sliding window...
>> >
>> > So my question: does anybody know of, have, and can share a piece of
>> > code / know-how for implementing a "sliding top N window" better?
>> > If nothing is offered, I will share what I come up with myself.
>> >
>> > Thank you
>> > Alexey
>> > This message, including any attachments, is the property of Sears
>> > Holdings Corporation and/or one of its subsidiaries. It is
>> > confidential and may contain proprietary or legally privileged
>> > information. If you are not the intended recipient, please delete it
>> > without reading the contents. Thank you.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
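To make the Count-Min Sketch approach from Lars's reply concrete, here is a minimal, self-contained Scala sketch of the two ideas he describes: a sketch that never undercounts, and per-time-unit sketches that can be summed to form a sliding window. This is an illustrative toy under my own naming (the `CMS` class, its parameters, and the windowing demo are not the Algebird API).

```scala
import scala.util.hashing.MurmurHash3

// Minimal Count-Min Sketch: `depth` hash rows of `width` counters each.
// estimate() never undercounts; overestimation shrinks as width grows.
class CMS(depth: Int, width: Int) {
  private val table = Array.ofDim[Long](depth, width)

  private def bucket(item: String, row: Int): Int = {
    val h = MurmurHash3.stringHash(item, row) // row index doubles as seed
    ((h % width) + width) % width             // non-negative bucket index
  }

  def add(item: String, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(item, row)) += count

  // Take the minimum across rows -- hence "Count-Min".
  def estimate(item: String): Long =
    (0 until depth).map(row => table(row)(bucket(item, row))).min

  // Sketches are additive: summing cell-by-cell yields the sketch of the
  // combined stream. This is what makes per-time-unit sketches composable
  // into a sliding window.
  def merge(other: CMS): CMS = {
    val out = new CMS(depth, width)
    for (r <- 0 until depth; c <- 0 until width)
      out.table(r)(c) = table(r)(c) + other.table(r)(c)
    out
  }
}

object SlidingTopN {
  def main(args: Array[String]): Unit = {
    // Keep one sketch per time unit (e.g. per minute); the window over the
    // last H units is just the sum of those H sketches.
    val minute1 = new CMS(4, 1024)
    val minute2 = new CMS(4, 1024)
    Seq("p1", "p1", "p2").foreach(minute1.add(_))
    Seq("p1", "p3").foreach(minute2.add(_))

    val window = minute1.merge(minute2) // sliding window over both minutes

    // In a real system you would maintain a top-M candidate list alongside
    // the sketches; here we just rank a known candidate set.
    val candidates = Seq("p1", "p2", "p3")
    val top2 = candidates.map(p => (p, window.estimate(p))).sortBy(-_._2).take(2)
    println(top2) // p1 has count 3 across the window
  }
}
```

Sliding the window forward then amounts to dropping the oldest per-minute sketch and adding a fresh one, which avoids Alexey's second "decrement stream" entirely: old counts age out with their sketch instead of being subtracted one by one.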