@Adam

This has come up on the list before; you may be thinking of
http://blinkdb.org/. It is definitely something that would be interesting
to explore once we are stable and past 1.0. We can certainly try to help
you along if you would like to start some of this work.

@Marcin

Please do file a bug for the error you are seeing; we are aggressively
hunting down remaining bugs like this.

Regarding the UDAF, the implementation of the function itself would not
be too difficult, but as Jinfeng said, we currently do not track data
sorted-ness as a trait in the query planner. As for your point about this
only being used in this specific situation, that introduces some dangerous
possibilities for the planner. Even if your data is sorted on disk, if it
is spread across a couple of files, or sits in large splittable files, we
will try to parallelize the read and various other parts of the query. If
we don't know you want to preserve the original order, we may plan the
query in a way that changes the order by the time the data reaches your
function. That is why it is important to expose this sorted-ness trait
from the storage system, as well as to define this new concept of an
aggregate function that can only be used in a streaming aggregate.
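
For what it's worth, here is a rough, untested sketch of what such a
change-counting aggregate might look like against Drill's pluggable UDAF
interface. The function name is made up, it assumes a BIGINT input column,
and (most importantly) it assumes rows actually arrive in sorted order,
which is exactly the guarantee the planner cannot make today:

import org.apache.drill.exec.expr.DrillAggFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.BigIntHolder;

// Hypothetical "count value changes in pre-sorted input" aggregate.
@FunctionTemplate(name = "count_distinct_sorted",
    scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
    nulls = FunctionTemplate.NullHandling.INTERNAL)
public class CountDistinctSorted implements DrillAggFunc {
  @Param BigIntHolder in;        // current ID value
  @Workspace BigIntHolder last;  // previous ID value seen
  @Workspace BigIntHolder seen;  // 0 until the first row arrives
  @Workspace BigIntHolder count; // number of distinct runs so far
  @Output BigIntHolder out;

  public void setup() {
    seen.value = 0;
    count.value = 0;
  }

  public void add() {
    // A new distinct value starts whenever the sorted input changes.
    if (seen.value == 0 || in.value != last.value) {
      count.value++;
      seen.value = 1;
    }
    last.value = in.value;
  }

  public void output() {
    out.value = count.value;
  }

  public void reset() {
    seen.value = 0;
    count.value = 0;
  }
}

The function body is the trivial part; everything difficult lives in the
planner guarantees around it.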

Someone can correct me if I'm wrong, but I believe we currently consider a
sort followed by a streaming aggregate to be functionally equivalent to a
hash aggregation without the sort. As far as I know, both methods employ
the same aggregate functions.

Looking at a plan for a distinct aggregate with hash aggregation turned
off, it is planned as a two-phase aggregation: first grouping by the
column to count, and then counting the resulting rows of that aggregation.
The plan with hash aggregation on also has two phases, but it will consume
more memory and could possibly exceed your cluster limits.
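
For reference, plans like the two below can be produced with something
along these lines (assuming a dfs workspace that can see the file; the
exact query shape is just an illustration):

ALTER SESSION SET `planner.enable_hashagg` = false;
EXPLAIN PLAN FOR
SELECT COUNT(DISTINCT l_orderkey)
FROM dfs.`/tpch/lineitem.parquet`;

-- and with hash aggregation back at its default:
ALTER SESSION SET `planner.enable_hashagg` = true;
EXPLAIN PLAN FOR
SELECT COUNT(DISTINCT l_orderkey)
FROM dfs.`/tpch/lineitem.parquet`;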

00-00    Screen
00-01      StreamAgg(group=[{}], EXPR$0=[COUNT($0)])
00-02        StreamAgg(group=[{0}])
00-03          Sort(sort0=[$0], dir0=[ASC])
00-04            Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tpch/lineitem.parquet]], selectionRoot=/tpch/lineitem.parquet, numFiles=1, columns=[`l_orderkey`]]])

00-00    Screen
00-01      StreamAgg(group=[{}], EXPR$0=[COUNT($0)])
00-02        HashAgg(group=[{0}])
00-03          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tpch/lineitem.parquet]], selectionRoot=/tpch/lineitem.parquet, numFiles=1, columns=[`l_orderkey`]]])

If your dataset is very large, the strategy you suggest could allow you to
do a single-phase aggregation where you track the count yourself by
watching for the value to change. I think the best path for now is to file
the bug and see if we can't get it fixed in an upcoming release; you may
well get the performance you need out of the existing two-phase
aggregation.

- Jason



On Wed, Apr 8, 2015 at 11:38 PM, Adam Gilmore <dragoncu...@gmail.com> wrote:

> Ted - I'd be really interested in doing something like that (approximate
> aggregation results).  This would be very interesting in terms of standard
> deviation, median, etc.
>
> I know there is another project out there that trades off speed vs accuracy
> (the name of which escapes me).  If we could easily add something onto
> Drill like this, it'd be hugely beneficial.
>
> On Wed, Apr 8, 2015 at 8:25 AM, Ted Dunning <ted.dunn...@gmail.com> wrote:
>
> > Marcin,
> >
> > They did comment.  The answer is that the default is to use hashed
> > aggregation (which will be faster when there is lots of memory) with the
> > option to use sort aggregation (which is basically what you were
> > suggesting).
> >
> > Did you mean to suggest that your data is already known to be sorted and
> > thus the sort step should be omitted?
> >
> >
> > On Tue, Apr 7, 2015 at 3:21 PM, Marcin Karpinski <mkarpin...@opera.com>
> > wrote:
> >
> > > @Jacques, thanks for the information - I'm definitely going to
> > > check out that option.
> > >
> > > I'm also curious that none of you guys commented on my original
> > > idea of counting distinct values by a simple aggregation of
> > > pre-sorted data - is it because it doesn't make sense to you guys,
> > > or because you think your suggestions are easier to implement?
> > >
> > > On Tue, Apr 7, 2015 at 5:55 PM, Jacques Nadeau <jacq...@apache.org>
> > > wrote:
> > >
> > > > Two additional notes here:
> > > >
> > > > Drill can actually do an aggregation using either a hash table
> > > > based aggregation or a sort based aggregation.  By default, the
> > > > hash aggregation will generally be selected first.  However, you
> > > > can disable hash based aggregation if you specifically think that
> > > > a sort based aggregation will perform better for your use case.
> > > > You can do this by running the command ALTER SESSION SET
> > > > `planner.enable_hashagg` = FALSE;
> > > >
> > > > We have always had it on our roadmap to implement an approximate
> > > > count distinct function but haven't gotten to it yet.  As Ted
> > > > mentions, using this technique would substantially reduce data
> > > > shuffling and could be done with a moderate level of effort since
> > > > our UDAF interface is pluggable.
> > > >
> > > >
> > > >
> > > > On Tue, Apr 7, 2015 at 8:20 AM, Ted Dunning <ted.dunn...@gmail.com>
> > > > wrote:
> > > >
> > > > > How precise do your counts need to be?  Can you accept a
> > > > > fraction of a percent statistical error?
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Apr 7, 2015 at 8:11 AM, Aman Sinha <asi...@maprtech.com>
> > > > > wrote:
> > > > >
> > > > > > Drill already does most of this type of transformation.  If
> > > > > > you do an 'EXPLAIN PLAN FOR <your count(distinct) query>' you
> > > > > > will see that it first does a grouping on the column and then
> > > > > > applies the COUNT(column).  The first level grouping can be
> > > > > > done either based on sorting or hashing and this is
> > > > > > configurable through a system option.
> > > > > >
> > > > > > Aman
> > > > > >
> > > > > > On Tue, Apr 7, 2015 at 3:30 AM, Marcin Karpinski
> > > > > > <mkarpin...@opera.com> wrote:
> > > > > >
> > > > > > > Hi Guys,
> > > > > > >
> > > > > > > I have a specific use case for Drill, in which I'd like to
> > > > > > > be able to count unique values in columns with tens of
> > > > > > > millions of distinct values. The COUNT DISTINCT method,
> > > > > > > unfortunately, does not scale, either time- or memory-wise,
> > > > > > > and the idea is to sort the data beforehand by the values of
> > > > > > > that column (let's call it ID), to have the row groups split
> > > > > > > at a new ID boundary, and to extend Drill with an
> > > > > > > alternative version of COUNT that would simply count the
> > > > > > > number of times the ID changes throughout the entire table.
> > > > > > > This way, we could expect that counting unique values of
> > > > > > > pre-sorted columns could have complexity comparable to that
> > > > > > > of the regular COUNT operator (a full scan). So, to sum up,
> > > > > > > I have three questions:
> > > > > > >
> > > > > > > 1. Can such a scenario be realized in Drill?
> > > > > > > 2. Can it be done in a modular way (e.g., a dedicated UDAF
> > > > > > > or an operator), i.e., without heavy hacking throughout all
> > > > > > > of Drill?
> > > > > > > 3. How to do it?
> > > > > > >
> > > > > > > Our initial experience with Drill was very good - it's an
> > > > > > > excellent tool. But in order to be able to adopt it, we
> > > > > > > need to sort out this one central issue.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Marcin
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
