Great feature you have there!

Let me work through how we would achieve the same functional objectives
using your KIP:

EXERCISE 1:
  - The case is "total counting of events for a huge website"
  - Tasks from Application A will have something like:
         .stream(/site-events)
         .count()
         .publish(/single-partitioned-topic-with-count-partials)
  - The published messages will be, for example:
          ["counter-task1", 2345]
          ["counter-task2", 8495]
          ["counter-task3", 4839]
  - Single Task from Application B will have something like:
         .stream(/single-partitioned-topic-with-count-partials)
         .aggregate(by messages whose key starts with "counter")
         .publish(/counter-total)
  - FAIL HERE. How would the single task know how many partials to expect
overall? Two partials from the same task may arrive before those of the
other tasks, and that task's count may then be aggregated twice.

I tried to think about using GlobalKTables, but I couldn't find an easy way
to aggregate the keys from that table. Do you have any clue?

Thanks.

-Flávio Stutz


On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote: 
> Hi Flávio,
> 
> Thanks for the KIP. I'll apologize that I'm arriving late to the
> discussion. I've tried to catch up, but I might have missed some nuances.
> 
> Regarding KIP-328, the idea is to add the ability to suppress intermediate
> results from all KTables, not just windowed ones. I think this could
> support your use case in combination with the strategy that Guozhang
> proposed of having one or more pre-aggregation steps that ultimately push
> into a single-partition topic for final aggregation. Suppressing
> intermediate results would solve the problem you noted that today
> pre-aggregating doesn't do much to stanch the flow of updates.
> 
> I'm not sure if this would be good enough for you overall; I just wanted to
> clarify the role of KIP-328.
> In particular, the solution you mentioned is to have the downstream KTables
> actually query the upstream ones to compute their results. I'm not sure
> whether it's more efficient to do these queries on the schedule, or to have
> the upstream tables emit their results, on the same schedule.
> 
> What do you think?
> 
> Thanks,
> -John
> 
> On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <flaviost...@gmail.com>
> wrote:
> 
> > From what I understood, that KIP is related to how KStreams will handle
> > KTable updates in windowed scenarios to optimize resource usage.
> > I couldn't see any specific relation to this KIP. Did you?
> >
> > -Flávio Stutz
> >
> >
> > On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io> wrote:
> > > Flavio,
> > >
> > > thanks for cleaning up the KIP number collision.
> > >
> > > With regard to KIP-328
> > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
> > > I am wondering how both relate to each other?
> > >
> > > Any thoughts?
> > >
> > >
> > > -Matthias
> > >
> > > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> > > > Just copying a follow-up from another thread to here (sorry about
> > > > the mess):
> > > >
> > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > Date: 2018/06/25 22:24:17
> > > > List: dev@kafka.apache.org
> > > >
> > > > Flávio, thanks for creating this KIP.
> > > >
> > > > I think this "single-aggregation" use case is common enough that we
> > > > should consider how to efficiently support it: for example, for KSQL,
> > > > which is built on top of Streams, we've seen lots of query statements
> > > > whose result is expected to be a single row indicating the "total
> > > > aggregate" etc. See
> > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > >
> > > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> > > > but I'm wondering if we have discussed the option of supporting it in a
> > > > "pre-aggregate" manner: that is, we do partial aggregates on parallel
> > > > tasks, and then send the partially aggregated values via a single topic
> > > > partition for the final aggregate, to reduce the traffic on that single
> > > > partition and hence the final aggregate workload.
> > > > Of course, for non-commutative aggregates we'd probably need to
> > > > provide another API in addition to aggregate, like the `merge` function
> > > > for session-based aggregates, to let users customize the operation of
> > > > merging two partial aggregates into a single partial aggregate. What are
> > > > its pros and cons compared with the current proposal?
> > > >
> > > >
> > > > Guozhang
> > > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
> > > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > > >> source for realtime partitioned consolidations.
> > > >>
> > > >> We have faced the following scenario/problem in a lot of situations
> > > >> with KStreams:
> > > >>    - Huge incoming data being processed by numerous application
> > > >> instances
> > > >>    - Need to aggregate different fields whose records span all topic
> > > >> partitions (something like “total amount spent by people aged > 30 yrs”
> > > >> when processing a topic partitioned by userid).
> > > >>
> > > >> The challenge here is to manage this kind of situation without any
> > > >> bottlenecks. We don't need the “global aggregation” to be processed on
> > > >> each incoming message. In a scenario of 500 instances, each handling 1k
> > > >> messages/s, any single point of aggregation (single-partitioned topics,
> > > >> global tables or external databases) would create a bottleneck of 500k
> > > >> messages/s for single-threaded/CPU elements.
> > > >>
> > > >> For this scenario, it is possible to store the partial aggregations
> > > >> in local stores and, from time to time, query those states and
> > > >> aggregate them into a single value, avoiding bottlenecks. This is a
> > > >> way to create a “timed aggregation barrier”.
> > > >>
> > > >> If we leverage this kind of built-in feature, we could greatly
> > > >> enhance the ability of KStreams to better handle the CAP Theorem
> > > >> characteristics, so that one could choose to have Consistency over
> > > >> Availability when needed.
> > > >>
> > > >> We started this discussion with Matthias J. Sax here:
> > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > >>
> > > >> If you want to see more, go to KIP-326 at:
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > >>
> > > >> -Flávio Stutz
> > > >>
> > >
> > >
> >
> 
