Repasting my comment from the other email thread:

--------------------------

Flávio, thanks for creating this KIP.

I think this "single-aggregation" use case is common enough that we should
consider how to support it efficiently: for example, for KSQL, which is built
on top of Streams, we've seen lots of query statements whose result is
expected to be a single row indicating the "total aggregate" etc. See
https://github.com/confluentinc/ksql/issues/430 for details.

I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
I'm wondering if we have discussed the option of supporting it in a
"pre-aggregate" manner: that is, we do partial aggregates on parallel tasks,
and then send the partial aggregated values via a single topic partition
for the final aggregate, to reduce the traffic on that single partition and
hence the final aggregate workload.
Of course, for non-commutative aggregates we'd probably need to provide
another API in addition to aggregate, like the `merge` function for
session-based aggregates, to let users customize the operation of merging
two partial aggregates into a single partial aggregate. What are the pros and
cons of this approach compared with the current proposal?
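
To make the "pre-aggregate" idea concrete, here is a rough sketch in plain
Java with no Kafka dependency. It is only an illustration under assumptions,
not a proposed API: the names `Partial`, `aggregatePartition` and
`mergePartials` are hypothetical, and an average is used as the example
because adding one record and merging two partial aggregates are different
operations there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PreAggregateSketch {

    // Hypothetical partial aggregate: enough state to compute an average later.
    static final class Partial {
        final long count;
        final double sum;

        Partial(long count, double sum) {
            this.count = count;
            this.sum = sum;
        }

        // Per-record step, analogous to the aggregator passed to aggregate().
        Partial add(double value) {
            return new Partial(count + 1, sum + value);
        }

        // Partial-to-partial step, analogous to a user-supplied `merge`.
        Partial merge(Partial other) {
            return new Partial(count + other.count, sum + other.sum);
        }

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Stage 1: each parallel task folds only the records of its own partition.
    static Partial aggregatePartition(List<Double> records) {
        Partial partial = new Partial(0, 0.0);
        for (double record : records) {
            partial = partial.add(record);
        }
        return partial;
    }

    // Stage 2: the single final task receives one partial per upstream task,
    // not every input record.
    static Partial mergePartials(List<Partial> partials) {
        Partial total = new Partial(0, 0.0);
        for (Partial partial : partials) {
            total = total.merge(partial);
        }
        return total;
    }

    public static void main(String[] args) {
        // Three simulated partitions, each handled by its own task.
        List<List<Double>> partitions = Arrays.asList(
            Arrays.asList(10.0, 20.0),
            Arrays.asList(30.0),
            Arrays.asList(40.0, 50.0, 60.0));

        List<Partial> partials = new ArrayList<>();
        for (List<Double> partition : partitions) {
            partials.add(aggregatePartition(partition));
        }

        Partial total = mergePartials(partials);
        System.out.println("count=" + total.count + " average=" + total.average());
    }
}
```

In this sketch the final stage handles three partial values instead of six
records; how often each task would emit its partial is left open here.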


Guozhang




On Fri, Jun 29, 2018 at 12:53 PM, Bill Bejeck <bbej...@gmail.com> wrote:

> Hi Flávio,
>
> Thanks for creating the KIP.
>
> I agree with Guozhang on comparing the pros and cons of the approach he
> outlined vs the one in the proposed KIP.
>
> I also have a few clarification questions on the current KIP:
>
> - Will the triggering mechanism always be time, or would it make sense to
>   expand to use other mechanisms such as the number of records, or some
>   value present in one of the records?
> - When setting "all instances" to true, how is the leader chosen?
> - If "all instances" is set to false, are all the partial aggregates
>   forwarded to a single output topic?
>
> Thanks again,
> Bill
>
>
> On Fri, Jun 29, 2018 at 2:15 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Flavio,
> >
> > thanks for cleaning up the KIP number collision.
> >
> > With regard to KIP-328
> > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables),
> > I am wondering how both relate to each other?
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> > > Just copying a follow-up from another thread to here (sorry about the
> > > mess):
> > >
> > > From: Guozhang Wang <wangg...@gmail.com>
> > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > Date: 2018/06/25 22:24:17
> > > List: dev@kafka.apache.org
> > >
> > > Flávio, thanks for creating this KIP.
> > >
> > > I think this "single-aggregation" use case is common enough that we
> > > should consider how to support it efficiently: for example, for KSQL,
> > > which is built on top of Streams, we've seen lots of query statements
> > > whose result is expected to be a single row indicating the "total
> > > aggregate" etc. See
> > > https://github.com/confluentinc/ksql/issues/430 for details.
> > >
> > > I've not read through
> > > https://issues.apache.org/jira/browse/KAFKA-6953, but I'm wondering if
> > > we have discussed the option of supporting it in a "pre-aggregate"
> > > manner: that is, we do partial aggregates on parallel tasks, and then
> > > send the partial aggregated values via a single topic partition for the
> > > final aggregate, to reduce the traffic on that single partition and
> > > hence the final aggregate workload.
> > > Of course, for non-commutative aggregates we'd probably need to provide
> > > another API in addition to aggregate, like the `merge` function for
> > > session-based aggregates, to let users customize the operation of
> > > merging two partial aggregates into a single partial aggregate. What
> > > are the pros and cons of this approach compared with the current
> > > proposal?
> > >
> > >
> > > Guozhang
> > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
> > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > >> source for realtime partitioned consolidations.
> > >>
> > >> We have faced the following scenario/problem in a lot of situations
> > >> with KStreams:
> > >>    - Huge incoming data being processed by numerous application
> > >>      instances
> > >>    - Need to aggregate different fields whose records span all topic
> > >>      partitions (something like “total amount spent by people aged > 30
> > >>      yrs” when processing a topic partitioned by userid).
> > >>
> > >> The challenge here is to manage this kind of situation without any
> > >> bottlenecks. We don't need the “global aggregation” to be processed at
> > >> each incoming message. In a scenario of 500 instances, each handling 1k
> > >> messages/s, any single point of aggregation (single-partition topics,
> > >> global tables or external databases) would create a bottleneck of 500k
> > >> messages/s for single-threaded/CPU elements.
> > >>
> > >> For this scenario, it is possible to store the partial aggregations in
> > >> local stores and, from time to time, query those states and aggregate
> > >> them as a single value, avoiding bottlenecks. This is a way to create a
> > >> "timed aggregation barrier".
> > >>
> > >> If we leverage this kind of built-in feature we could greatly enhance
> > >> the ability of KStreams to better handle the CAP Theorem
> > >> characteristics, so that one could choose to have Consistency over
> > >> Availability when needed.
> > >>
> > >> We started this discussion with Matthias J. Sax here:
> > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > >>
> > >> If you want to see more, go to KIP-326 at:
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > >>
> > >> -Flávio Stutz
> > >>
> >
> >
>



-- 
-- Guozhang
