I am sorry, the signature of compare() in my previous mail was wrong; it must return an Int, not a Boolean. It should be:

   implicit val order = new scala.Ordering[(String, Long)] {
     override def compare(a1: (String, Long), a2: (String, Long)): Int = {
       a1._2.compareTo(a2._2)
     }
   }
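
With that implicit in scope, rdd.top(N) picks it up automatically. A tiny usage
sketch (rdd and N are the placeholders from my earlier mail; Ordering.by is just
a shorter way to write the same comparator):

   // rdd: RDD[(String, Long)] of (productId, count) pairs
   implicit val order: Ordering[(String, Long)] =
     Ordering.by[(String, Long), Long](_._2)      // order by the count field
   val topN: Array[(String, Long)] = rdd.top(N)   // N largest counts, descending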


--
Thanks
Jatin

On Tue, Mar 22, 2016 at 5:53 PM, Jatin Kumar <jku...@rocketfuelinc.com>
wrote:

> Hello Yakubovich,
>
> I have been looking into a similar problem. @Lars please note that he
> wants to maintain the top N products over a sliding window, whereas the
> CountMinSketch algorithm is useful if we want to maintain a global top N
> products list. Please correct me if I am wrong here.
>
> I tried using CountMinSketch and realized that it doesn't suit my use case,
> as I also wanted to maintain the top N over the last H hours. CountMinSketch
> has no notion of time, so in my understanding you cannot use it for that.
>
> Yakubovich, you can try doing something like this:
>
> val stream = <DStream from your source>
> // I am assuming that each entry is a comma separated list of product ids
> // and product ID is a string (doesn't really matter though)
> stream
>  .flatMap(record => record.split(","))
>  .map(pid => (pid, 1L))
>  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
>  .foreachRDD(rdd => {
>    // `rdd` here is of type (pid, count) and has frequency of each PID over
>    // a sliding window of S1 seconds which moves by S2 seconds every time.
>
>    implicit val order = new scala.Ordering[(String, Long)] {
>      override def compare(a1: (String, Long), a2: (String, Long)): Boolean =
>        a1._2 > a2._2
>    }
>
>    val topNPidTuples = rdd.top(N)
>    // do whatever you want here.
>  })
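>
> One caveat from the Spark Streaming docs: reduceByKeyAndWindow with an inverse
> reduce function (the "_ - _" above) needs checkpointing enabled on the
> streaming context, roughly like this (the checkpoint path and batch interval
> below are placeholders):
>
>    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval))
>    ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")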
>
>
>
> --
> Thanks
> Jatin
>
> On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra <rmis...@snappydata.io>
> wrote:
>
>> Hi Alexey,
>> We are also trying to solve similar problems using approximation. We would
>> like to hear more about your use case. We can discuss this offline without
>> boring others. :)
>>
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson <la...@mapflat.com>
>> wrote:
>>
>>> Hi,
>>>
>>> If you can accept approximate top N results, there is a neat solution
>>> for this problem: Use an approximate Map<K, V> structure called
>>> Count-Min Sketch, in combination with a list of the M top items, where
>>> M > N. When you encounter an item not in the top M, you look up its
>>> count in the Count-Min Sketch to determine whether it qualifies.
>>>
>>> You will need to break down your event stream into time windows with a
>>> certain time unit, e.g. minutes or hours, and keep one Count-Min
>>> Sketch for each unit. The CMSs can be added, so you aggregate them to
>>> form your sliding windows. You also keep a top M (aka "heavy hitters")
>>> list for each window.
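>>>
>>> To make the "CMSs can be added" part concrete, here is a toy Count-Min
>>> Sketch in plain Scala. This is only an illustration of the idea, not the
>>> Algebird implementation (which uses proper pairwise-independent hashes):
>>>
>>>    // depth rows of width counters; each row uses a differently seeded hash
>>>    class SimpleCMS(depth: Int, width: Int) {
>>>      private val table = Array.ofDim[Long](depth, width)
>>>      private def bucket(item: String, row: Int): Int = {
>>>        val h = (item.hashCode.toLong * (row + 31) + 17) % width
>>>        (if (h < 0) h + width else h).toInt
>>>      }
>>>      def add(item: String, count: Long = 1L): Unit =
>>>        for (row <- 0 until depth) table(row)(bucket(item, row)) += count
>>>      // the estimate is the minimum over all rows, an upper bound on the truth
>>>      def estimate(item: String): Long =
>>>        (0 until depth).map(row => table(row)(bucket(item, row))).min
>>>      // sketches for adjacent time units are summed cell-wise; this is what
>>>      // lets you build a sliding window out of per-minute sketches
>>>      def merge(other: SimpleCMS): SimpleCMS = {
>>>        val merged = new SimpleCMS(depth, width)
>>>        for (r <- 0 until depth; c <- 0 until width)
>>>          merged.table(r)(c) = table(r)(c) + other.table(r)(c)
>>>        merged
>>>      }
>>>    }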
>>>
>>> The data structures required are surprisingly small, and will likely
>>> fit in memory on a single machine, if it can handle the traffic
>>> volume, so you might not need Spark at all. If you choose to use Spark
>>> in order to benefit from windowing, be aware that Spark lumps events
>>> in micro batches based on processing time, not event time.
>>>
>>> I made a presentation on approximate counting a couple of years ago.
>>> Slides and video here:
>>>
>>> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
>>> .
>>> You can also search for presentations by Ted Dunning and Mikio Braun,
>>> who have given good talks on the subject.
>>>
>>> There are AFAIK two open source implementations of Count-Min Sketch,
>>> one of them in Algebird.
>>>
>>> Let me know if anything is unclear.
>>>
>>> Good luck, and let us know how it goes.
>>>
>>> Regards,
>>>
>>>
>>>
>>> Lars Albertsson
>>> Data engineering consultant
>>> www.mapflat.com
>>> +46 70 7687109
>>>
>>>
>>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>>> <alexey.yakubov...@searshc.com> wrote:
>>> > Good day,
>>> >
>>> > I have the following task: a stream of “page views” coming into a kafka
>>> > topic. Each view contains a list of product Ids from the visited page.
>>> > The task: to have, in “real time”, the Top N products.
>>> >
>>> > I am interested in a solution that requires minimal intermediate writes.
>>> > So I need to build a sliding window for the top N products, where the
>>> > product counters change dynamically and the window presents the top
>>> > products for the specified period of time.
>>> >
>>> > I believe there is no way to avoid maintaining all product counters in
>>> > memory/storage. But at least I would like to do all the logic and
>>> > calculation on the fly, in memory, without spilling multiple RDDs from
>>> > memory to disk.
>>> >
>>> > So I believe I see one way of doing it:
>>> >   Take each msg from kafka and turn it into elementary actions (increase
>>> > the counter for the product PID by 1).
>>> >   Each action will be implemented as a call to HTable.increment()  // or,
>>> > easier, with incrementColumnValue()… (rough sketch below)
>>> >   After each increment I can apply my own “offer” operation, which ensures
>>> > that only the top N products with their counters are kept in another HBase
>>> > table (also with atomic operations).
>>> >   But there is another stream of events: decreasing product counters when
>>> > a view expires past the length of the sliding window….
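>>> >
>>> > For the increment step, a rough sketch with the HBase client API (the
>>> > table, column family and qualifier names are placeholders I made up, and
>>> > pid is the product id parsed from the message):
>>> >
>>> >    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
>>> >    import org.apache.hadoop.hbase.client.ConnectionFactory
>>> >    import org.apache.hadoop.hbase.util.Bytes
>>> >
>>> >    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
>>> >    val counters = conn.getTable(TableName.valueOf("product_counters"))
>>> >    // atomically bump the counter for one product id, get the new value back
>>> >    val newCount = counters.incrementColumnValue(
>>> >      Bytes.toBytes(pid), Bytes.toBytes("cf"), Bytes.toBytes("count"), 1L)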
>>> >
>>> > So my question: does anybody know of, or can share, code or know-how for
>>> > a better way to implement a “sliding Top N window”?
>>> > If nothing is offered, I will share what I end up doing myself.
>>> >
>>> > Thank you
>>> > Alexey
>>>
>>>
>>
>
