@Adam This is something that has come up on the list before; you may be thinking of http://blinkdb.org/. This is something that would definitely be interesting to explore once we are stable and past 1.0. We can certainly try to help you along if you would like to start some of this work.
@Marcin Please do file a bug for the error you are seeing; we are aggressively hunting down remaining bugs like this.

Regarding the UDAF, the implementation of the function itself would not be too difficult, but, as Jinfeng said, we currently do not track data sorted-ness as a trait in the query planner. Regarding your point about this only being used in this specific situation: that introduces some dangerous possibilities for the planner. Even if your data is sorted on disk, if it is spread across a couple of files, or stored in large splittable files, we will try to parallelize the read and various other parts of the query. If we don't know you want to preserve the original order, we may plan the query in a way that changes the order by the time the data reaches your function. That is why it is important to expose this sorted-ness trait from the storage system, as well as to define this new concept of an aggregate function that can only be used in a streaming aggregate.

Someone can correct me if I'm wrong, but I believe we currently consider a sort followed by a streaming aggregate to be functionally equivalent to a hash aggregation without the sort. As far as I know, both methods employ the same aggregate functions.

Looking at a plan for a distinct aggregate with hash aggregation turned off, it is planned as a two-phase aggregation: first grouping by the column to count, and then counting the resulting rows of that aggregation. The plan with hash aggregation on also has two phases, but will consume more memory and can possibly exceed your cluster limits.
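To make the trade-off concrete, here is a small sketch in plain Python (not Drill code; the function names are mine): hash aggregation must hold every distinct value in memory, while a streaming aggregate over sorted input only needs to remember the previous value.

```python
def hash_agg_distinct_count(values):
    """Hash-style aggregation: one pass over unsorted input,
    but memory grows with the number of distinct values."""
    seen = set()
    for v in values:
        seen.add(v)
    return len(seen)

def streaming_agg_distinct_count(sorted_values):
    """Streaming-style aggregation: input must already be sorted;
    constant memory, counting one hit per group boundary."""
    count = 0
    prev = object()  # sentinel that compares unequal to any real value
    for v in sorted_values:
        if v != prev:
            count += 1
            prev = v
    return count

data = [3, 1, 2, 3, 1, 2, 2]
assert hash_agg_distinct_count(data) == 3
assert streaming_agg_distinct_count(sorted(data)) == 3
```

This is also why the planner's sorted-ness knowledge matters: feed `streaming_agg_distinct_count` unsorted input and it silently over-counts.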
With hash aggregation turned off:

  00-00    Screen
  00-01      StreamAgg(group=[{}], EXPR$0=[COUNT($0)])
  00-02        StreamAgg(group=[{0}])
  00-03          Sort(sort0=[$0], dir0=[ASC])
  00-04            Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tpch/lineitem.parquet]], selectionRoot=/tpch/lineitem.parquet, numFiles=1, columns=[`l_orderkey`]]])

With hash aggregation on:

  00-00    Screen
  00-01      StreamAgg(group=[{}], EXPR$0=[COUNT($0)])
  00-02        HashAgg(group=[{0}])
  00-03          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tpch/lineitem.parquet]], selectionRoot=/tpch/lineitem.parquet, numFiles=1, columns=[`l_orderkey`]]])

If your dataset is very large, the strategy you suggest could allow you to do a single-phase aggregation in which you track the count yourself by watching for the value to change. I think the best path for now would be to file the bug and see if we can't get it fixed in an upcoming release; you may very well get the performance you need out of the existing two-phase aggregation.

- Jason

On Wed, Apr 8, 2015 at 11:38 PM, Adam Gilmore <dragoncu...@gmail.com> wrote:

> Ted - I'd be really interested in doing something like that (approximate
> aggregation results). This would be very interesting in terms of
> standard deviation, median, etc.
>
> I know there is another project out there that trades off speed vs
> accuracy (the name of which escapes me). If we could easily add
> something onto Drill like this, it'd be hugely beneficial.
>
> On Wed, Apr 8, 2015 at 8:25 AM, Ted Dunning <ted.dunn...@gmail.com> wrote:
>
> > Marcin,
> >
> > They did comment. The answer is that the default is to use hashed
> > aggregation (which will be faster when there is lots of memory) with
> > the option to use sort aggregation (which is basically what you were
> > suggesting).
> >
> > Did you mean to suggest that your data is already known to be sorted
> > and thus the sort step should be omitted?
> >
> > On Tue, Apr 7, 2015 at 3:21 PM, Marcin Karpinski <mkarpin...@opera.com> wrote:
> >
> > > @Jacques, thanks for the information - I'm definitely going to check
> > > out that option.
> > >
> > > I'm also curious that none of you guys commented on my original idea
> > > of counting distinct values by a simple aggregation of pre-sorted
> > > data - is it because it doesn't make sense to you guys, or because
> > > you think your suggestions are easier to implement?
> > >
> > > On Tue, Apr 7, 2015 at 5:55 PM, Jacques Nadeau <jacq...@apache.org> wrote:
> > >
> > > > Two additional notes here:
> > > >
> > > > Drill can actually do an aggregation using either a hash table
> > > > based aggregation or a sort based aggregation. By default,
> > > > generally the hash aggregation will be selected first. However,
> > > > you can disable hash based aggregation if you specifically think
> > > > that a sort based aggregation will perform better for your use
> > > > case. You can do this by running the command
> > > > ALTER SESSION SET `planner.enable_hashagg` = FALSE;
> > > >
> > > > We have always had it on our roadmap to implement an approximate
> > > > count distinct function but haven't gotten to it yet. As Ted
> > > > mentions, using this technique would substantially reduce data
> > > > shuffling and could be done with a moderate level of effort since
> > > > our UDAF interface is pluggable.
> > > >
> > > > On Tue, Apr 7, 2015 at 8:20 AM, Ted Dunning <ted.dunn...@gmail.com> wrote:
> > > >
> > > > > How precise do your counts need to be? Can you accept a fraction
> > > > > of a percent statistical error?
> > > > >
> > > > > On Tue, Apr 7, 2015 at 8:11 AM, Aman Sinha <asi...@maprtech.com> wrote:
> > > > >
> > > > > > Drill already does most of this type of transformation. If you
> > > > > > do an 'EXPLAIN PLAN FOR <your count(distinct) query>' you will
> > > > > > see that it first does a grouping on the column and then
> > > > > > applies the COUNT(column). The first-level grouping can be
> > > > > > done based either on sorting or on hashing, and this is
> > > > > > configurable through a system option.
> > > > > >
> > > > > > Aman
> > > > > >
> > > > > > On Tue, Apr 7, 2015 at 3:30 AM, Marcin Karpinski <mkarpin...@opera.com> wrote:
> > > > > >
> > > > > > > Hi Guys,
> > > > > > >
> > > > > > > I have a specific use case for Drill, in which I'd like to
> > > > > > > be able to count unique values in columns with tens of
> > > > > > > millions of distinct values. The COUNT DISTINCT method,
> > > > > > > unfortunately, does not scale both time- and memory-wise,
> > > > > > > and the idea is to sort the data beforehand by the values of
> > > > > > > that column (let's call it ID), to have the row groups split
> > > > > > > at a new ID boundary, and to extend Drill with an
> > > > > > > alternative version of COUNT that would simply count the
> > > > > > > number of times the ID changes throughout the entire table.
> > > > > > > This way, we could expect that counting unique values of
> > > > > > > pre-sorted columns could have complexity comparable to that
> > > > > > > of the regular COUNT operator (a full scan). So, to sum up,
> > > > > > > I have three questions:
> > > > > > >
> > > > > > > 1. Can such a scenario be realized in Drill?
> > > > > > > 2. Can it be done in a modular way (e.g., a dedicated UDAF
> > > > > > > or an operator), so without heavy hacking throughout Drill?
> > > > > > > 3. How to do it?
> > > > > > >
> > > > > > > Our initial experience with Drill was very good - it's an
> > > > > > > excellent tool. But in order to be able to adopt it, we need
> > > > > > > to sort out this one central issue.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Marcin
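
For anyone curious about the approximate count-distinct approach Ted and Jacques discuss above, here is a minimal HyperLogLog-style sketch in plain Python. This is illustrative only - the class name and parameters are mine, not any Drill API - and a production implementation (e.g. HyperLogLog++) adds further bias corrections.

```python
import hashlib
import math

class HyperLogLog:
    """Toy HyperLogLog: estimates distinct count in O(2**p) memory."""

    def __init__(self, p=12):
        self.p = p
        self.m = 1 << p            # number of registers
        self.registers = [0] * self.m

    def add(self, item):
        h = int(hashlib.sha1(str(item).encode()).hexdigest(), 16)
        idx = h & (self.m - 1)     # low p bits choose a register
        rest = h >> self.p
        rank = 1                   # position of lowest set bit, 1-based
        while rest & 1 == 0 and rank <= 64:
            rank += 1
            rest >>= 1
        self.registers[idx] = max(self.registers[idx], rank)

    def estimate(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)   # standard HLL constant
        raw = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:       # small-range correction
            return self.m * math.log(self.m / zeros)
        return raw

hll = HyperLogLog()
for i in range(100000):
    hll.add(f"user-{i}")
# estimate() is typically within a few percent of the true 100000 for p=12
```

The point for this thread: the registers are tiny, fixed-size, and mergeable, so partial sketches can be combined across fragments without shuffling the raw values - which is why this fits a pluggable UDAF so naturally.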