Hi Flávio,

Thanks for the KIP. I'll apologize that I'm arriving late to the
discussion. I've tried to catch up, but I might have missed some nuances.

Regarding KIP-328, the idea is to add the ability to suppress intermediate
results from all KTables, not just windowed ones. I think this could
support your use case in combination with the strategy that Guozhang
proposed of having one or more pre-aggregation steps that ultimately push
into a single-partition topic for final aggregation. Suppressing
intermediate results would solve the problem you noted that today
pre-aggregating doesn't do much to staunch the flow up updates.

I'm not sure if this would be good enough for you overall; I just wanted to
clarify the role of KIP-328.
In particular, the solution you mentioned is to have the downstream KTables
actually query the upstream ones to compute their results. I'm not sure
whether it's more efficient to do these queries on the schedule, or to have
the upstream tables emit their results, on the same schedule.

What do you think?

Thanks,
-John

On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <flaviost...@gmail.com>
wrote:

> For what I understood, that KIP is related to how KStreams will handle
> KTable updates in Windowed scenarios to optimize resource usage.
> I couldn't see any specific relation to this KIP. Had you?
>
> -Flávio Stutz
>
>
> On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io> wrote:
> > Flavio,
> >
> > thanks for cleaning up the KIP number collision.
> >
> > With regard to KIP-328
> > (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> )
> > I am wondering how both relate to each other?
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> > > Just copying a follow up from another thread to here (sorry about the
> mess):
> > >
> > > From: Guozhang Wang <wangg...@gmail.com>
> > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > Date: 2018/06/25 22:24:17
> > > List: dev@kafka.apache.org
> > >
> > > Flávio, thanks for creating this KIP.
> > >
> > > I think this "single-aggregation" use case is common enough that we
> should
> > > consider how to efficiently supports it: for example, for KSQL that's
> built
> > > on top of Streams, we've seen lots of query statements whose return is
> > > expected a single row indicating the "total aggregate" etc. See
> > > https://github.com/confluentinc/ksql/issues/430 for details.
> > >
> > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> but
> > > I'm wondering if we have discussed the option of supporting it in a
> > > "pre-aggregate" manner: that is we do partial aggregates on parallel
> tasks,
> > > and then sends the partial aggregated value via a single topic
> partition
> > > for the final aggregate, to reduce the traffic on that single
> partition and
> > > hence the final aggregate workload.
> > > Of course, for non-commutative aggregates we'd probably need to provide
> > > another API in addition to aggregate, like the `merge` function for
> > > session-based aggregates, to let users customize the operations of
> merging
> > > two partial aggregates into a single partial aggregate. What's its
> pros and
> > > cons compared with the current proposal?
> > >
> > >
> > > Guozhang
> > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
> > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > >> source for realtime partitioned consolidations.
> > >>
> > >> We have faced the following scenario/problem in a lot of situations
> with
> > >> KStreams:
> > >>    - Huge incoming data being processed by numerous application
> instances
> > >>    - Need to aggregate different fields whose records span all topic
> > >> partitions (something like “total amount spent by people aged > 30
> yrs”
> > >> when processing a topic partitioned by userid).
> > >>
> > >> The challenge here is to manage this kind of situation without any
> > >> bottlenecks. We don't need the “global aggregation” to be processed
> at each
> > >> incoming message. On a scenario of 500 instances, each handling 1k
> > >> messages/s, any single point of aggregation (single partitioned
> topics,
> > >> global tables or external databases) would create a bottleneck of 500k
> > >> messages/s for single threaded/CPU elements.
> > >>
> > >> For this scenario, it is possible to store the partial aggregations on
> > >> local stores and, from time to time, query those states and aggregate
> them
> > >> as a single value, avoiding bottlenecks. This is a way to create a
> "timed
> > >> aggregation barrier”.
> > >>
> > >> If we leverage this kind of built-in feature we could greatly enhance
> the
> > >> ability of KStreams to better handle the CAP Theorem characteristics,
> so
> > >> that one could choose to have Consistency over Availability when
> needed.
> > >>
> > >> We started this discussion with Matthias J. Sax here:
> > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > >>
> > >> If you want to see more, go to KIP-326 at:
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > >>
> > >> -Flávio Stutz
> > >>
> >
> >
>

Reply via email to