I don't think you need to do anything differently.  This looks like
the correct way to run this.  The Arrow streaming engine is pretty
new and probably still has some low-hanging performance improvements
that could be made.  I think most of the dedicated performance work
so far has gone into either optimizing kernels or optimizing scanning
from disk (which isn't being measured here).

I notice you mention "scaling well".  Are you trying this query on
larger scale factors as well?  The lineitem table at SF1 is pretty
small (~6 million rows if I recall), and I'd have to look into it
more, but I would guess pyarrow is scanning the table in 1M-row
chunks, so you won't see much parallelism beyond 6 cores.  If I had
to guess, I would point to that as the root cause.
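
If you want to sanity check that, here is a quick sketch (reusing the
table and column names from your example) that prints how the
in-memory table is chunked; each chunk is roughly the unit of work
the engine can parallelize over:

import pyarrow.parquet as pq

lineitem = pq.read_table('lineitemsf1.snappy.parquet')

# A handful of chunks caps how many cores can do useful work.
col = lineitem.column("l_returnflag")
print("rows:", lineitem.num_rows)
print("chunks:", col.num_chunks)
print("avg rows per chunk:", lineitem.num_rows // max(col.num_chunks, 1))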

Another possibility is the suggestion I had at [1].  I would guess
that pyarrow is using a scanner source node instead of a table source
node (the table source node was added after the pyarrow
group_by/aggregate functionality), and that adds a bit of overhead
when scanning something that is already in memory.
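
If you want a rough feel for that overhead, one experiment (a sketch
only; I'm assuming that wrapping the in-memory table with ds.dataset()
is a close enough stand-in for the scanner path) is to time a plain
pass-through scan of data that is already in RAM:

import pyarrow.dataset as ds

# Read the table back through the dataset/scanner machinery; any time
# beyond a trivial copy is roughly the scanner's overhead.
in_mem = ds.dataset(lineitem)

%timeit in_mem.to_table()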

Beyond those two guesses, I'm sure we are doing something less
efficient than we could.  This test case is helpful; I will jot it
down to look into further when I get some spare cycles.  In the
meantime I'd encourage you to dig in further.  A quick run through
perf could probably tell you whether one approach or the other is
utilizing more cores (I'd first crank up the number of runs so the
query execution time is much greater than the setup time).
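
For example, something like the following as a standalone bench.py
(a rough sketch; the run count of 100 is arbitrary, pick whatever
makes total query time dominate setup) that you could then launch
under perf, e.g. "perf stat -d python bench.py":

import timeit
import pyarrow.parquet as pq

lineitem = pq.read_table('lineitemsf1.snappy.parquet')

# Repeat the query so execution time dwarfs the one-time setup above.
n = 100
t = timeit.timeit(
    lambda: lineitem.group_by("l_returnflag").aggregate(
        [("l_extendedprice", "sum")]),
    number=n)
print(t / n, "seconds per query")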

[1] https://github.com/duckdb/duckdb/issues/3138

On Sat, Mar 5, 2022 at 9:11 AM H G <zac...@gmail.com> wrote:
>
> Pyarrow is not scaling well when compared with duckdb. Is there something 
> that needs to be done differently?
>
> Minimal example:
>
> import duckdb
> import pyarrow.parquet as pq
> lineitem = pq.read_table('lineitemsf1.snappy.parquet')
> con = duckdb.connect()
>
> %timeit lineitem.group_by("l_returnflag").aggregate([("l_extendedprice", "sum")])
> grouped_aggregate = '''SELECT SUM(l_extendedprice) FROM lineitem GROUP BY l_returnflag'''
>
> %timeit con.execute(grouped_aggregate).fetch_arrow_table()
>
> Results:
>
> %timeit lineitem.group_by("l_returnflag").aggregate([("l_extendedprice", "sum")])
> 207 ms ± 9.31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>
> %timeit con.execute(grouped_aggregate).fetch_arrow_table()
> 71.8 ms ± 2.31 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
