Note that there are a few aggregates that cannot be distributed (e.g.
median).  Given the way aggregates are currently implemented in Acero, we
distribute across threads, and so we don't actually have an implementation
for these aggregates anyway; it's probably a bit of a moot point (although
there is some work being done to support those types of aggregates in
certain workloads).

> Since Acero's aggregate kernels already maintain this
> intermediate state internally, I wonder if it is possible
> to have some APIs in the aggregate kernels to retrieve this
> intermediate state to enable such use scenarios. Thanks.

One potential challenge is that there is no guarantee the intermediate
state is representable as Arrow data.  However, I don't know of any kernels
at the moment for which it is not.

One possible API would be to add an "output_intermediate" option to
AggregateNodeOptions.  If set to true then, instead of Finalize, we would
call some other aggregate method, say FinalizeIntermediate.  This would
require changes to the aggregate kernels: each would need to implement the
new method and convert its state into an exec batch / scalar.  I don't
think this is a ton of work, though, so it could be doable if someone were
motivated enough.

> 2) a materialized view that stores the intermediate state
> for aggregation, so that partially aggregated results (the
> intermediate state of the aggregation) are stored in the
> materialized view on disk. Reading the materialized view will then be
> faster, since only the `finalize` computation is needed to get the results

I'm not sure I understand the value of storing the intermediate result as a
materialized view instead of just storing the finalized result.

--

Note that, in addition to aggregation, sorting is another operation that
typically needs a special node when used in a distributed solution.
Typically each partition-worker sorts with a regular sort node.  The final
stage then uses an "order preserving merge" to read in the data from the
various inputs.  This is sometimes a dedicated node and sometimes a
responsibility of the "exchange/capture" node, depending on the engine.

On Fri, Jul 7, 2023 at 1:19 AM Jiangtao Peng <pengjtn...@gmail.com> wrote:

> Hi Sasha,
>
> So far we have two use scenarios that may need the intermediate state of
> the aggregate kernels during consumption:
> 1) a shuffle-free, single-stage distributed query engine. We have our data
> partitioned and stored on multiple nodes, and would like to create a query
> plan with multiple fragments that retrieves partitioned data from all
> these nodes in parallel for better performance. Data shuffling is
> non-trivial for us to implement, and we are looking for a simpler
> approach. For an aggregation query, one way to do it seems to be to split
> the aggregation into two steps: pre-aggregation and finalize/combine. For
> pre-aggregation, the aggregation operator only consumes the data and
> stores the intermediate results internally. The `finalize/combine` step
> then combines the multiple partitioned intermediate results into the final
> result.
>
> 2) a materialized view that stores the intermediate state for
> aggregation, so that partially aggregated results (the intermediate state
> of the aggregation) are stored in the materialized view on disk. Reading
> the materialized view will then be faster, since only the `finalize`
> computation is needed to get the results.
>
> For some aggregation kernels such as `avg`, we could use two existing
> aggregate kernels (sum/count) to manually maintain such intermediate
> state, but this requires developers to understand how these kernels are
> implemented internally, which is probably not easy, and new aggregate
> kernels may be added in the future.
>
> Since Acero's aggregate kernels already maintain this intermediate state
> internally, I wonder if it is possible to have some APIs in the aggregate
> kernels to retrieve this intermediate state to enable such use scenarios.
> Thanks.
>
>
>
> Jiangtao
>
>
>
> *From: *Sasha Krassovsky <krassovskysa...@gmail.com>
> *Date: *Friday, July 7, 2023 at 2:21 PM
> *To: *user@arrow.apache.org <user@arrow.apache.org>
> *Subject: *Re: [C++][Acero] can Acero support distributed computation?
>
> Yes, what you’ve said is correct for Mean. But my point earlier is that
> there should only be a few such special cases. A simple case would be
> e.g. Max, where the aggregate outputs Max and then the merge outputs
> Max(Max).
>
>
>
> Sasha
>
>
>
> On July 6, 2023, at 23:13, Jiangtao Peng <pengjtn...@gmail.com> wrote:
>
>
>
> Sorry for my unclear expression.
>
>
>
> Take mean aggregation as an example: does Aggregate "output" the sum and
> count values, does Accumulate "input" those sum and count values, and does
> "merge" then "output" sum(sum)/sum(count)?
>
> My point is how to implement Pre-Aggregation and Post-Aggregation using
> Acero.
>
>
>
> Best,
>
> Jiangtao
>
>
>
>
>
> *From: *Sasha Krassovsky <krassovskysa...@gmail.com>
> *Date: *Friday, July 7, 2023 at 1:25 PM
> *To: *user@arrow.apache.org <user@arrow.apache.org>
> *Subject: *Re: [C++][Acero] can Acero support distributed computation?
>
> Can you clarify what you mean by “data flow”? Each machine will be
> executing the same query plan. The query plan will contain an operator
> called Shuffle, and above the Shuffle will be an Aggregate, and above that
> will be an Accumulate node. The SourceNode will read data from disk on each
> machine. The data will then be put through the ShuffleNode, partitioned,
> and sent to other machines. The other machines will perform their local
> aggregations until they all agree that input is finished. Next the data on
> each machine will be input into the Accumulate node, which ships it to the
> master. The master will then perform the merge on all of the data it
> receives from all the participating machines.
>
>
>
> Sasha
>
>
>
>
> On July 6, 2023, at 22:17, Jiangtao Peng <pengjtn...@gmail.com> wrote:
>
>
>
> Sure, distributed aggregations can be split into two phases, “compute”
> and “merge”. But what about the data flow of “compute” and “merge” across
> different nodes?
>
>
>
>
>
> Best,
>
> Jiangtao
>
>
>
> *From: *Sasha Krassovsky <krassovskysa...@gmail.com>
> *Date: *Friday, July 7, 2023 at 11:07 AM
> *To: *user@arrow.apache.org <user@arrow.apache.org>
> *Subject: *Re: [C++][Acero] can Acero support distributed computation?
>
> Distributed aggregations have two phases: “compute” (each node does its
> own aggregation) and “merge” (the master merges the partial results). For
> most aggregates (e.g. Sum, Min, Max), merge is just the same as compute,
> except over the partial results. However, for Mean in particular, the
> compute phase will actually track the sum and count separately, and the
> merge phase will be the sum of partial sums divided by the sum of partial
> counts. Count(distinct) will be merged by Sum as well, since each distinct
> value lands on exactly one node after the shuffle. There may be other
> special cases I’m not thinking of, but that’s the main idea.
>
>
>
> Sasha
>
>
>
> On July 6, 2023, at 19:29, Jiangtao Peng <pengjtn...@gmail.com> wrote:
>
>
>
> Hi Sasha,
>
>
>
> Thanks for your reply!
>
>
>
> Maybe a Shuffle node is enough for data distribution. How about the
> aggregation node? For example, the grouped mean kernel will maintain sum
> and count values for each group. The master node merging the mean results
> needs the sum and count values from each partition. But the mean kernel
> doesn't seem to support a method to export these sum and count values, and
> it also doesn't support a method to load them back for an additional
> merge. Would it be feasible to provide export/load methods on the
> aggregation kernels? Any other tips would be appreciated.
>
>
>
> Thanks,
>
> Jiangtao
>
>
>
> *From: *Sasha Krassovsky <krassovskysa...@gmail.com>
> *Date: *Friday, July 7, 2023 at 10:12 AM
> *To: *user@arrow.apache.org <user@arrow.apache.org>
> *Subject: *Re: [C++][Acero] can Acero support distributed computation?
>
> Hi Jiangtao,
>
> Acero doesn’t support any distributed computation on its own. However, to
> get some simple distributed computation going, it would be sufficient to
> add a Shuffle node. For example, for aggregation, the Shuffle would assign
> a range of hashes to each node, and then each node would hash-partition
> its batches locally and send each partition to be aggregated on the
> corresponding node. You’d also need a master node to merge the results
> afterwards.
>
>
>
> In general the Shuffle-by-hash scheme works well for relational queries
> where order doesn’t matter, but the time series functionality (i.e.
> as-of-join) wouldn’t work as well.
>
>
>
> Hope this helps!
>
> Sasha Krassovsky
>
>
>
>
>
>
> On July 6, 2023, at 19:04, Jiangtao Peng <pengjtn...@gmail.com> wrote:
>
>
>
> Hi there,
>
> I've been learning the Acero streaming execution engine recently, and I’m
> wondering if Acero supports distributed computing.
>
>
>
> I have read the code for the aggregation node and kernel; the aggregation
> kernel seems to hide the details of its intermediate state. If multiple
> nodes each run the Acero execution engine, how can aggregation tasks be
> split among them?
>
>
>
> If the current execution engine does not support distributed computing
> then, taking aggregation as an example, how would you plan to transform
> the aggregate kernels to support distributed computation?
>
>
>
> Any help or tips would be appreciated.
>
> Thanks,
> Jiangtao
>
>
