Re: HyperLogLogUDT
in Kibana etc. >>>>> >>>>> Others may want to do something similar into Hive, Cassandra, HBase or >>>>> whatever they are using. In this case they'd need to use this particular >>>>> implementation from Spark which may be tricky to include in a dependency >>>>> etc. >>>>> >>>>> If there are enhancements, does it not make sense to do a PR to >>>>> StreamLib? Or does this interact in some better way with Tungsten? >>>>> >>>>> I am unclear on how the interop with Tungsten raw memory works - some >>>>> pointers on that and where to look in the Spark code would be helpful. >>>>> >>>>> On Sat, Sep 12, 2015 at 10:45 AM, Herman van Hövell tot Westerflier < >>>>> hvanhov...@questtec.nl> wrote: >>>>> >>>>>> Hello Nick, >>>>>> >>>>>> I have been working on a (UDT-less) implementation of HLL++. You can >>>>>> find the PR here: https://github.com/apache/spark/pull/8362. This >>>>>> current implements the dense version of HLL++, which is a further >>>>>> development of HLL. It returns a Long, but it shouldn't be to hard to >>>>>> return a Row containing the cardinality and/or the HLL registers (the >>>>>> binary data). >>>>>> >>>>>> I am curious what the stance is on using UDTs in the new UDAF >>>>>> interface. Is this still viable? This wouldn't work with UnsafeRow for >>>>>> instance. The OpenHashSetUDT for instance would be a nice building block >>>>>> for CollectSet and all Distinct Aggregate operators. Are there any >>>>>> opinions >>>>>> on this? >>>>>> >>>>>> Kind regards, >>>>>> >>>>>> Herman van Hövell tot Westerflier >>>>>> >>>>>> QuestTec B.V. >>>>>> Torenwacht 98 >>>>>> 2353 DC Leiderdorp >>>>>> hvanhov...@questtec.nl >>>>>> +599 9 521 4402 >>>>>> >>>>>> >>>>>> 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: >>>>>> >>>>>>> Inspired by this post: >>>>>>> http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, >>>>>>> I've started putting together something based on the Spark 1.5 UDAF >>>>>>> interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 >>>>>>> >>>>>>> Some questions - >>>>>>> >>>>>>> 1. How do I get the UDAF to accept input arguments of different type? >>>>>>> We can hash anything basically for HLL - Int, Long, String, Object, raw >>>>>>> bytes etc. Right now it seems we'd need to build a new UDAF for each >>>>>>> input >>>>>>> type, which seems strange - I should be able to use one UDAF that can >>>>>>> handle raw input of different types, as well as handle existing HLLs >>>>>>> that >>>>>>> can be merged/aggregated (e.g. for grouped data) >>>>>>> 2. @Reynold, how would I ensure this works for Tungsten (ie against >>>>>>> raw bytes in memory)? Or does the new Aggregate2 stuff automatically do >>>>>>> that? Where should I look for examples on how this works internally? >>>>>>> 3. I've based this on the Sum and Avg examples for the new UDAF >>>>>>> interface - any suggestions or issue please advise. Is the intermediate >>>>>>> buffer efficient? >>>>>>> 4. The current HyperLogLogUDT is private - so I've had to make my own >>>>>>> one which is a bit pointless as it's copy-pasted. Any thoughts on >>>>>>> exposing >>>>>>> that type? Or I need to make the package spark.sql ... >>>>>>> >>>>>>> Nick >>>>>>> >>>>>>> On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Yes - it's very interesting. However, ideally we should have a >>>>>>>> version of hyperloglog that can work directly against some raw bytes in >>>>>>>
Re: HyperLogLogUDT
t;>> >>>> Met vriendelijke groet/Kind regards, >>>> >>>> Herman van Hövell tot Westerflier >>>> >>>> QuestTec B.V. >>>> Torenwacht 98 >>>> 2353 DC Leiderdorp >>>> hvanhov...@questtec.nl >>>> +599 9 521 4402 >>>> >>>> >>>> 2015-09-12 11:06 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: >>>> >>>>> I should add that surely the idea behind UDT is exactly that it can >>>>> (a) fit automatically into DFs and Tungsten and (b) that it can be used >>>>> efficiently in writing ones own UDTs and UDAFs? >>>>> >>>>> >>>>> On Sat, Sep 12, 2015 at 11:05 AM, Nick Pentreath < >>>>> nick.pentre...@gmail.com> wrote: >>>>> >>>>>> Can I ask why you've done this as a custom implementation rather than >>>>>> using StreamLib, which is already implemented and widely used? It seems >>>>>> more portable to me to use a library - for example, I'd like to export >>>>>> the >>>>>> grouped data with raw HLLs to say Elasticsearch, and then do further >>>>>> on-demand aggregation in ES and visualization in Kibana etc. >>>>>> >>>>>> Others may want to do something similar into Hive, Cassandra, HBase >>>>>> or whatever they are using. In this case they'd need to use this >>>>>> particular >>>>>> implementation from Spark which may be tricky to include in a dependency >>>>>> etc. >>>>>> >>>>>> If there are enhancements, does it not make sense to do a PR to >>>>>> StreamLib? Or does this interact in some better way with Tungsten? >>>>>> >>>>>> I am unclear on how the interop with Tungsten raw memory works - some >>>>>> pointers on that and where to look in the Spark code would be helpful. >>>>>> >>>>>> On Sat, Sep 12, 2015 at 10:45 AM, Herman van Hövell tot Westerflier < >>>>>> hvanhov...@questtec.nl> wrote: >>>>>> >>>>>>> Hello Nick, >>>>>>> >>>>>>> I have been working on a (UDT-less) implementation of HLL++. You can >>>>>>> find the PR here: https://github.com/apache/spark/pull/8362. This >>>>>>> current implements the dense version of HLL++, which is a further >>>>>>> development of HLL. It returns a Long, but it shouldn't be to hard to >>>>>>> return a Row containing the cardinality and/or the HLL registers (the >>>>>>> binary data). >>>>>>> >>>>>>> I am curious what the stance is on using UDTs in the new UDAF >>>>>>> interface. Is this still viable? This wouldn't work with UnsafeRow for >>>>>>> instance. The OpenHashSetUDT for instance would be a nice building block >>>>>>> for CollectSet and all Distinct Aggregate operators. Are there any >>>>>>> opinions >>>>>>> on this? >>>>>>> >>>>>>> Kind regards, >>>>>>> >>>>>>> Herman van Hövell tot Westerflier >>>>>>> >>>>>>> QuestTec B.V. >>>>>>> Torenwacht 98 >>>>>>> 2353 DC Leiderdorp >>>>>>> hvanhov...@questtec.nl >>>>>>> +599 9 521 4402 >>>>>>> >>>>>>> >>>>>>> 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com> >>>>>>> : >>>>>>> >>>>>>>> Inspired by this post: >>>>>>>> http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, >>>>>>>> I've started putting together something based on the Spark 1.5 UDAF >>>>>>>> interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 >>>>>>>> >>>>>>>> Some questions - >>>>>>>> >>>>>>>> 1. How do I get the UDAF to accept input arguments of different >>>>>>>> type? We can hash anything basically for HLL - Int, Long, String, >>>>>>>> Object, >>>>>>>> raw bytes etc. Right now it seems we'd need to build a new UDAF f
Re: HyperLogLogUDT
>>> +599 9 521 4402 >>>> >>>> >>>> 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: >>>> >>>>> Inspired by this post: >>>>> http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, >>>>> I've started putting together something based on the Spark 1.5 UDAF >>>>> interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 >>>>> >>>>> Some questions - >>>>> >>>>> 1. How do I get the UDAF to accept input arguments of different type? >>>>> We can hash anything basically for HLL - Int, Long, String, Object, raw >>>>> bytes etc. Right now it seems we'd need to build a new UDAF for each input >>>>> type, which seems strange - I should be able to use one UDAF that can >>>>> handle raw input of different types, as well as handle existing HLLs that >>>>> can be merged/aggregated (e.g. for grouped data) >>>>> 2. @Reynold, how would I ensure this works for Tungsten (ie against >>>>> raw bytes in memory)? Or does the new Aggregate2 stuff automatically do >>>>> that? Where should I look for examples on how this works internally? >>>>> 3. I've based this on the Sum and Avg examples for the new UDAF >>>>> interface - any suggestions or issue please advise. Is the intermediate >>>>> buffer efficient? >>>>> 4. The current HyperLogLogUDT is private - so I've had to make my own >>>>> one which is a bit pointless as it's copy-pasted. Any thoughts on exposing >>>>> that type? Or I need to make the package spark.sql ... >>>>> >>>>> Nick >>>>> >>>>> On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> >>>>> wrote: >>>>> >>>>>> Yes - it's very interesting. However, ideally we should have a >>>>>> version of hyperloglog that can work directly against some raw bytes in >>>>>> memory (rather than java objects), in order for this to fit the Tungsten >>>>>> execution model where everything is operating directly against some >>>>>> memory >>>>>> address. >>>>>> >>>>>> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath < >>>>>> nick.pentre...@gmail.com> wrote: >>>>>> >>>>>>> Sure I can copy the code but my aim was more to understand: >>>>>>> >>>>>>> (A) if this is broadly interesting enough to folks to think about >>>>>>> updating / extending the existing UDAF within Spark >>>>>>> (b) how to register ones own custom UDAF - in which case it could be >>>>>>> a Spark package for example >>>>>>> >>>>>>> All examples deal with registering a UDF but nothing about UDAFs >>>>>>> >>>>>>> — >>>>>>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>>>>>> >>>>>>> >>>>>>> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos < >>>>>>> daniel.dara...@lynxanalytics.com> wrote: >>>>>>> >>>>>>>> It's already possible to just copy the code from >>>>>>>> countApproxDistinct >>>>>>>> <https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153> >>>>>>>> and >>>>>>>> access the HLL directly, or do anything you like. >>>>>>>> >>>>>>>> On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath < >>>>>>>> nick.pentre...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Any thoughts? >>>>>>>>> >>>>>>>>> — >>>>>>>>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath < >>>>>>>>> nick.pentre...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Hey Spark devs >>>>>>>>>> >>>>>>>>>> I've been looking at DF UDFs and UDAFs. The approx distinct is >>>>>>>>>> using hyperloglog, >>>>>>>>>> but there is only an option to return the count as a Long. >>>>>>>>>> >>>>>>>>>> It can be useful to be able to return and store the actual data >>>>>>>>>> structure (ie serialized HLL). This effectively allows one to do >>>>>>>>>> aggregation / rollups over columns while still preserving the >>>>>>>>>> ability to >>>>>>>>>> get distinct counts. >>>>>>>>>> >>>>>>>>>> For example, one can store daily aggregates of events, grouped by >>>>>>>>>> various columns, while storing for each grouping the HLL of say >>>>>>>>>> unique >>>>>>>>>> users. So you can get the uniques per day directly but could also >>>>>>>>>> very >>>>>>>>>> easily do arbitrary aggregates (say monthly, annually) and still be >>>>>>>>>> able to >>>>>>>>>> get a unique count for that period by merging the daily HLLS. >>>>>>>>>> >>>>>>>>>> I did this a while back as a Hive UDAF ( >>>>>>>>>> https://github.com/MLnick/hive-udf) which returns a Struct field >>>>>>>>>> containing a "cardinality" field and a "binary" field containing the >>>>>>>>>> serialized HLL. >>>>>>>>>> >>>>>>>>>> I was wondering if there would be interest in something like >>>>>>>>>> this? I am not so clear on how UDTs work with regards to SerDe - so >>>>>>>>>> could >>>>>>>>>> one adapt the HyperLogLogUDT to be a Struct with the serialized HLL >>>>>>>>>> as a >>>>>>>>>> field as well as count as a field? Then I assume this would >>>>>>>>>> automatically >>>>>>>>>> play nicely with DataFrame I/O etc. The gotcha is one needs to then >>>>>>>>>> call >>>>>>>>>> "approx_count_field.count" (or is there a concept of a "default >>>>>>>>>> field" for >>>>>>>>>> a Struct?). >>>>>>>>>> >>>>>>>>>> Also, being able to provide the bitsize parameter may be useful... >>>>>>>>>> >>>>>>>>>> The same thinking would apply potentially to other approximate >>>>>>>>>> (and mergeable) data structures like T-Digest and maybe CMS. >>>>>>>>>> >>>>>>>>>> Nick >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >
Re: HyperLogLogUDT
it shouldn't be to hard to >>>>> return a Row containing the cardinality and/or the HLL registers (the >>>>> binary data). >>>>> >>>>> I am curious what the stance is on using UDTs in the new UDAF >>>>> interface. Is this still viable? This wouldn't work with UnsafeRow for >>>>> instance. The OpenHashSetUDT for instance would be a nice building block >>>>> for CollectSet and all Distinct Aggregate operators. Are there any >>>>> opinions >>>>> on this? >>>>> >>>>> Kind regards, >>>>> >>>>> Herman van Hövell tot Westerflier >>>>> >>>>> QuestTec B.V. >>>>> Torenwacht 98 >>>>> 2353 DC Leiderdorp >>>>> hvanhov...@questtec.nl >>>>> +599 9 521 4402 >>>>> >>>>> >>>>> 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: >>>>> >>>>>> Inspired by this post: >>>>>> http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, >>>>>> I've started putting together something based on the Spark 1.5 UDAF >>>>>> interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 >>>>>> >>>>>> Some questions - >>>>>> >>>>>> 1. How do I get the UDAF to accept input arguments of different type? >>>>>> We can hash anything basically for HLL - Int, Long, String, Object, raw >>>>>> bytes etc. Right now it seems we'd need to build a new UDAF for each >>>>>> input >>>>>> type, which seems strange - I should be able to use one UDAF that can >>>>>> handle raw input of different types, as well as handle existing HLLs that >>>>>> can be merged/aggregated (e.g. for grouped data) >>>>>> 2. @Reynold, how would I ensure this works for Tungsten (ie against >>>>>> raw bytes in memory)? Or does the new Aggregate2 stuff automatically do >>>>>> that? Where should I look for examples on how this works internally? >>>>>> 3. I've based this on the Sum and Avg examples for the new UDAF >>>>>> interface - any suggestions or issue please advise. Is the intermediate >>>>>> buffer efficient? >>>>>> 4. The current HyperLogLogUDT is private - so I've had to make my own >>>>>> one which is a bit pointless as it's copy-pasted. Any thoughts on >>>>>> exposing >>>>>> that type? Or I need to make the package spark.sql ... >>>>>> >>>>>> Nick >>>>>> >>>>>> On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> >>>>>> wrote: >>>>>> >>>>>>> Yes - it's very interesting. However, ideally we should have a >>>>>>> version of hyperloglog that can work directly against some raw bytes in >>>>>>> memory (rather than java objects), in order for this to fit the Tungsten >>>>>>> execution model where everything is operating directly against some >>>>>>> memory >>>>>>> address. >>>>>>> >>>>>>> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath < >>>>>>> nick.pentre...@gmail.com> wrote: >>>>>>> >>>>>>>> Sure I can copy the code but my aim was more to understand: >>>>>>>> >>>>>>>> (A) if this is broadly interesting enough to folks to think about >>>>>>>> updating / extending the existing UDAF within Spark >>>>>>>> (b) how to register ones own custom UDAF - in which case it could >>>>>>>> be a Spark package for example >>>>>>>> >>>>>>>> All examples deal with registering a UDF but nothing about UDAFs >>>>>>>> >>>>>>>> — >>>>>>>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos < >>>>>>>> daniel.dara...@lynxanalytics.com> wrote: >>>>>>>> >>>>>>>>> It's already possible to just copy the code from >>>>>>>>> co
Re: HyperLogLogUDT
Inspired by this post: http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, I've started putting together something based on the Spark 1.5 UDAF interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 Some questions - 1. How do I get the UDAF to accept input arguments of different type? We can hash anything basically for HLL - Int, Long, String, Object, raw bytes etc. Right now it seems we'd need to build a new UDAF for each input type, which seems strange - I should be able to use one UDAF that can handle raw input of different types, as well as handle existing HLLs that can be merged/aggregated (e.g. for grouped data) 2. @Reynold, how would I ensure this works for Tungsten (ie against raw bytes in memory)? Or does the new Aggregate2 stuff automatically do that? Where should I look for examples on how this works internally? 3. I've based this on the Sum and Avg examples for the new UDAF interface - any suggestions or issue please advise. Is the intermediate buffer efficient? 4. The current HyperLogLogUDT is private - so I've had to make my own one which is a bit pointless as it's copy-pasted. Any thoughts on exposing that type? Or I need to make the package spark.sql ... Nick On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> wrote: > Yes - it's very interesting. However, ideally we should have a version of > hyperloglog that can work directly against some raw bytes in memory (rather > than java objects), in order for this to fit the Tungsten execution model > where everything is operating directly against some memory address. > > On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath <nick.pentre...@gmail.com> > wrote: > >> Sure I can copy the code but my aim was more to understand: >> >> (A) if this is broadly interesting enough to folks to think about >> updating / extending the existing UDAF within Spark >> (b) how to register ones own custom UDAF - in which case it could be a >> Spark package for example >> >> All examples deal with registering a UDF but nothing about UDAFs >> >> — >> Sent from Mailbox <https://www.dropbox.com/mailbox> >> >> >> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos < >> daniel.dara...@lynxanalytics.com> wrote: >> >>> It's already possible to just copy the code from countApproxDistinct >>> <https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153> >>> and >>> access the HLL directly, or do anything you like. >>> >>> On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath <nick.pentre...@gmail.com >>> > wrote: >>> >>>> Any thoughts? >>>> >>>> — >>>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>>> >>>> >>>> On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath < >>>> nick.pentre...@gmail.com> wrote: >>>> >>>>> Hey Spark devs >>>>> >>>>> I've been looking at DF UDFs and UDAFs. The approx distinct is using >>>>> hyperloglog, >>>>> but there is only an option to return the count as a Long. >>>>> >>>>> It can be useful to be able to return and store the actual data >>>>> structure (ie serialized HLL). This effectively allows one to do >>>>> aggregation / rollups over columns while still preserving the ability to >>>>> get distinct counts. >>>>> >>>>> For example, one can store daily aggregates of events, grouped by >>>>> various columns, while storing for each grouping the HLL of say unique >>>>> users. So you can get the uniques per day directly but could also very >>>>> easily do arbitrary aggregates (say monthly, annually) and still be able >>>>> to >>>>> get a unique count for that period by merging the daily HLLS. >>>>> >>>>> I did this a while back as a Hive UDAF ( >>>>> https://github.com/MLnick/hive-udf) which returns a Struct field >>>>> containing a "cardinality" field and a "binary" field containing the >>>>> serialized HLL. >>>>> >>>>> I was wondering if there would be interest in something like this? I >>>>> am not so clear on how UDTs work with regards to SerDe - so could one >>>>> adapt >>>>> the HyperLogLogUDT to be a Struct with the serialized HLL as a field as >>>>> well as count as a field? Then I assume this would automatically play >>>>> nicely with DataFrame I/O etc. The gotcha is one needs to then call >>>>> "approx_count_field.count" (or is there a concept of a "default field" for >>>>> a Struct?). >>>>> >>>>> Also, being able to provide the bitsize parameter may be useful... >>>>> >>>>> The same thinking would apply potentially to other approximate (and >>>>> mergeable) data structures like T-Digest and maybe CMS. >>>>> >>>>> Nick >>>>> >>>> >>>> >>> >> >
Re: HyperLogLogUDT
I should add that surely the idea behind UDT is exactly that it can (a) fit automatically into DFs and Tungsten and (b) that it can be used efficiently in writing ones own UDTs and UDAFs? On Sat, Sep 12, 2015 at 11:05 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote: > Can I ask why you've done this as a custom implementation rather than > using StreamLib, which is already implemented and widely used? It seems > more portable to me to use a library - for example, I'd like to export the > grouped data with raw HLLs to say Elasticsearch, and then do further > on-demand aggregation in ES and visualization in Kibana etc. > > Others may want to do something similar into Hive, Cassandra, HBase or > whatever they are using. In this case they'd need to use this particular > implementation from Spark which may be tricky to include in a dependency > etc. > > If there are enhancements, does it not make sense to do a PR to StreamLib? > Or does this interact in some better way with Tungsten? > > I am unclear on how the interop with Tungsten raw memory works - some > pointers on that and where to look in the Spark code would be helpful. > > On Sat, Sep 12, 2015 at 10:45 AM, Herman van Hövell tot Westerflier < > hvanhov...@questtec.nl> wrote: > >> Hello Nick, >> >> I have been working on a (UDT-less) implementation of HLL++. You can find >> the PR here: https://github.com/apache/spark/pull/8362. This current >> implements the dense version of HLL++, which is a further development of >> HLL. It returns a Long, but it shouldn't be to hard to return a Row >> containing the cardinality and/or the HLL registers (the binary data). >> >> I am curious what the stance is on using UDTs in the new UDAF interface. >> Is this still viable? This wouldn't work with UnsafeRow for instance. The >> OpenHashSetUDT for instance would be a nice building block for CollectSet >> and all Distinct Aggregate operators. Are there any opinions on this? >> >> Kind regards, >> >> Herman van Hövell tot Westerflier >> >> QuestTec B.V. >> Torenwacht 98 >> 2353 DC Leiderdorp >> hvanhov...@questtec.nl >> +599 9 521 4402 >> >> >> 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: >> >>> Inspired by this post: >>> http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, >>> I've started putting together something based on the Spark 1.5 UDAF >>> interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 >>> >>> Some questions - >>> >>> 1. How do I get the UDAF to accept input arguments of different type? We >>> can hash anything basically for HLL - Int, Long, String, Object, raw bytes >>> etc. Right now it seems we'd need to build a new UDAF for each input type, >>> which seems strange - I should be able to use one UDAF that can handle raw >>> input of different types, as well as handle existing HLLs that can be >>> merged/aggregated (e.g. for grouped data) >>> 2. @Reynold, how would I ensure this works for Tungsten (ie against raw >>> bytes in memory)? Or does the new Aggregate2 stuff automatically do that? >>> Where should I look for examples on how this works internally? >>> 3. I've based this on the Sum and Avg examples for the new UDAF >>> interface - any suggestions or issue please advise. Is the intermediate >>> buffer efficient? >>> 4. The current HyperLogLogUDT is private - so I've had to make my own >>> one which is a bit pointless as it's copy-pasted. Any thoughts on exposing >>> that type? Or I need to make the package spark.sql ... >>> >>> Nick >>> >>> On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> wrote: >>> >>>> Yes - it's very interesting. However, ideally we should have a version >>>> of hyperloglog that can work directly against some raw bytes in memory >>>> (rather than java objects), in order for this to fit the Tungsten execution >>>> model where everything is operating directly against some memory address. >>>> >>>> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath < >>>> nick.pentre...@gmail.com> wrote: >>>> >>>>> Sure I can copy the code but my aim was more to understand: >>>>> >>>>> (A) if this is broadly interesting enough to folks to think about >>>>> updating / extending the existing UDAF within Spark >>>>> (b) how to register ones own custom UDAF - in which case it could be
Re: HyperLogLogUDT
Hello Nick, I have been working on a (UDT-less) implementation of HLL++. You can find the PR here: https://github.com/apache/spark/pull/8362. This current implements the dense version of HLL++, which is a further development of HLL. It returns a Long, but it shouldn't be to hard to return a Row containing the cardinality and/or the HLL registers (the binary data). I am curious what the stance is on using UDTs in the new UDAF interface. Is this still viable? This wouldn't work with UnsafeRow for instance. The OpenHashSetUDT for instance would be a nice building block for CollectSet and all Distinct Aggregate operators. Are there any opinions on this? Kind regards, Herman van Hövell tot Westerflier QuestTec B.V. Torenwacht 98 2353 DC Leiderdorp hvanhov...@questtec.nl +599 9 521 4402 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: > Inspired by this post: > http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, > I've started putting together something based on the Spark 1.5 UDAF > interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 > > Some questions - > > 1. How do I get the UDAF to accept input arguments of different type? We > can hash anything basically for HLL - Int, Long, String, Object, raw bytes > etc. Right now it seems we'd need to build a new UDAF for each input type, > which seems strange - I should be able to use one UDAF that can handle raw > input of different types, as well as handle existing HLLs that can be > merged/aggregated (e.g. for grouped data) > 2. @Reynold, how would I ensure this works for Tungsten (ie against raw > bytes in memory)? Or does the new Aggregate2 stuff automatically do that? > Where should I look for examples on how this works internally? > 3. I've based this on the Sum and Avg examples for the new UDAF interface > - any suggestions or issue please advise. Is the intermediate buffer > efficient? > 4. The current HyperLogLogUDT is private - so I've had to make my own one > which is a bit pointless as it's copy-pasted. Any thoughts on exposing that > type? Or I need to make the package spark.sql ... > > Nick > > On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> wrote: > >> Yes - it's very interesting. However, ideally we should have a version of >> hyperloglog that can work directly against some raw bytes in memory (rather >> than java objects), in order for this to fit the Tungsten execution model >> where everything is operating directly against some memory address. >> >> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath <nick.pentre...@gmail.com >> > wrote: >> >>> Sure I can copy the code but my aim was more to understand: >>> >>> (A) if this is broadly interesting enough to folks to think about >>> updating / extending the existing UDAF within Spark >>> (b) how to register ones own custom UDAF - in which case it could be a >>> Spark package for example >>> >>> All examples deal with registering a UDF but nothing about UDAFs >>> >>> — >>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>> >>> >>> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos < >>> daniel.dara...@lynxanalytics.com> wrote: >>> >>>> It's already possible to just copy the code from countApproxDistinct >>>> <https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153> >>>> and >>>> access the HLL directly, or do anything you like. >>>> >>>> On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath < >>>> nick.pentre...@gmail.com> wrote: >>>> >>>>> Any thoughts? >>>>> >>>>> — >>>>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>>>> >>>>> >>>>> On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath < >>>>> nick.pentre...@gmail.com> wrote: >>>>> >>>>>> Hey Spark devs >>>>>> >>>>>> I've been looking at DF UDFs and UDAFs. The approx distinct is using >>>>>> hyperloglog, >>>>>> but there is only an option to return the count as a Long. >>>>>> >>>>>> It can be useful to be able to return and store the actual data >>>>>> structure (ie serialized HLL). This effectively allows one to do >>>>>> aggregation / rollups over columns while still preserving the ability to >>>>>> get distinct counts. >>>>>> >>>>>> For exa
Re: HyperLogLogUDT
Can I ask why you've done this as a custom implementation rather than using StreamLib, which is already implemented and widely used? It seems more portable to me to use a library - for example, I'd like to export the grouped data with raw HLLs to say Elasticsearch, and then do further on-demand aggregation in ES and visualization in Kibana etc. Others may want to do something similar into Hive, Cassandra, HBase or whatever they are using. In this case they'd need to use this particular implementation from Spark which may be tricky to include in a dependency etc. If there are enhancements, does it not make sense to do a PR to StreamLib? Or does this interact in some better way with Tungsten? I am unclear on how the interop with Tungsten raw memory works - some pointers on that and where to look in the Spark code would be helpful. On Sat, Sep 12, 2015 at 10:45 AM, Herman van Hövell tot Westerflier < hvanhov...@questtec.nl> wrote: > Hello Nick, > > I have been working on a (UDT-less) implementation of HLL++. You can find > the PR here: https://github.com/apache/spark/pull/8362. This current > implements the dense version of HLL++, which is a further development of > HLL. It returns a Long, but it shouldn't be to hard to return a Row > containing the cardinality and/or the HLL registers (the binary data). > > I am curious what the stance is on using UDTs in the new UDAF interface. > Is this still viable? This wouldn't work with UnsafeRow for instance. The > OpenHashSetUDT for instance would be a nice building block for CollectSet > and all Distinct Aggregate operators. Are there any opinions on this? > > Kind regards, > > Herman van Hövell tot Westerflier > > QuestTec B.V. > Torenwacht 98 > 2353 DC Leiderdorp > hvanhov...@questtec.nl > +599 9 521 4402 > > > 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: > >> Inspired by this post: >> http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, >> I've started putting together something based on the Spark 1.5 UDAF >> interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 >> >> Some questions - >> >> 1. How do I get the UDAF to accept input arguments of different type? We >> can hash anything basically for HLL - Int, Long, String, Object, raw bytes >> etc. Right now it seems we'd need to build a new UDAF for each input type, >> which seems strange - I should be able to use one UDAF that can handle raw >> input of different types, as well as handle existing HLLs that can be >> merged/aggregated (e.g. for grouped data) >> 2. @Reynold, how would I ensure this works for Tungsten (ie against raw >> bytes in memory)? Or does the new Aggregate2 stuff automatically do that? >> Where should I look for examples on how this works internally? >> 3. I've based this on the Sum and Avg examples for the new UDAF interface >> - any suggestions or issue please advise. Is the intermediate buffer >> efficient? >> 4. The current HyperLogLogUDT is private - so I've had to make my own one >> which is a bit pointless as it's copy-pasted. Any thoughts on exposing that >> type? Or I need to make the package spark.sql ... >> >> Nick >> >> On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> wrote: >> >>> Yes - it's very interesting. However, ideally we should have a version >>> of hyperloglog that can work directly against some raw bytes in memory >>> (rather than java objects), in order for this to fit the Tungsten execution >>> model where everything is operating directly against some memory address. >>> >>> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath < >>> nick.pentre...@gmail.com> wrote: >>> >>>> Sure I can copy the code but my aim was more to understand: >>>> >>>> (A) if this is broadly interesting enough to folks to think about >>>> updating / extending the existing UDAF within Spark >>>> (b) how to register ones own custom UDAF - in which case it could be a >>>> Spark package for example >>>> >>>> All examples deal with registering a UDF but nothing about UDAFs >>>> >>>> — >>>> Sent from Mailbox <https://www.dropbox.com/mailbox> >>>> >>>> >>>> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos < >>>> daniel.dara...@lynxanalytics.com> wrote: >>>> >>>>> It's already possible to just copy the code from countApproxDistinct >>>>> <https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153&
Re: HyperLogLogUDT
I am typically all for code re-use. The reason for writing this is to prevent the indirection of a UDT and work directly against memory. A UDT will work fine at the moment because we still use GenericMutableRow/SpecificMutableRow as aggregation buffers. However if you would use an UnsafeRow as an AggregationBuffer (which is attractive when you have a lot of groups during aggregation) the use of an UDT is either impossible or it would become very slow because it would require us to deserialize/serialize a UDT on every update. As for compatibility, the implementation produces exactly the same results as the ClearSpring implementation. You could easily export the HLL++ register values to the current ClearSpring implementation and export those. Met vriendelijke groet/Kind regards, Herman van Hövell tot Westerflier QuestTec B.V. Torenwacht 98 2353 DC Leiderdorp hvanhov...@questtec.nl +599 9 521 4402 2015-09-12 11:06 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: > I should add that surely the idea behind UDT is exactly that it can (a) > fit automatically into DFs and Tungsten and (b) that it can be used > efficiently in writing ones own UDTs and UDAFs? > > > On Sat, Sep 12, 2015 at 11:05 AM, Nick Pentreath <nick.pentre...@gmail.com > > wrote: > >> Can I ask why you've done this as a custom implementation rather than >> using StreamLib, which is already implemented and widely used? It seems >> more portable to me to use a library - for example, I'd like to export the >> grouped data with raw HLLs to say Elasticsearch, and then do further >> on-demand aggregation in ES and visualization in Kibana etc. >> >> Others may want to do something similar into Hive, Cassandra, HBase or >> whatever they are using. In this case they'd need to use this particular >> implementation from Spark which may be tricky to include in a dependency >> etc. >> >> If there are enhancements, does it not make sense to do a PR to >> StreamLib? Or does this interact in some better way with Tungsten? >> >> I am unclear on how the interop with Tungsten raw memory works - some >> pointers on that and where to look in the Spark code would be helpful. >> >> On Sat, Sep 12, 2015 at 10:45 AM, Herman van Hövell tot Westerflier < >> hvanhov...@questtec.nl> wrote: >> >>> Hello Nick, >>> >>> I have been working on a (UDT-less) implementation of HLL++. You can >>> find the PR here: https://github.com/apache/spark/pull/8362. This >>> current implements the dense version of HLL++, which is a further >>> development of HLL. It returns a Long, but it shouldn't be to hard to >>> return a Row containing the cardinality and/or the HLL registers (the >>> binary data). >>> >>> I am curious what the stance is on using UDTs in the new UDAF interface. >>> Is this still viable? This wouldn't work with UnsafeRow for instance. The >>> OpenHashSetUDT for instance would be a nice building block for CollectSet >>> and all Distinct Aggregate operators. Are there any opinions on this? >>> >>> Kind regards, >>> >>> Herman van Hövell tot Westerflier >>> >>> QuestTec B.V. >>> Torenwacht 98 >>> 2353 DC Leiderdorp >>> hvanhov...@questtec.nl >>> +599 9 521 4402 >>> >>> >>> 2015-09-12 10:07 GMT+02:00 Nick Pentreath <nick.pentre...@gmail.com>: >>> >>>> Inspired by this post: >>>> http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, >>>> I've started putting together something based on the Spark 1.5 UDAF >>>> interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141 >>>> >>>> Some questions - >>>> >>>> 1. How do I get the UDAF to accept input arguments of different type? >>>> We can hash anything basically for HLL - Int, Long, String, Object, raw >>>> bytes etc. Right now it seems we'd need to build a new UDAF for each input >>>> type, which seems strange - I should be able to use one UDAF that can >>>> handle raw input of different types, as well as handle existing HLLs that >>>> can be merged/aggregated (e.g. for grouped data) >>>> 2. @Reynold, how would I ensure this works for Tungsten (ie against raw >>>> bytes in memory)? Or does the new Aggregate2 stuff automatically do that? >>>> Where should I look for examples on how this works internally? >>>> 3. I've based this on the Sum and Avg examples for the new UDAF >>>> interface - any suggestions or issue please advise. Is the intermediate >
Re: HyperLogLogUDT
Yes - it's very interesting. However, ideally we should have a version of hyperloglog that can work directly against some raw bytes in memory (rather than java objects), in order for this to fit the Tungsten execution model where everything is operating directly against some memory address. On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Sure I can copy the code but my aim was more to understand: (A) if this is broadly interesting enough to folks to think about updating / extending the existing UDAF within Spark (b) how to register ones own custom UDAF - in which case it could be a Spark package for example All examples deal with registering a UDF but nothing about UDAFs — Sent from Mailbox https://www.dropbox.com/mailbox On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: It's already possible to just copy the code from countApproxDistinct https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153 and access the HLL directly, or do anything you like. On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Any thoughts? — Sent from Mailbox https://www.dropbox.com/mailbox On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Hey Spark devs I've been looking at DF UDFs and UDAFs. The approx distinct is using hyperloglog, but there is only an option to return the count as a Long. It can be useful to be able to return and store the actual data structure (ie serialized HLL). This effectively allows one to do aggregation / rollups over columns while still preserving the ability to get distinct counts. For example, one can store daily aggregates of events, grouped by various columns, while storing for each grouping the HLL of say unique users. So you can get the uniques per day directly but could also very easily do arbitrary aggregates (say monthly, annually) and still be able to get a unique count for that period by merging the daily HLLS. I did this a while back as a Hive UDAF ( https://github.com/MLnick/hive-udf) which returns a Struct field containing a cardinality field and a binary field containing the serialized HLL. I was wondering if there would be interest in something like this? I am not so clear on how UDTs work with regards to SerDe - so could one adapt the HyperLogLogUDT to be a Struct with the serialized HLL as a field as well as count as a field? Then I assume this would automatically play nicely with DataFrame I/O etc. The gotcha is one needs to then call approx_count_field.count (or is there a concept of a default field for a Struct?). Also, being able to provide the bitsize parameter may be useful... The same thinking would apply potentially to other approximate (and mergeable) data structures like T-Digest and maybe CMS. Nick
Re: HyperLogLogUDT
Sure I can copy the code but my aim was more to understand: (A) if this is broadly interesting enough to folks to think about updating / extending the existing UDAF within Spark (b) how to register ones own custom UDAF - in which case it could be a Spark package for example All examples deal with registering a UDF but nothing about UDAFs — Sent from Mailbox On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: It's already possible to just copy the code from countApproxDistinct https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153 and access the HLL directly, or do anything you like. On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Any thoughts? — Sent from Mailbox https://www.dropbox.com/mailbox On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Hey Spark devs I've been looking at DF UDFs and UDAFs. The approx distinct is using hyperloglog, but there is only an option to return the count as a Long. It can be useful to be able to return and store the actual data structure (ie serialized HLL). This effectively allows one to do aggregation / rollups over columns while still preserving the ability to get distinct counts. For example, one can store daily aggregates of events, grouped by various columns, while storing for each grouping the HLL of say unique users. So you can get the uniques per day directly but could also very easily do arbitrary aggregates (say monthly, annually) and still be able to get a unique count for that period by merging the daily HLLS. I did this a while back as a Hive UDAF ( https://github.com/MLnick/hive-udf) which returns a Struct field containing a cardinality field and a binary field containing the serialized HLL. I was wondering if there would be interest in something like this? I am not so clear on how UDTs work with regards to SerDe - so could one adapt the HyperLogLogUDT to be a Struct with the serialized HLL as a field as well as count as a field? Then I assume this would automatically play nicely with DataFrame I/O etc. The gotcha is one needs to then call approx_count_field.count (or is there a concept of a default field for a Struct?). Also, being able to provide the bitsize parameter may be useful... The same thinking would apply potentially to other approximate (and mergeable) data structures like T-Digest and maybe CMS. Nick
Re: HyperLogLogUDT
Any thoughts? — Sent from Mailbox On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Hey Spark devs I've been looking at DF UDFs and UDAFs. The approx distinct is using hyperloglog, but there is only an option to return the count as a Long. It can be useful to be able to return and store the actual data structure (ie serialized HLL). This effectively allows one to do aggregation / rollups over columns while still preserving the ability to get distinct counts. For example, one can store daily aggregates of events, grouped by various columns, while storing for each grouping the HLL of say unique users. So you can get the uniques per day directly but could also very easily do arbitrary aggregates (say monthly, annually) and still be able to get a unique count for that period by merging the daily HLLS. I did this a while back as a Hive UDAF (https://github.com/MLnick/hive-udf) which returns a Struct field containing a cardinality field and a binary field containing the serialized HLL. I was wondering if there would be interest in something like this? I am not so clear on how UDTs work with regards to SerDe - so could one adapt the HyperLogLogUDT to be a Struct with the serialized HLL as a field as well as count as a field? Then I assume this would automatically play nicely with DataFrame I/O etc. The gotcha is one needs to then call approx_count_field.count (or is there a concept of a default field for a Struct?). Also, being able to provide the bitsize parameter may be useful... The same thinking would apply potentially to other approximate (and mergeable) data structures like T-Digest and maybe CMS. Nick
Re: HyperLogLogUDT
It's already possible to just copy the code from countApproxDistinct https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153 and access the HLL directly, or do anything you like. On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Any thoughts? — Sent from Mailbox https://www.dropbox.com/mailbox On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Hey Spark devs I've been looking at DF UDFs and UDAFs. The approx distinct is using hyperloglog, but there is only an option to return the count as a Long. It can be useful to be able to return and store the actual data structure (ie serialized HLL). This effectively allows one to do aggregation / rollups over columns while still preserving the ability to get distinct counts. For example, one can store daily aggregates of events, grouped by various columns, while storing for each grouping the HLL of say unique users. So you can get the uniques per day directly but could also very easily do arbitrary aggregates (say monthly, annually) and still be able to get a unique count for that period by merging the daily HLLS. I did this a while back as a Hive UDAF ( https://github.com/MLnick/hive-udf) which returns a Struct field containing a cardinality field and a binary field containing the serialized HLL. I was wondering if there would be interest in something like this? I am not so clear on how UDTs work with regards to SerDe - so could one adapt the HyperLogLogUDT to be a Struct with the serialized HLL as a field as well as count as a field? Then I assume this would automatically play nicely with DataFrame I/O etc. The gotcha is one needs to then call approx_count_field.count (or is there a concept of a default field for a Struct?). Also, being able to provide the bitsize parameter may be useful... The same thinking would apply potentially to other approximate (and mergeable) data structures like T-Digest and maybe CMS. Nick
HyperLogLogUDT
Hey Spark devs I've been looking at DF UDFs and UDAFs. The approx distinct is using hyperloglog, but there is only an option to return the count as a Long. It can be useful to be able to return and store the actual data structure (ie serialized HLL). This effectively allows one to do aggregation / rollups over columns while still preserving the ability to get distinct counts. For example, one can store daily aggregates of events, grouped by various columns, while storing for each grouping the HLL of say unique users. So you can get the uniques per day directly but could also very easily do arbitrary aggregates (say monthly, annually) and still be able to get a unique count for that period by merging the daily HLLS. I did this a while back as a Hive UDAF (https://github.com/MLnick/hive-udf) which returns a Struct field containing a cardinality field and a binary field containing the serialized HLL. I was wondering if there would be interest in something like this? I am not so clear on how UDTs work with regards to SerDe - so could one adapt the HyperLogLogUDT to be a Struct with the serialized HLL as a field as well as count as a field? Then I assume this would automatically play nicely with DataFrame I/O etc. The gotcha is one needs to then call approx_count_field.count (or is there a concept of a default field for a Struct?). Also, being able to provide the bitsize parameter may be useful... The same thinking would apply potentially to other approximate (and mergeable) data structures like T-Digest and maybe CMS. Nick