I don't think you need to do anything differently. This looks like the correct way to run this. The Arrow streaming engine is pretty new and probably has some good low-hanging performance improvements that could be made. I think most of the dedicated performance work so far has gone into either optimizing kernels or optimizing scanning from disk (which isn't being measured here).
I notice you mention "scaling well"; are you trying this query on larger scale factors as well? The lineitem table at SF1 is pretty small (~6 million rows if I recall), and I'd have to look into it more, but I would guess pyarrow is scanning the table in 1M-row chunks, so you won't see much parallelism beyond 6 cores. If I had to guess I might point to that as the root cause.

Another possibility is the suggestion I had at [1]. I would guess that pyarrow is using a scanner source node instead of the table source node (since the table source node was created after the pyarrow group_by/aggregate functionality), and that adds a bit of overhead when scanning something that is already in memory.

Beyond those two guesses I'm sure we are doing something less efficient than we could. This test case is helpful; I will jot it down to look into further when I get some spare cycles. In the meantime I'd encourage you to dig in further. A quick run through perf could probably tell you if one approach or the other is utilizing more cores (I'd first crank up the number of runs so the query execution time is much greater than the setup time). A rough sketch of that kind of check is below the quoted message.

[1] https://github.com/duckdb/duckdb/issues/3138

On Sat, Mar 5, 2022 at 9:11 AM H G <zac...@gmail.com> wrote:
>
> Pyarrow is not scaling well when compared with duckdb. Is there something
> that needs to be done differently?
>
> Minimal example:
>
> import pyarrow.parquet as pq
> import duckdb
>
> lineitem = pq.read_table('lineitemsf1.snappy.parquet')
> con = duckdb.connect()
>
> %timeit lineitem.group_by("l_returnflag").aggregate([("l_extendedprice", "sum")])
>
> ungrouped_aggregate = '''SELECT SUM(l_extendedprice) FROM lineitem GROUP BY l_returnflag'''
>
> %timeit con.execute(ungrouped_aggregate).fetch_arrow_table()
>
> Results
>
> %timeit lineitem.group_by("l_returnflag").aggregate([("l_extendedprice", "sum")])
> 207 ms ± 9.31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>
> %timeit con.execute(ungrouped_aggregate).fetch_arrow_table()
> 71.8 ms ± 2.31 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
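Roughly the kind of check I have in mind is below. It's only a sketch, not a verified diagnosis: it assumes a pyarrow version with Table.group_by (7.0+), reuses the file and query from your example, and only inspects chunk counts and thread counts while cranking up the number of runs so you can watch core utilization (with perf, top, etc.) during each loop.

import timeit

import duckdb
import pyarrow as pa
import pyarrow.parquet as pq

lineitem = pq.read_table('lineitemsf1.snappy.parquet')
con = duckdb.connect()

# How many threads pyarrow thinks it can use.
print("pyarrow cpu_count:", pa.cpu_count())

# How the table is chunked in memory; if there are only a handful of
# chunks per column, that is one thing that could cap parallelism.
print("l_extendedprice chunks:", lineitem.column('l_extendedprice').num_chunks)

query = "SELECT SUM(l_extendedprice) FROM lineitem GROUP BY l_returnflag"

def run_pyarrow():
    lineitem.group_by("l_returnflag").aggregate([("l_extendedprice", "sum")])

def run_duckdb():
    con.execute(query).fetch_arrow_table()

# Many runs per measurement so query execution dominates setup time;
# watch core utilization while each of these loops is running.
n = 200
print("pyarrow:", min(timeit.repeat(run_pyarrow, number=n, repeat=3)) / n, "s/run")
print("duckdb :", min(timeit.repeat(run_duckdb, number=n, repeat=3)) / n, "s/run")

If the table does turn out to be split into only a few chunks, re-chunking it (e.g. round-tripping through lineitem.to_batches(max_chunksize=...) and pa.Table.from_batches) would be a cheap way to test the parallelism theory, though I haven't verified that is where the time is going.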