Re: [DISCUSS][C++] Group by operation for RecordBatch and Table
Hi Wes,

Thank you very much for the detailed explanation of your thoughts. Whether I end up contributing to the C++ Query Engine itself or just writing bindings for it, I need to learn about the state of the art of query engines that you pointed out. I am studying the article and the code.

Regards,
Kenta Murata

On Thu, Aug 6, 2020 at 4:17 Wes McKinney wrote:
Re: [DISCUSS][C++] Group by operation for RecordBatch and Table
I see there's a bunch of additional aggregation code in Dremio that might serve as inspiration (some of which is related to distributed aggregation, so may not be relevant):

https://github.com/dremio/dremio-oss/tree/master/sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate

Maybe Andy or one of the other active Rust DataFusion developers can comment on the approach taken for hash aggregations there.

On Wed, Aug 5, 2020 at 1:52 PM Wes McKinney wrote:
Re: [DISCUSS][C++] Group by operation for RecordBatch and Table
hi Kenta,

Yes, I think it only makes sense to implement this in the context of the query engine project. Here's a list of assorted thoughts about it:

* I have been mentally planning to follow the Vectorwise-type query engine architecture that's discussed in [1] [2] and many other academic papers. I believe this is how some other current-generation open source columnar query engines work, such as Dremio [3] and DuckDB [4][5].

* Hash (aka "group") aggregations need to be able to process arbitrary expressions, not only a plain input column. So it's not enough to be able to compute "sum(x) group by y" where "x" and "y" are fields in a RecordBatch; we need to be able to compute "$AGG_FUNC($EXPR) GROUP BY $GROUP_EXPR_1, $GROUP_EXPR_2, ..." where $EXPR / $GROUP_EXPR_1 / ... are arbitrary column expressions computed from the input relations (keep in mind that an aggregation could apply to a stream of record batches produced by a join). In any case, expression evaluation is a closely related task and should be implemented ASAP.

* Hash aggregation functions themselves should probably be introduced as a new Function type in arrow::compute. I don't think it would be appropriate to use the existing "SCALAR_AGGREGATE" functions; instead we should introduce a new HASH_AGGREGATE function type that accepts input data to be aggregated along with an array of pre-computed bucket ids (which are computed by probing the hash table).
So rather than Update(state, args) like we have for scalar aggregates, the primary interface for group aggregation is Update(state, bucket_ids, args).

* The HashAggregation operator should be able to process an arbitrary iterator of record batches.

* We will probably want to adapt an existing concurrent hash table, or implement a new one, so that aggregations can be performed in parallel without requiring a post-aggregation merge step.

* There's some general support machinery needed for hashing multiple fields and then doing efficient vectorized hash table probes (to assign an aggregation bucket id to each row position).

I think it is worth investing the effort to build something that is reasonably consistent with the "state of the art" in database systems (at least according to what we are able to build with our current resources) rather than building something more crude that has to be replaced with a new implementation later.

I'd like to help personally with this work (particularly since the natural next step after my recent work in arrow/compute is to implement expression evaluation), but I won't have significant bandwidth for it until later this month or early September. If someone feels that they sufficiently understand the state of the art for this type of workload and wants to help with laying down the abstract C++ APIs for Volcano-style query execution and an implementation of hash aggregation, that sounds great.
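To make the "$AGG_FUNC($EXPR) GROUP BY $GROUP_EXPR" generalization concrete, here is a minimal sketch in plain C++ (all names are illustrative, not any actual arrow::compute API), where both the aggregated value and the group key are arbitrary per-row expressions rather than plain input columns:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Illustrative sketch of "$AGG_FUNC($EXPR) GROUP BY $GROUP_EXPR": the value
// being summed and the grouping key are both computed per row by arbitrary
// expressions, not taken directly from input fields.
std::unordered_map<int64_t, int64_t> GroupedSum(
    const std::vector<int64_t>& x, const std::vector<int64_t>& y,
    const std::function<int64_t(int64_t)>& agg_expr,     // $EXPR, applied to x
    const std::function<int64_t(int64_t)>& group_expr) { // $GROUP_EXPR, applied to y
  std::unordered_map<int64_t, int64_t> sums;
  for (std::size_t i = 0; i < x.size(); ++i) {
    sums[group_expr(y[i])] += agg_expr(x[i]);
  }
  return sums;
}
```

With identity expressions this degenerates to plain "sum(x) group by y"; a real engine would evaluate the expressions vectorized over whole batches rather than row by row.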
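The Update(state, bucket_ids, args) shape described above could be sketched as follows (again with illustrative names, not Arrow's actual API): unlike the scalar-aggregate Update(state, args), the grouped variant also receives one pre-computed bucket id per input row, produced by probing the hash table on the group-by keys.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical grouped-sum state: one accumulator per hash-table bucket.
struct GroupedSumState {
  std::vector<int64_t> sums;
};

// Illustrative HASH_AGGREGATE update step: bucket_ids[i] tells us which
// group's accumulator row i belongs to, so no key comparison happens here;
// the hash-table probe has already been done upstream.
void Update(GroupedSumState* state,
            const std::vector<uint32_t>& bucket_ids,
            const std::vector<int64_t>& values) {
  for (std::size_t i = 0; i < values.size(); ++i) {
    uint32_t b = bucket_ids[i];
    if (b >= state->sums.size()) state->sums.resize(b + 1, 0);
    state->sums[b] += values[i];
  }
}
```

Because each call needs only that batch's values and bucket ids, the same state can be updated across an arbitrary stream of record batches, which is what lets the operator consume an iterator of batches.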
Thanks,
Wes

[1]: https://www.vldb.org/pvldb/vol11/p2209-kersten.pdf
[2]: https://github.com/TimoKersten/db-engine-paradigms
[3]: https://github.com/dremio/dremio-oss/tree/master/sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate/hash
[4]: https://github.com/cwida/duckdb/blob/master/src/include/duckdb/execution/aggregate_hashtable.hpp
[5]: https://github.com/cwida/duckdb/blob/master/src/execution/aggregate_hashtable.cpp

On Wed, Aug 5, 2020 at 10:23 AM Kenta Murata wrote:
[DISCUSS][C++] Group by operation for RecordBatch and Table
Hi folks,

Red Arrow, the Ruby binding of Arrow GLib, implements grouped aggregation features for RecordBatch and Table. Because these features are written in Ruby, they are too slow for large data. We need to make them much faster.

To improve their calculation speed, they should be written in C++ and should live in Arrow C++ instead of Red Arrow.

Is anyone working on implementing a group-by operation for RecordBatch and Table in Arrow C++? If no one has worked on it, I would like to try it.

By the way, I found that the grouped aggregation feature is mentioned in the design document of the Arrow C++ Query Engine. Is the Query Engine, rather than the Arrow C++ Core, the suitable place to implement the group-by operation?