GitHub user pepijnve edited a discussion: Multiple 'group by's, one scan
In the system I'm working on I want to perform multiple aggregates using
different group by criteria over large data sets.
I don't think grouping sets are an option since those support computing a
single set of aggregates over multiple groupings. What I'm trying to achieve
instead is one multiple sets of aggregates that each have their own group by
strategy.
A simple way to do this is to just run multiple queries of course. That works
but requires scanning through the data multiple times. That becomes prohibitive
pretty quickly as the number of sets of aggregates increases.
While I was experimenting with the multiple query approach and combining those
into a single query using 'union all' I started wondering if I couldn't write
an operator to have my cake and eat it. So rather than this:
```
select 1 as setid, k1 as groupkey1, count(1) as agg1, null as groupkey2, null
as agg2 from table group by groupkey1
union all
select 2 as setid, null as groupkey1, null as agg1, k2 as groupkey2, sum(col1)
as agg2 from table group by groupkey2
```
which results in a logical plan that sort of looks like this (edited for
brevity/clarity)
```
Union
Projection: 1 AS setid, k1 AS groupkey1, count(1) AS agg1, NULL AS
groupkey2, NULL AS agg2
Aggregate: groupBy=[k1], aggr=[[count(1)]] |
TableScan: table
Projection: 2 AS setid, NULL AS groupkey1, NULL AS agg1, k2 AS groupkey2,
count(1) AS agg2
Aggregate: groupBy=[k2], aggr=[[sum(col1)]]
TableScan: table
```
what I would want to do instead is something like this
```
Unify
Projection: 1 AS setid, k1 AS groupkey1, count(1) AS agg1, NULL AS
groupkey2, NULL AS agg2
Aggregate: groupBy=[k1], aggr=[[count(1)]] |
CommonInputPlaceholder
Projection: 2 AS setid, NULL AS groupkey1, NULL AS agg1, k2 AS groupkey2,
count(1) AS agg2
Aggregate: groupBy=[k2], aggr=[[sum(col1)]]
CommonInputPlaceholder
CommonInput
TableScan: table
```
`CommonInputPlaceholder` is a stub node that has the same schema as the
`CommonInput` child.
The Unify operator works by setting up queues for each
`CommonInputPlaceholder`. It polls the `CommonInput` child, and places a
duplicate of each record batch it receives onto each queue. This is kind of
similar to how RepartitionExec does its thing but instead of assigning each
record batch once, we duplicate and assign it multiple times.
With quite some trial and error I've been able to get something up and running,
but I have a feeling I'm going against the grain of the framework. Getting the
optimizer to do the right thing for instance proved to be a challenge since it
expects plans to be trees rather than DAGs.
My question for the group is if someone else has tried to implement something
like this before? Or if what I'm trying to accomplish can be done in some other
way? Perhaps someone has advice on how to best go about implementing this?
I realize this colors outside the lines of what you can express in SQL (as far
as I know at least). I'm creating my queries by directly instantiating logical
plans so for now that's not an issue for the system I'm working on.
Edit: I accidentally ended up writing an example that _can_ be done with
grouping sets since the sets of aggregates were identical. Update example to
use different aggregates.
GitHub link: https://github.com/apache/datafusion/discussions/15982
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]