[+spark list again] (I did not want to send "commercial spam" to the list :-))

The reduce function for CMSs is element-wise addition, and the reverse
reduce function is element-wise subtraction.
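
For illustration, a minimal hand-rolled sketch of what those two operations
can look like (this is not the stream-lib or Algebird API; the class name,
hashing and error handling are simplified placeholders of mine):

// Hypothetical, simplified CMS for illustration: a depth x width table of counters.
class SimpleCMS(val depth: Int, val width: Int) {
  val table: Array[Array[Long]] = Array.fill(depth, width)(0L)

  // One cheap hash per row; real implementations use proper hash families.
  private def bucket(row: Int, item: String): Int = {
    val h = item.hashCode * (row + 1) + row
    ((h % width) + width) % width
  }

  def add(item: String, count: Long = 1L): Unit =
    for (r <- 0 until depth) table(r)(bucket(r, item)) += count

  def estimate(item: String): Long =
    (0 until depth).map(r => table(r)(bucket(r, item))).min

  // Reduce: element-wise addition of the counter tables.
  def plus(other: SimpleCMS): SimpleCMS = {
    require(depth == other.depth && width == other.width)
    val out = new SimpleCMS(depth, width)
    for (r <- 0 until depth; c <- 0 until width)
      out.table(r)(c) = table(r)(c) + other.table(r)(c)
    out
  }

  // Inverse reduce: element-wise subtraction, e.g. to drop an expired slot.
  def minus(other: SimpleCMS): SimpleCMS = {
    require(depth == other.depth && width == other.width)
    val out = new SimpleCMS(depth, width)
    for (r <- 0 until depth; c <- 0 until width)
      out.table(r)(c) = table(r)(c) - other.table(r)(c)
    out
  }
}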

The heavy hitters list does not have monoid addition, but you can
cheat. I suggest creating a heavy hitter union whenever you do a CMS
addition or subtraction, sorting the candidates by lookups in the
aggregate CMS, and trimming to the desired size.
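
A rough sketch of that trick (names are mine; the aggregate-CMS lookup is
passed in as a function, since the exact call depends on which CMS library
you use):

// Illustrative sketch: union the candidate ids from two heavy hitter lists,
// rank them by their estimated counts in the already-aggregated CMS, keep m.
def mergeHeavyHitters(left: Seq[String],
                      right: Seq[String],
                      estimateInAggregate: String => Long,
                      m: Int): Seq[String] =
  (left ++ right).distinct
    .map(id => id -> estimateInAggregate(id))
    .sortBy(-_._2)
    .take(m)
    .map(_._1)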

We built our prototype with Storm, so the aggregation semantics were
different, and we probably did not bump into the same issues as you
do, since Storm does not micro batch. IIRC, we kept one CMS per time
unit window and a single heavy hitters list.


If you don't get CMSs to work well for you, you have the option of
keeping a top K list without a complementing CMS. See
http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
or 
http://www.berlinbuzzwords.de/session/real-time-personalization-and-recommendation-stream-mining
for an explanation.
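
The talks above cover the details; for a flavor of what a top K list without
a complementing CMS can look like, here is a minimal Space-Saving-style
counter map (my sketch, not code from the slides):

import scala.collection.mutable

// Illustrative sketch, Space-Saving style: keep at most k counters; when a new
// item arrives and the map is full, evict the current minimum and inherit its
// count, which bounds the over-estimation error.
class SpaceSavingTopK(k: Int) {
  private val counts = mutable.Map.empty[String, Long]

  def offer(item: String): Unit =
    if (counts.contains(item)) {
      counts(item) += 1L
    } else if (counts.size < k) {
      counts(item) = 1L
    } else {
      val (minItem, minCount) = counts.minBy(_._2)
      counts -= minItem
      counts(item) = minCount + 1L
    }

  def topN(n: Int): Seq[(String, Long)] =
    counts.toSeq.sortBy(-_._2).take(n)
}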


I have also seen a more brute force solution used with some success:
pour every item into an ElasticSearch cluster and query over time
windows. I guess it has to be complemented with some expiration
strategy. :-)

Regards,



Lars Albertsson
Data engineering consultant
www.mapflat.com
+46 70 7687109


On Fri, Mar 25, 2016 at 7:48 AM, Jatin Kumar <jku...@rocketfuelinc.com> wrote:
> Hello Lars,
>
> Thanks for your email. I tried exactly what you said and it doesn't perform
> well, due to the following reason:
>
> Spark's reduceByKeyAndWindow() can take a reduce and an inverse reduce
> function, which in my case are simple + and - operations respectively. So it
> effectively just adds the latest batch and subtracts the oldest batch from the
> already aggregated RDD of keys.
>
> Now the CMS data structure doesn't have an inverse reduce function, so you end
> up doing a + over all batches every time you produce a result. This is the
> main blocker stopping me from using CMS.
>
> --
> Thanks
> Jatin
>
> On Fri, Mar 25, 2016 at 3:57 AM, Lars Albertsson <la...@mapflat.com> wrote:
>>
>> BTW, I freelance as data engineer, helping companies build things like
>> this. Let me know if you want assistance in creating a prototype
>> solution.
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> +46 70 7687109
>>
>>
>> On Thu, Mar 24, 2016 at 11:23 PM, Lars Albertsson <la...@mapflat.com>
>> wrote:
>> > I am not aware of any open source examples. If you search for usages
>> > of stream-lib or Algebird, you might be lucky. Twitter uses CMSs, so
>> > they might have shared some code or presentation.
>> >
>> > We created a proprietary prototype of the solution I described, but I
>> > am not at liberty to share code.
>> >
>> > We did not proceed to take it into production at scale, so there might
>> > be practical issues that we did not discover. The algorithm
>> > implementation was simple and straightforward. We used the stream-lib
>> > CMS implementation. IIRC, the one in Algebird already includes support
>> > for heavy hitters.
>> >
>> > For our use case, the data structures ended up being small, on the
>> > order of tens or hundreds of megabytes. It varies with use case, but
>> > it is probably a path worth investigating if approximate results are
>> > acceptable.
>> >
>> > Regards,
>> >
>> >
>> > Lars Albertsson
>> > Data engineering consultant
>> > www.mapflat.com
>> > +46 70 7687109
>> >
>> >
>> > On Wed, Mar 23, 2016 at 3:41 AM, Jatin Kumar <jku...@rocketfuelinc.com>
>> > wrote:
>> >> Lars, can you please point to an example?
>> >>
>> >> On Mar 23, 2016 2:16 AM, "Lars Albertsson" <la...@mapflat.com> wrote:
>> >>>
>> >>> @Jatin, I touched that case briefly in the linked presentation.
>> >>>
>> >>> You will have to decide on a time slot size, and then aggregate slots
>> >>> to form windows. E.g. if you select a time slot of an hour, you build
>> >>> a CMS and a heavy hitter list for the current hour slot, and start new
>> >>> ones at 00 minutes. In order to form e.g. a 12-hour window, the
>> >>> 12-hour CMS is calculated as the sum of the 12 hour-slot CMSs, and the
>> >>> 12-hour heavy hitters list is the union of the hour-slot heavy
>> >>> hitter lists.
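>> >>>
>> >>> As a rough sketch of that aggregation (all names are mine; `plus` stands
>> >>> for CMS addition and `estimate` for a point lookup in whichever CMS
>> >>> library you use):
>> >>>
>> >>> // Illustrative sketch: aggregate the hour slots covering a window into
>> >>> // one CMS plus one heavy hitters list of size m.
>> >>> def window[C](slotCms: Seq[C],
>> >>>               slotHitters: Seq[Seq[String]],
>> >>>               plus: (C, C) => C,
>> >>>               estimate: (C, String) => Long,
>> >>>               m: Int): (C, Seq[String]) = {
>> >>>   val windowCms = slotCms.reduce(plus)
>> >>>   val hitters = slotHitters.flatten.distinct
>> >>>     .sortBy(id => -estimate(windowCms, id))
>> >>>     .take(m)
>> >>>   (windowCms, hitters)
>> >>> }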
>> >>>
>> >>> Since the data structures are small, one can afford using small time
>> >>> slots. One can also keep a long history with different combinations of
>> >>> time windows by pushing out CMSs and heavy hitters to e.g. Kafka, and
>> >>> have different stream processors that aggregate different time windows
>> >>> and push results to Kafka or to lookup tables.
>> >>>
>> >>>
>> >>> Lars Albertsson
>> >>> Data engineering consultant
>> >>> www.mapflat.com
>> >>> +46 70 7687109
>> >>>
>> >>>
>> >>> On Tue, Mar 22, 2016 at 1:23 PM, Jatin Kumar
>> >>> <jku...@rocketfuelinc.com>
>> >>> wrote:
>> >>> > Hello Yakubovich,
>> >>> >
>> >>> > I have been looking into a similar problem. @Lars please note that
>> >>> > he
>> >>> > wants
>> >>> > to maintain the top N products over a sliding window, whereas the
>> >>> > CountMinSketch algorithm is useful if we want to maintain a global top
>> >>> > N
>> >>> > products list. Please correct me if I am wrong here.
>> >>> >
>> >>> > I tried using CountMinSketch and realized that it doesn't suit my
>> >>> > use
>> >>> > case
>> >>> > as I also wanted to maintain top N over the last H hours. CountMinSketch
>> >>> > has
>> >>> > no
>> >>> > notion of time, so in my understanding you cannot use that.
>> >>> >
>> >>> > Yakubovich, you can try doing something like this:
>> >>> >
>> >>> > val stream = <DStream from your source>
>> >>> > // I am assuming that each entry is a comma separated list of
>> >>> > // product ids and that a product ID is a string (doesn't really
>> >>> > // matter though)
>> >>> > stream
>> >>> >  .flatMap(record => record.split(","))
>> >>> >  .map(pid => (pid, 1L))
>> >>> >  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
>> >>> >  .foreachRDD(rdd => {
>> >>> >    // `rdd` here is of type (pid, count) and has the frequency of
>> >>> >    // each PID over a sliding window of S1 seconds which moves by
>> >>> >    // S2 seconds every time.
>> >>> >
>> >>> >    // Order by count so that top(N) returns the most frequent PIDs.
>> >>> >    implicit val order = new scala.Ordering[(String, Long)] {
>> >>> >      override def compare(a1: (String, Long), a2: (String, Long)): Int =
>> >>> >        a1._2.compare(a2._2)
>> >>> >    }
>> >>> >
>> >>> >    val topNPidTuples = rdd.top(N)
>> >>> >    // do whatever you want here.
>> >>> >  })
>> >>> >
>> >>> >
>> >>> >
>> >>> > --
>> >>> > Thanks
>> >>> > Jatin
>> >>> >
>> >>> > On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra
>> >>> > <rmis...@snappydata.io>
>> >>> > wrote:
>> >>> >>
>> >>> >> Hi Alexey,
>> >>> >> We are also trying to solve similar problems using approximation.
>> >>> >> Would
>> >>> >> like to hear more about your usage.  We can discuss this offline
>> >>> >> without
>> >>> >> boring others.  :)
>> >>> >>
>> >>> >> Regards,
>> >>> >> Rishitesh Mishra,
>> >>> >> SnappyData . (http://www.snappydata.io/)
>> >>> >>
>> >>> >> https://in.linkedin.com/in/rishiteshmishra
>> >>> >>
>> >>> >> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson
>> >>> >> <la...@mapflat.com>
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Hi,
>> >>> >>>
>> >>> >>> If you can accept approximate top N results, there is a neat
>> >>> >>> solution
>> >>> >>> for this problem: Use an approximate Map<K, V> structure called
>> >>> >>> Count-Min Sketch, in combination with a list of the M top items,
>> >>> >>> where
>> >>> >>> M > N. When you encounter an item not in the top M, you look up
>> >>> >>> its
>> >>> >>> count in the Count-Min Sketch to determine whether it qualifies.
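>> >>> >>>
>> >>> >>> A minimal sketch of that lookup step (names are mine; `addToCms` and
>> >>> >>> `estimate` stand for whichever CMS implementation you pick):
>> >>> >>>
>> >>> >>> import scala.collection.mutable
>> >>> >>>
>> >>> >>> // Illustrative sketch: count each event in the sketch, then check
>> >>> >>> // whether its estimated count qualifies it for the top-M list.
>> >>> >>> def observe(item: String,
>> >>> >>>             addToCms: String => Unit,
>> >>> >>>             estimate: String => Long,
>> >>> >>>             topM: mutable.Map[String, Long],
>> >>> >>>             m: Int): Unit = {
>> >>> >>>   addToCms(item)
>> >>> >>>   val est = estimate(item)
>> >>> >>>   if (topM.contains(item) || topM.size < m) {
>> >>> >>>     topM(item) = est
>> >>> >>>   } else {
>> >>> >>>     val (minItem, minCount) = topM.minBy(_._2)
>> >>> >>>     if (est > minCount) {
>> >>> >>>       topM -= minItem
>> >>> >>>       topM(item) = est
>> >>> >>>     }
>> >>> >>>   }
>> >>> >>> }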
>> >>> >>>
>> >>> >>> You will need to break down your event stream into time windows
>> >>> >>> with a
>> >>> >>> certain time unit, e.g. minutes or hours, and keep one Count-Min
>> >>> >>> Sketch for each unit. The CMSs can be added, so you aggregate them
>> >>> >>> to
>> >>> >>> form your sliding windows. You also keep a top M (aka "heavy
>> >>> >>> hitters")
>> >>> >>> list for each window.
>> >>> >>>
>> >>> >>> The data structures required are surprisingly small, and will
>> >>> >>> likely
>> >>> >>> fit in memory on a single machine, if it can handle the traffic
>> >>> >>> volume, so you might not need Spark at all. If you choose to use
>> >>> >>> Spark
>> >>> >>> in order to benefit from windowing, be aware that Spark lumps
>> >>> >>> events
>> >>> >>> in micro batches based on processing time, not event time.
>> >>> >>>
>> >>> >>> I made a presentation on approximate counting a couple of years
>> >>> >>> ago.
>> >>> >>> Slides and video here:
>> >>> >>>
>> >>> >>>
>> >>> >>>
>> >>> >>> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105.
>> >>> >>> You can also search for presentations by Ted Dunning and Mikio
>> >>> >>> Braun, who have given good talks on the subject.
>> >>> >>>
>> >>> >>> There are AFAIK two open source implementations of Count-Min
>> >>> >>> Sketch,
>> >>> >>> one of them in Algebird.
>> >>> >>>
>> >>> >>> Let me know if anything is unclear.
>> >>> >>>
>> >>> >>> Good luck, and let us know how it goes.
>> >>> >>>
>> >>> >>> Regards,
>> >>> >>>
>> >>> >>>
>> >>> >>>
>> >>> >>> Lars Albertsson
>> >>> >>> Data engineering consultant
>> >>> >>> www.mapflat.com
>> >>> >>> +46 70 7687109
>> >>> >>>
>> >>> >>>
>> >>> >>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>> >>> >>> <alexey.yakubov...@searshc.com> wrote:
>> >>> >>> > Good day,
>> >>> >>> >
>> >>> >>> > I have the following task: a stream of “page views” coming to a
>> >>> >>> > Kafka topic. Each view contains a list of product IDs from a
>> >>> >>> > visited page. The task: to have, in “real time”, the top N products.
>> >>> >>> >
>> >>> >>> > I am interested in some solution that would require minimum
>> >>> >>> > intermediate writes … So I need to build a sliding window for the
>> >>> >>> > top N products, where the product counters change dynamically and
>> >>> >>> > the window should present the top products for the specified
>> >>> >>> > period of time.
>> >>> >>> >
>> >>> >>> > I believe there is no way to avoid maintaining all product
>> >>> >>> > counters in memory/storage. But at least I would like to do all
>> >>> >>> > logic, all calculations, on the fly, in memory, not spilling
>> >>> >>> > multiple RDDs from memory to disk.
>> >>> >>> >
>> >>> >>> > So I believe I see one way of doing it:
>> >>> >>> >   Take each msg from Kafka and line up all elementary actions
>> >>> >>> > (increase the counter for the product PID by 1).
>> >>> >>> >   Each action will be implemented as a call to HTable.increment()
>> >>> >>> > // or, easier, with incrementColumnValue()…
>> >>> >>> >   After each increment I can apply my own “offer” operation, which
>> >>> >>> > would ensure that only the top N products with their counters are
>> >>> >>> > kept in another HBase table (also with atomic operations).
>> >>> >>> >   But there is another stream of events: decreasing product
>> >>> >>> > counters when a view expires past the length of the sliding
>> >>> >>> > window….
>> >>> >>> >
>> >>> >>> > So my question: does anybody know/have, and can share, a piece of
>> >>> >>> > code / know-how on how to implement a “sliding top N window”
>> >>> >>> > better?
>> >>> >>> > If nothing is offered, I will share what I end up doing myself.
>> >>> >>> >
>> >>> >>> > Thank you
>> >>> >>> > Alexey
>> >>> >>>
>> >>> >>>
>> >>> >>>
>> >>> >>
>> >>> >
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
