Hi Nick, The buffer exposed through the UDAF interface is just a view of the underlying buffer (this underlying buffer is shared by different aggregate functions, and every function takes one or more slots). If you need a UDAF, extending UserDefinedAggregateFunction is the preferred approach. AggregateFunction2 is used for built-in aggregate functions.
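Yin's slot model can be illustrated with a toy sketch (plain Python, purely illustrative — this is not Spark's actual buffer machinery, and all names below are invented): each function gets a window of slots inside one shared mutable buffer, and writes through the view land directly in the shared storage.

```python
# Toy illustration (NOT Spark internals): one shared mutable buffer,
# where each aggregate function only sees its own window of slots.
class SharedAggBuffer:
    def __init__(self, size):
        self.slots = [0] * size

    def view(self, offset, length):
        return SlotView(self, offset, length)


class SlotView:
    """A view over [offset, offset + length); writes go straight
    into the shared buffer, so nothing is copied."""

    def __init__(self, buf, offset, length):
        self.buf, self.offset, self.length = buf, offset, length

    def __getitem__(self, i):
        assert 0 <= i < self.length
        return self.buf.slots[self.offset + i]

    def __setitem__(self, i, v):
        assert 0 <= i < self.length
        self.buf.slots[self.offset + i] = v


shared = SharedAggBuffer(4)
count_view = shared.view(0, 1)  # e.g. a COUNT takes one slot
avg_view = shared.view(1, 2)    # e.g. an AVG takes two slots (sum, count)
count_view[0] = 10
avg_view[0] = 42
avg_view[1] = 10
```

Because every view writes into the same array, the aggregation framework only has to manage one buffer per group rather than one object per function.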
Thanks, Yin On Sat, Sep 12, 2015 at 10:40 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote: > Ok, that makes sense. So this is (a) more efficient, since as far as I can > see it is updating the HLL registers directly in the buffer for each value, > and (b) would be "Tungsten-compatible" as it can work against UnsafeRow? Is > it currently possible to specify an UnsafeRow as a buffer in a UDAF? > > So is extending AggregateFunction2 the preferred approach over the > UserDefinedAggregateFunction interface? Or is that internal only? > > I see one of the main use cases for things like HLL / CMS and other > approximate data structures being the fact that you can store them as > columns representing distinct counts in an aggregation. And then do further > arbitrary aggregations on that data as required, e.g. store hourly > aggregate data, and compute daily or monthly aggregates from that, while > still keeping the ability to have distinct counts on certain fields. > > So exposing the serialized HLL as Array[Byte], say, so that it can be > further aggregated in a later DF operation, or saved to an external data > source, would be super useful. > > > > On Sat, Sep 12, 2015 at 6:06 PM, Herman van Hövell tot Westerflier < > hvanhov...@questtec.nl> wrote: > >> I am typically all for code re-use. The reason for writing this is to >> prevent the indirection of a UDT and work directly against memory. A UDT >> will work fine at the moment because we still use >> GenericMutableRow/SpecificMutableRow as aggregation buffers. However, if you >> would use an UnsafeRow as an AggregationBuffer (which is attractive when >> you have a lot of groups during aggregation) the use of a UDT is either >> impossible or it would become very slow because it would require us to >> deserialize/serialize a UDT on every update. >> >> As for compatibility, the implementation produces exactly the same >> results as the ClearSpring implementation. 
You could easily export the >> HLL++ register values to the current ClearSpring implementation and export >> them from there. >> >> Met vriendelijke groet/Kind regards, >> >> Herman van Hövell tot Westerflier >> >> QuestTec B.V. >> Torenwacht 98 >> 2353 DC Leiderdorp >> hvanhov...@questtec.nl >> +599 9 521 4402 >> >> >> 2015-09-12 11:06 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: >> >>> I should add that surely the idea behind UDT is exactly that it can (a) >>> fit automatically into DFs and Tungsten and (b) that it can be used >>> efficiently in writing one's own UDTs and UDAFs? >>> >>> >>> On Sat, Sep 12, 2015 at 11:05 AM, Nick Pentreath < >>> nick.pentre...@gmail.com> wrote: >>> >>>> Can I ask why you've done this as a custom implementation rather than >>>> using StreamLib, which is already implemented and widely used? It seems >>>> more portable to me to use a library - for example, I'd like to export the >>>> grouped data with raw HLLs to, say, Elasticsearch, and then do further >>>> on-demand aggregation in ES and visualization in Kibana etc. >>>> >>>> Others may want to do something similar into Hive, Cassandra, HBase or >>>> whatever they are using. In this case they'd need to use this particular >>>> implementation from Spark, which may be tricky to include as a dependency >>>> etc. >>>> >>>> If there are enhancements, does it not make sense to do a PR to >>>> StreamLib? Or does this interact in some better way with Tungsten? >>>> >>>> I am unclear on how the interop with Tungsten raw memory works - some >>>> pointers on that and where to look in the Spark code would be helpful. >>>> >>>> On Sat, Sep 12, 2015 at 10:45 AM, Herman van Hövell tot Westerflier < >>>> hvanhov...@questtec.nl> wrote: >>>> >>>>> Hello Nick, >>>>> >>>>> I have been working on a (UDT-less) implementation of HLL++. You can >>>>> find the PR here: https://github.com/apache/spark/pull/8362. This >>>>> currently implements the dense version of HLL++, which is a further >>>>> development of HLL. 
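The register export being discussed here amounts to serializing the register array together with enough header information to reconstruct it elsewhere. A minimal sketch (Python; the byte layout below is invented for illustration — ClearSpring's actual wire format differs):

```python
import struct

# Hypothetical serialization: treat the HLL++ state as its register
# array plus the precision p (m = 2^p registers). Any system that knows
# this layout could merge sketches or estimate cardinality from them.
def serialize_registers(p, registers):
    # 4-byte big-endian precision header, then one byte per register.
    return struct.pack(">I", p) + bytes(registers)


def deserialize_registers(blob):
    (p,) = struct.unpack(">I", blob[:4])
    return p, list(blob[4:])


regs = [0, 3, 1, 2]            # toy register values
blob = serialize_registers(2, regs)
p, back = deserialize_registers(blob)
```

Stored as an Array[Byte] column, such a blob could then be shipped to Elasticsearch, Hive, etc., and re-aggregated there, which is the portability concern Nick raises below.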
It returns a Long, but it shouldn't be too hard to >>>>> return a Row containing the cardinality and/or the HLL registers (the >>>>> binary data). >>>>> >>>>> I am curious what the stance is on using UDTs in the new UDAF >>>>> interface. Is this still viable? This wouldn't work with UnsafeRow, for >>>>> instance. The OpenHashSetUDT would be a nice building block >>>>> for CollectSet and all Distinct Aggregate operators. Are there any >>>>> opinions >>>>> on this? >>>>> >>>>> Kind regards, >>>>> >>>>> Herman van Hövell tot Westerflier >>>>> >>>>> QuestTec B.V. >>>>> Torenwacht 98 >>>>> 2353 DC Leiderdorp >>>>> hvanhov...@questtec.nl >>>>> +599 9 521 4402 >>>>> >>>>> >>>>> 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: >>>>> >>>>>> Inspired by this post: >>>>>> http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, >>>>>> I've started putting together something based on the Spark 1.5 UDAF >>>>>> interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 >>>>>> >>>>>> Some questions - >>>>>> >>>>>> 1. How do I get the UDAF to accept input arguments of different types? >>>>>> We can hash anything basically for HLL - Int, Long, String, Object, raw >>>>>> bytes etc. Right now it seems we'd need to build a new UDAF for each >>>>>> input >>>>>> type, which seems strange - I should be able to use one UDAF that can >>>>>> handle raw input of different types, as well as handle existing HLLs that >>>>>> can be merged/aggregated (e.g. for grouped data). >>>>>> 2. @Reynold, how would I ensure this works for Tungsten (i.e. against >>>>>> raw bytes in memory)? Or does the new Aggregate2 stuff automatically do >>>>>> that? Where should I look for examples on how this works internally? >>>>>> 3. I've based this on the Sum and Avg examples for the new UDAF >>>>>> interface - if there are any suggestions or issues, please advise. Is the intermediate >>>>>> buffer efficient? >>>>>> 4. 
The current HyperLogLogUDT is private - so I've had to make my own >>>>>> one, which is a bit pointless as it's copy-pasted. Any thoughts on >>>>>> exposing >>>>>> that type? Or do I need to make the package spark.sql ... >>>>>> >>>>>> Nick >>>>>> >>>>>> On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> >>>>>> wrote: >>>>>> >>>>>>> Yes - it's very interesting. However, ideally we should have a >>>>>>> version of HyperLogLog that can work directly against some raw bytes in >>>>>>> memory (rather than Java objects), in order for this to fit the Tungsten >>>>>>> execution model where everything is operating directly against some >>>>>>> memory >>>>>>> address. >>>>>>> >>>>>>> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath < >>>>>>> nick.pentre...@gmail.com> wrote: >>>>>>> >>>>>>>> Sure, I can copy the code, but my aim was more to understand: >>>>>>>> >>>>>>>> (a) if this is broadly interesting enough to folks to think about >>>>>>>> updating / extending the existing UDAF within Spark >>>>>>>> (b) how to register one's own custom UDAF - in which case it could >>>>>>>> be a Spark package, for example >>>>>>>> >>>>>>>> All examples deal with registering a UDF, but nothing about UDAFs. >>>>>>>> >>>>>>>> — >>>>>>>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos < >>>>>>>> daniel.dara...@lynxanalytics.com> wrote: >>>>>>>> >>>>>>>>> It's already possible to just copy the code from >>>>>>>>> countApproxDistinct >>>>>>>>> <https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153> >>>>>>>>> and >>>>>>>>> access the HLL directly, or do anything you like. >>>>>>>>> >>>>>>>>> On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath < >>>>>>>>> nick.pentre...@gmail.com> wrote: >>>>>>>>>> Any thoughts? 
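Nick's question 1 above (one UDAF over many input types) essentially asks for a single hash entry point that maps every supported type into the same hash space before the registers are updated. A hedged sketch (Python; the type-tagging scheme here is invented for illustration, and Spark's and StreamLib's actual hash functions are different):

```python
import hashlib

# Illustrative only: normalize heterogeneous inputs (Int, Long, String,
# raw bytes, ...) into one 64-bit hash space, so a single aggregate
# could dispatch on the runtime type of its input column. The "tag:"
# prefixes just keep different types from colliding by construction.
def hash_any(value):
    if isinstance(value, bool):              # bool first: bool is an int subtype
        data = b"bool:" + bytes([value])
    elif isinstance(value, int):
        data = b"int:" + value.to_bytes(8, "big", signed=True)
    elif isinstance(value, str):
        data = b"str:" + value.encode("utf-8")
    elif isinstance(value, (bytes, bytearray)):
        data = b"bytes:" + bytes(value)
    else:
        data = repr(value).encode("utf-8")   # fallback for other objects
    digest = hashlib.sha1(data).digest()
    return int.from_bytes(digest[:8], "big")  # take 64 bits
```

With something like this, one UDAF could accept any supported input type, while a separate code path merges already-built sketches for grouped data.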
>>>>>>>>>> >>>>>>>>>> — >>>>>>>>>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath < >>>>>>>>>> nick.pentre...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hey Spark devs >>>>>>>>>>> >>>>>>>>>>> I've been looking at DF UDFs and UDAFs. The approx distinct is >>>>>>>>>>> using HyperLogLog, >>>>>>>>>>> but there is only an option to return the count as a Long. >>>>>>>>>>> >>>>>>>>>>> It can be useful to be able to return and store the actual data >>>>>>>>>>> structure (i.e. the serialized HLL). This effectively allows one to do >>>>>>>>>>> aggregation / rollups over columns while still preserving the >>>>>>>>>>> ability to >>>>>>>>>>> get distinct counts. >>>>>>>>>>> >>>>>>>>>>> For example, one can store daily aggregates of events, grouped >>>>>>>>>>> by various columns, while storing for each grouping the HLL of, say, >>>>>>>>>>> unique >>>>>>>>>>> users. So you can get the uniques per day directly but could also >>>>>>>>>>> very >>>>>>>>>>> easily do arbitrary aggregates (say monthly, annually) and still be >>>>>>>>>>> able to >>>>>>>>>>> get a unique count for that period by merging the daily HLLs. >>>>>>>>>>> >>>>>>>>>>> I did this a while back as a Hive UDAF ( >>>>>>>>>>> https://github.com/MLnick/hive-udf) which returns a Struct >>>>>>>>>>> field containing a "cardinality" field and a "binary" field >>>>>>>>>>> containing the >>>>>>>>>>> serialized HLL. >>>>>>>>>>> >>>>>>>>>>> I was wondering if there would be interest in something like >>>>>>>>>>> this? I am not so clear on how UDTs work with regard to SerDe - so >>>>>>>>>>> could >>>>>>>>>>> one adapt the HyperLogLogUDT to be a Struct with the serialized HLL >>>>>>>>>>> as a >>>>>>>>>>> field as well as the count as a field? Then I assume this would >>>>>>>>>>> automatically >>>>>>>>>>> play nicely with DataFrame I/O etc. 
The gotcha is one needs to then >>>>>>>>>>> call >>>>>>>>>>> "approx_count_field.count" (or is there a concept of a "default >>>>>>>>>>> field" for >>>>>>>>>>> a Struct?). >>>>>>>>>>> >>>>>>>>>>> Also, being able to provide the bitsize parameter may be >>>>>>>>>>> useful... >>>>>>>>>>> >>>>>>>>>>> The same thinking would apply potentially to other approximate >>>>>>>>>>> (and mergeable) data structures like T-Digest and maybe CMS. >>>>>>>>>>> >>>>>>>>>>> Nick >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >
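The daily-to-monthly rollup idea that runs through this thread rests on HLL sketches being mergeable: the union of two sketches built at the same precision is just the element-wise max of their register arrays. A small self-contained sketch of that property (Python, toy register values; real implementations like HLL++ or StreamLib add bias correction and sparse encodings on top):

```python
# Merging two HLL sketches of the same precision: take the element-wise
# max of the registers. The result estimates the cardinality of the
# union, which is what makes daily -> monthly rollups possible.
def merge_registers(a, b):
    assert len(a) == len(b), "sketches must share the same precision"
    return [max(x, y) for x, y in zip(a, b)]


day1 = [2, 0, 5, 1]   # toy registers for day 1
day2 = [1, 3, 4, 2]   # toy registers for day 2
month = merge_registers(day1, day2)  # [2, 3, 5, 2]
```

Since element-wise max is commutative and associative, sketches can be merged in any order and any grouping, which is exactly the contract an aggregation buffer (or a downstream store like ES or Hive) needs.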