Cons:
We tried the "single partition" strategy, but the problem is that for each
incoming message to the graph, we get another output message with the
aggregated (cumulative or not) result. So if we have a million messages/s
being processed (across all parallel tasks), we'll have another million
messages/s of aggregated values on the single-partition topic, which a single
consumer will not be able to handle. Did I miss something?
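To make the concern concrete, here is a minimal sketch in plain Java (no Kafka dependency; the class and method names are hypothetical). It assumes each task forwards its updated aggregate after every input record, so the output rate on the single partition equals the combined input rate of all tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "single partition" strategy, assuming every update
// to a task's aggregate is forwarded downstream immediately (no buffering or
// compaction of updates before forwarding).
class SinglePartitionLoad {

    // Runs `tasks` parallel tasks, each aggregating `recordsPerTask` input
    // records and appending its updated aggregate to `topic` (a stand-in for
    // the single-partition topic) after every record. Returns the number of
    // records written to `topic`.
    static long run(int tasks, int recordsPerTask, List<Long> topic) {
        long written = 0;
        for (int t = 0; t < tasks; t++) {
            long aggregate = 0;
            for (int r = 0; r < recordsPerTask; r++) {
                aggregate += 1;          // update the task's cumulative aggregate
                topic.add(aggregate);    // one output record per input record
                written++;
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<Long> topic = new ArrayList<>();
        long written = run(3, 100_000, topic);
        // 3 tasks x 100k input records -> 300k records on the single partition
        System.out.println("records on the single partition: " + written);
    }
}
```

The sketch deliberately does not model any buffering that might compact several updates to the same key before they are forwarded.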

Pros:
Using a Schedulable Stream Source fed from the state stores' values, we can
control the pressure of the global aggregation, which, regardless of the
strategy applied, has to be performed by a single instance. This way, the
pressure is determined by the desired schedule interval (which can be lowered
as needed) and by the number of tuples in the state store instances (the fewer
distinct keys in the state store instances, the higher the practical frequency
of the global aggregation task). With this strategy, the load of incoming
messages won't affect the global aggregation mechanism, because we are
separating the two in time: the distributed partial aggregation across
partition tasks happens while the incoming messages are processed, and the
final global aggregation is performed by a single task from time to time,
asynchronously.
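The Pros paragraph above can be sketched in plain Java as follows. This is not a Kafka Streams API: the class and method names are hypothetical, a HashMap stands in for each instance's local state store, and the timer that would fire the global step (for example once per second) is left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each task keeps partial aggregates in its own local
// store, and a separate, periodically triggered step reads every store and
// merges the tuples into a single global value.
class TimedGlobalAggregation {

    // One local "state store" per task: key -> partial sum.
    static Map<String, Long> newStore() {
        return new HashMap<>();
    }

    // Hot path: runs for every incoming record and touches only the local store.
    static void process(Map<String, Long> store, String key, long amount) {
        store.merge(key, amount, Long::sum);
    }

    // Scheduled path: runs once per tick on a single instance. Its cost depends
    // on the number of tuples in the stores, not on the input rate.
    static long globalAggregate(List<Map<String, Long>> stores) {
        long total = 0;
        for (Map<String, Long> store : stores) {
            for (long partial : store.values()) {
                total += partial;
            }
        }
        return total;
    }

    // Number of tuples the scheduled step has to read per tick.
    static int tuplesReadPerTick(List<Map<String, Long>> stores) {
        int n = 0;
        for (Map<String, Long> store : stores) {
            n += store.size();
        }
        return n;
    }

    public static void main(String[] args) {
        Map<String, Long> s1 = newStore();
        Map<String, Long> s2 = newStore();
        // Many records, but few distinct keys per store.
        for (int i = 0; i < 100_000; i++) {
            process(s1, "key-" + (i % 100), 1);
            process(s2, "key-" + (i % 200), 2);
        }
        List<Map<String, Long>> stores = List.of(s1, s2);
        System.out.println("tuples read per tick: " + tuplesReadPerTick(stores)); // 300
        System.out.println("global total: " + globalAggregate(stores));           // 300000
    }
}
```

Raising the input rate only makes the per-record `process` calls busier; the scheduled step still reads one tuple per distinct key.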

[SOURCE PARTITION 1]  >-----100k messages/s---->  [TASK GRAPH 1]  >----100 different aggregated values---->  [INSTANCE 1 STATE STORE] >---> END
[SOURCE PARTITION 2]  >-----100k messages/s---->  [TASK GRAPH 2]  >----200 different aggregated values---->  [INSTANCE 2 STATE STORE] >---> END
[SOURCE PARTITION 3]  >-----100k messages/s---->  [TASK GRAPH 3]  >----100 different aggregated values---->  [INSTANCE 3 STATE STORE] >---> END

[ONE SEC SCHEDULED SOURCE] >-----400 messages/s------> [TASK GRAPH X] >----global aggregated value----> [OUTPUT TOPIC /global-total]
(one-second scheduled KTable with the tuples from all state store instances)
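A back-of-the-envelope check of the rates in the diagram, assuming T is the schedule period, |K_i| is the number of tuples in instance i's state store, and λ_i is the input rate of partition i:

```latex
\lambda_{\text{global}} = \frac{1}{T}\sum_{i=1}^{3} |K_i|
  = \frac{100 + 200 + 100}{1\,\mathrm{s}} = 400~\text{tuples/s}
\qquad\text{vs.}\qquad
\lambda_{\text{in}} = \sum_{i=1}^{3} \lambda_i = 3 \times 100\text{k} = 300\text{k messages/s}
```

Under these assumptions the single global-aggregation task reads 400 tuples/s, whereas, per the concern in the Cons paragraph, the single-partition strategy would route all 300k messages/s through one consumer.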

What do you think?

-Flávio Stutz





On 2018/06/29 17:23:29, flaviost...@gmail.com <flaviost...@gmail.com> wrote: 
> Just copying a follow up from another thread to here (sorry about the mess):
> 
> From: Guozhang Wang <wangg...@gmail.com>
> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> Date: 2018/06/25 22:24:17
> List: dev@kafka.apache.org
> 
> Flávio, thanks for creating this KIP.
> 
> I think this "single-aggregation" use case is common enough that we should
> consider how to support it efficiently: for example, for KSQL, which is built
> on top of Streams, we've seen lots of query statements that are expected to
> return a single row indicating the "total aggregate", etc. See
> https://github.com/confluentinc/ksql/issues/430 for details.
> 
> I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
> I'm wondering if we have discussed the option of supporting it in a
> "pre-aggregate" manner: that is, we do partial aggregates on parallel tasks,
> and then send the partial aggregated values via a single topic partition
> for the final aggregate, to reduce the traffic on that single partition and
> hence the final aggregate workload.
> Of course, for non-commutative aggregates we'd probably need to provide
> another API in addition to aggregate, like the `merge` function for
> session-based aggregates, to let users customize the operation of merging
> two partial aggregates into a single partial aggregate. What are its pros
> and cons compared with the current proposal?
> 
> 
> Guozhang
> On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote: 
> > Hey, guys, I've just created a new KIP about creating a new DSL graph
> > source for real-time partitioned consolidations.
> > 
> > We have faced the following scenario/problem in a lot of situations with
> > KStreams:
> >    - Huge incoming data being processed by numerous application instances
> >    - Need to aggregate different fields whose records span all topic
> > partitions (something like “total amount spent by people aged > 30 yrs”
> > when processing a topic partitioned by userid).
> > 
> > The challenge here is to manage this kind of situation without any
> > bottlenecks. We don't need the “global aggregation” to be processed for each
> > incoming message. In a scenario with 500 instances, each handling 1k
> > messages/s, any single point of aggregation (single-partition topics,
> > global tables or external databases) would create a bottleneck of 500k
> > messages/s for single-threaded/single-CPU elements.
> > 
> > For this scenario, it is possible to store the partial aggregations on
> > local stores and, from time to time, query those states and aggregate them
> > as a single value, avoiding bottlenecks. This is a way to create a “timed
> > aggregation barrier”.
> > 
> > If we provide this kind of feature built in, we could greatly enhance the
> > ability of KStreams to handle the trade-offs of the CAP theorem, so that
> > one could choose Consistency over Availability when needed.
> > 
> > We started this discussion with Matthias J. Sax here:
> > https://issues.apache.org/jira/browse/KAFKA-6953
> > 
> > If you want to see more, go to KIP-326 at:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > 
> > -Flávio Stutz
> > 
> 
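Guozhang's "pre-aggregate" alternative quoted above hinges on a step that merges two partial aggregates. A minimal plain-Java sketch follows; all names are hypothetical, and `MERGE` only mirrors the role that Kafka Streams' `Merger` plays for session-window aggregations, not its actual signature. An average is used purely as an illustration because its per-record aggregator and its partial-with-partial merge differ; it is not an example taken from the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch of "pre-aggregate, then merge": each task reduces its own
// records to one partial aggregate, and a single final aggregator merges the
// partials instead of processing every input record.
class PreAggregateWithMerge {

    // Partial aggregate for an average: averages cannot be merged directly,
    // but (sum, count) pairs can.
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        // Aggregator: folds one input value into the partial aggregate.
        SumCount add(double value) {
            return new SumCount(sum + value, count + 1);
        }

        double average() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    static final SumCount EMPTY = new SumCount(0, 0);

    // Merge: combines two partial aggregates into a single partial aggregate.
    static final BinaryOperator<SumCount> MERGE =
        (a, b) -> new SumCount(a.sum + b.sum, a.count + b.count);

    // Runs on each parallel task.
    static SumCount preAggregate(double[] taskRecords) {
        SumCount partial = EMPTY;
        for (double v : taskRecords) {
            partial = partial.add(v);
        }
        return partial;
    }

    // Runs on the single final aggregator: one input per task partial.
    static SumCount finalAggregate(List<SumCount> partials) {
        SumCount result = EMPTY;
        for (SumCount p : partials) {
            result = MERGE.apply(result, p);
        }
        return result;
    }

    public static void main(String[] args) {
        double[][] tasks = { {10, 20}, {30}, {40, 50, 60} };
        List<SumCount> partials = new ArrayList<>();
        for (double[] records : tasks) {
            partials.add(preAggregate(records));
        }
        SumCount total = finalAggregate(partials);
        System.out.println("partials sent to the final aggregator: " + partials.size()); // 3
        System.out.println("average: " + total.average());                              // 35.0
    }
}
```

How often each task emits its partial (per record, per commit, or on a schedule as in the KIP) decides how much traffic still reaches the single partition; the sketch emits exactly one partial per task.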
