I see there's a bunch of additional aggregation code in Dremio that might serve as inspiration (some of which relates to distributed aggregation, so it may not be relevant):
https://github.com/dremio/dremio-oss/tree/master/sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate

Maybe Andy or one of the other active Rust DataFusion developers can comment on the approach taken for hash aggs there.

On Wed, Aug 5, 2020 at 1:52 PM Wes McKinney <wesmck...@gmail.com> wrote:
>
> hi Kenta,
>
> Yes, I think it only makes sense to implement this in the context of
> the query engine project. Here's a list of assorted thoughts about it:
>
> * I have been mentally planning to follow the Vectorwise-type query
> engine architecture that's discussed in [1] [2] and many other
> academic papers. I believe this is how some other current-generation
> open source columnar query engines work, such as Dremio [3] and
> DuckDB [4][5].
> * Hash (aka "group") aggregations need to be able to process
> arbitrary expressions, not only a plain input column. So it's not
> enough to be able to compute "sum(x) group by y" where "x" and "y"
> are fields in a RecordBatch; we need to be able to compute
> "$AGG_FUNC($EXPR) GROUP BY $GROUP_EXPR_1, $GROUP_EXPR_2, ..." where
> $EXPR / $GROUP_EXPR_1 / ... are any column expressions computed from
> the input relations (keep in mind that an aggregation could apply to
> a stream of record batches produced by a join). In any case,
> expression evaluation is a closely related task and should be
> implemented ASAP.
> * Hash aggregation functions themselves should probably be introduced
> as a new Function type in arrow::compute. I don't think it would be
> appropriate to use the existing "SCALAR_AGGREGATE" functions;
> instead, we should introduce a new HASH_AGGREGATE function type that
> accepts input data to be aggregated along with an array of
> pre-computed bucket ids (which are computed by probing the hash
> table).
> So rather than Update(state, args), like we have for scalar
> aggregates, the primary interface for group aggregation is
> Update(state, bucket_ids, args).
> * The HashAggregation operator should be able to process an arbitrary
> iterator of record batches.
> * We will probably want to adapt an existing concurrent hash table,
> or implement a new one, so that aggregations can be performed in
> parallel without requiring a post-aggregation merge step.
> * There's some general support machinery needed for hashing multiple
> fields and then doing efficient vectorized hash table probes (to
> assign aggregation bucket ids to each row position).
>
> I think it is worth investing the effort to build something that is
> reasonably consistent with the "state of the art" in database systems
> (at least to the extent our current resources allow) rather than
> building something more crude that has to be replaced with a new
> implementation later.
>
> I'd like to help personally with this work (particularly since the
> natural next step after my recent work in arrow/compute is to
> implement expression evaluation), but I won't have significant
> bandwidth for it until later this month or early September. If
> someone feels that they sufficiently understand the state of the art
> for this type of workload and wants to help with laying down the
> abstract C++ APIs for Volcano-style query execution and an
> implementation of hash aggregation, that sounds great.
>
> Thanks,
> Wes
>
> [1]: https://www.vldb.org/pvldb/vol11/p2209-kersten.pdf
> [2]: https://github.com/TimoKersten/db-engine-paradigms
> [3]: https://github.com/dremio/dremio-oss/tree/master/sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate/hash
> [4]: https://github.com/cwida/duckdb/blob/master/src/include/duckdb/execution/aggregate_hashtable.hpp
> [5]: https://github.com/cwida/duckdb/blob/master/src/execution/aggregate_hashtable.cpp
>
> On Wed, Aug 5, 2020 at 10:23 AM Kenta Murata <m...@mrkn.jp> wrote:
> >
> > Hi folks,
> >
> > Red Arrow, the Ruby binding of Arrow GLib, implements grouped
> > aggregation features for RecordBatch and Table. Because these
> > features are written in Ruby, they are too slow for large data.
> > We need to make them much faster.
> >
> > To improve their speed, they should be written in C++ and placed in
> > Arrow C++ rather than in Red Arrow.
> >
> > Is anyone working on implementing a group-by operation for
> > RecordBatch and Table in Arrow C++? If no one has worked on it, I
> > would like to try it.
> >
> > By the way, I found that the grouped aggregation feature is
> > mentioned in the design document of the Arrow C++ Query Engine. Is
> > the Query Engine, rather than Arrow C++ Core, the suitable place to
> > implement the group-by operation?