John, that was fantastic, man!
Have you built a custom implementation of your KIP on your machine that I
could test here? I'd like to try it out.
If you need any help implementing this feature, please tell me.

Thanks.

-Flávio Stutz




On 2018/07/03 18:04:52, John Roesler <j...@confluent.io> wrote: 
> Hi Flávio,
> Thanks! I think that we can actually do this, but the API could be better.
> I've included Java code below, but I'll copy and modify your example so
> we're on the same page.
> 
> EXERCISE 1:
>   - The case is "total counting of events for a huge website"
>   - Tasks from Application A will have something like:
>          .stream(/site-events)
>          .transform( re-key s.t. the new key is the partition id)
>          .groupByKey() // you have to do this before count
>          .count()
>          // You explicitly published to a one-partition topic here, but it's
>          // actually sufficient just to re-group onto one key. You could name
>          // and pre-create the intermediate topic here, but you don't need a
>          // separate application for the final aggregation.
>          .groupBy((partitionId, partialCount) -> new KeyValue("ALL", partialCount))
>          .aggregate(sum up the partialCounts)
>          .publish(/counter-total)
> 
> I've left out the suppressions, but they would go right after the count()
> and the aggregate().
> 
> With this program, you don't have to worry about the double-aggregation you
> mentioned in the last email. The KTable produced by the first count() will
> maintain the correct count per partition. If the value changes for any
> partition, it'll emit a retraction of the old value and then the new value
> downstream, so that the final aggregation can update itself properly.
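To make the retraction step concrete, here is a minimal plain-Java sketch of what the final aggregation does with each update. It is only a simulation under stated assumptions: it does not use the Kafka Streams API, and the class and method names (RetractionSketch, onPartitionCount) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation (no Kafka dependency) of a table-to-table roll-up.
// All names here are hypothetical and only illustrate the idea.
final class RetractionSketch {
    // Latest known count per partition, like the upstream KTable.
    private final Map<Integer, Long> countsByPartition = new HashMap<>();
    private long total = 0L;

    // Called whenever the upstream count for one partition changes.
    long onPartitionCount(final int partition, final long newCount) {
        final Long oldCount = countsByPartition.put(partition, newCount);
        if (oldCount != null) {
            total -= oldCount; // retract the old value first
        }
        total += newCount;     // then apply the new value
        return total;
    }

    public static void main(final String[] args) {
        final RetractionSketch sketch = new RetractionSketch();
        sketch.onPartitionCount(0, 10L);
        sketch.onPartitionCount(1, 5L);
        // Partition 0 reports again: 10 is retracted before 12 is added.
        System.out.println(sketch.onPartitionCount(0, 12L)); // prints 17
    }
}
```

Because the old value is subtracted before the new one is added, two updates from the same partition are never counted twice.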
> 
> I think we can optimize both the execution and the programmability by adding
> a "global aggregation" concept. But in principle, it seems like this usage
> of the current API will support your use case.
> 
> Once again, though, this is just to present an alternative. I haven't done
> the math on whether your proposal would be more efficient.
> 
> Thanks,
> -John
> 
> Here's the same algorithm written in Java:
> 
> final KStream<String, String> siteEvents = builder.stream("/site-events");
> 
> // here we re-key the events so that the key is actually the partition id.
> // we don't need the value to do a count, so I just set it to "1".
> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(
>     () -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>         private ProcessorContext context;
> 
>         @Override
>         public void init(final ProcessorContext context) {
>             this.context = context;
>         }
> 
>         @Override
>         public KeyValue<Integer, Integer> transform(final String key, final String value) {
>             return new KeyValue<>(context.partition(), 1);
>         }
> 
>         @Override
>         public void close() {
>             // Nothing to clean up, but Transformer requires this method.
>         }
>     });
> 
> // Note that we can't do "count()" on a KStream; we have to group it first.
> // I'm grouping by the key, so it will produce the count for each key.
> // Since the key is actually the partition id, it will produce the
> // pre-aggregated count per partition.
> // Note that the result is a KTable<PartitionId,Count>. It'll always
> // contain the most recent count for each partition.
> final KTable<Integer, Long> countsByPartition =
>     keyedByPartition.groupByKey().count();
> 
> // Now we get ready for the final roll-up. We re-group all the constituent
> // counts.
> final KGroupedTable<String, Long> singlePartition =
>     countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
> 
> // The first function adds a new value; the second subtracts a retracted one.
> final KTable<String, Long> totalCount =
>     singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r);
> 
> totalCount.toStream().foreach((k, v) -> {
>     // k is always "ALL"
>     // v is always the most recent total value
>     System.out.println("The total event count is: " + v);
> });
> 
> 
> On Tue, Jul 3, 2018 at 9:21 AM flaviost...@gmail.com <flaviost...@gmail.com>
> wrote:
> 
> > Great feature you have there!
> >
> > I'll try to exercise here how we would achieve the same functional
> > objectives using your KIP:
> >
> > EXERCISE 1:
> >   - The case is "total counting of events for a huge website"
> >   - Tasks from Application A will have something like:
> >          .stream(/site-events)
> >          .count()
> >          .publish(/single-partitioned-topic-with-count-partials)
> >   - The published messages will be, for example:
> >           ["counter-task1", 2345]
> >           ["counter-task2", 8495]
> >           ["counter-task3", 4839]
> >   - Single Task from Application B will have something like:
> >          .stream(/single-partitioned-topic-with-count-partials)
> >          .aggregate(by messages whose key starts with "counter")
> >          .publish(/counter-total)
> >   - FAIL HERE. How would I know how many partitions there are overall? Two
> > partials for the same task may arrive before the other tasks' partials and
> > be aggregated twice.
> >
> > I tried to think about using GlobalKTables, but I couldn't find an easy way
> > to aggregate the keys from that table. Do you have any ideas?
> >
> > Thanks.
> >
> > -Flávio Stutz
> >
> >
> >
> >
> >
> >
> > /partial-counters-to-single-partitioned-topic
> >
> > On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
> > > Hi Flávio,
> > >
> > > Thanks for the KIP. I apologize for arriving late to the
> > > discussion. I've tried to catch up, but I might have missed some nuances.
> > >
> > > Regarding KIP-328, the idea is to add the ability to suppress intermediate
> > > results from all KTables, not just windowed ones. I think this could
> > > support your use case in combination with the strategy that Guozhang
> > > proposed of having one or more pre-aggregation steps that ultimately push
> > > into a single-partition topic for final aggregation. Suppressing
> > > intermediate results would solve the problem you noted that today
> > > pre-aggregating doesn't do much to stanch the flow of updates.
> > >
> > > I'm not sure if this would be good enough for you overall; I just wanted to
> > > clarify the role of KIP-328.
> > > In particular, the solution you mentioned is to have the downstream KTables
> > > actually query the upstream ones to compute their results. I'm not sure
> > > whether it's more efficient to do these queries on the schedule, or to have
> > > the upstream tables emit their results, on the same schedule.
> > >
> > > What do you think?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
> > >
> > > > From what I understood, that KIP is related to how KStreams will handle
> > > > KTable updates in windowed scenarios to optimize resource usage.
> > > > I couldn't see any specific relation to this KIP. Did you?
> > > >
> > > > -Flávio Stutz
> > > >
> > > >
> > > > On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io> wrote:
> > > > > Flavio,
> > > > >
> > > > > thanks for cleaning up the KIP number collision.
> > > > >
> > > > > With regard to KIP-328
> > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
> > > > > I am wondering how both relate to each other?
> > > > >
> > > > > Any thoughts?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> > > > > > > Just copying a follow-up from another thread to here (sorry about the mess):
> > > > > >
> > > > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > > > Date: 2018/06/25 22:24:17
> > > > > > List: dev@kafka.apache.org
> > > > > >
> > > > > > Flávio, thanks for creating this KIP.
> > > > > >
> > > > > > > I think this "single-aggregation" use case is common enough that we should
> > > > > > > consider how to efficiently support it: for example, for KSQL, which is built
> > > > > > > on top of Streams, we've seen lots of query statements whose result is
> > > > > > > expected to be a single row indicating the "total aggregate", etc. See
> > > > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > > > > >
> > > > > > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
> > > > > > > I'm wondering if we have discussed the option of supporting it in a
> > > > > > > "pre-aggregate" manner: that is, we do partial aggregates on parallel tasks,
> > > > > > > and then send the partial aggregated values via a single topic partition
> > > > > > > for the final aggregate, to reduce the traffic on that single partition and
> > > > > > > hence the final aggregate workload.
> > > > > > > Of course, for non-commutative aggregates we'd probably need to provide
> > > > > > > another API in addition to aggregate, like the `merge` function for
> > > > > > > session-based aggregates, to let users customize the operation of merging
> > > > > > > two partial aggregates into a single partial aggregate. What are its pros and
> > > > > > > cons compared with the current proposal?
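As an illustration of why a separate merge step can be needed, here is a minimal plain-Java sketch using an average as the aggregate. The choice of an average and the names (PartialAverage, add, merge) are assumptions for illustration only; this is not an existing Kafka Streams API. Each parallel task keeps a (sum, count) partial, and the final task merges partials rather than re-running the per-record aggregator on them.

```java
// Hypothetical partial aggregate for an average, kept as (sum, count).
final class PartialAverage {
    static final PartialAverage EMPTY = new PartialAverage(0.0, 0L);

    final double sum;
    final long count;

    PartialAverage(final double sum, final long count) {
        this.sum = sum;
        this.count = count;
    }

    // Per-record aggregation step, run on each parallel task.
    PartialAverage add(final double value) {
        return new PartialAverage(sum + value, count + 1);
    }

    // Merge step, run where the partials from all tasks come together.
    static PartialAverage merge(final PartialAverage a, final PartialAverage b) {
        return new PartialAverage(a.sum + b.sum, a.count + b.count);
    }

    double average() {
        return count == 0 ? 0.0 : sum / count;
    }

    public static void main(final String[] args) {
        final PartialAverage task1 = EMPTY.add(10.0).add(20.0); // local average 15.0
        final PartialAverage task2 = EMPTY.add(60.0);           // local average 60.0
        // Merging the partials gives the true average of all three records.
        System.out.println(merge(task1, task2).average()); // prints 30.0
    }
}
```

Averaging the two local averages directly would give 37.5, which is why the partials carry enough state to be merged correctly.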
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
> > > > > > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > > > > > >> source for realtime partitioned consolidations.
> > > > > > >>
> > > > > > >> We have faced the following scenario/problem in a lot of situations with
> > > > > > >> KStreams:
> > > > > > >>    - Huge incoming data being processed by numerous application instances
> > > > > > >>    - Need to aggregate different fields whose records span all topic
> > > > > > >> partitions (something like "total amount spent by people aged > 30 yrs"
> > > > > > >> when processing a topic partitioned by userid).
> > > > > >>
> > > > > > >> The challenge here is to manage this kind of situation without any
> > > > > > >> bottlenecks. We don't need the "global aggregation" to be processed at each
> > > > > > >> incoming message. In a scenario with 500 instances, each handling 1k
> > > > > > >> messages/s, any single point of aggregation (single-partitioned topics,
> > > > > > >> global tables or external databases) would create a bottleneck of 500k
> > > > > > >> messages/s for single-threaded/CPU elements.
> > > > > > >>
> > > > > > >> For this scenario, it is possible to store the partial aggregations in
> > > > > > >> local stores and, from time to time, query those states and aggregate them
> > > > > > >> as a single value, avoiding bottlenecks. This is a way to create a "timed
> > > > > > >> aggregation barrier".
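The "timed aggregation barrier" idea can be sketched in plain Java as follows. This is only a simulation under stated assumptions: it does not use Kafka Streams state stores, the names (TimedBarrierSketch, record, consolidate) are hypothetical, and a real version would call consolidate() from a scheduler. With the numbers from the scenario, a single aggregation point sees 500 x 1,000 = 500,000 updates per second, whereas one consolidation pass only reads 500 partial values.

```java
import java.util.concurrent.atomic.LongAdder;

// Plain-Java simulation of per-instance local stores plus a periodic roll-up.
// All names are hypothetical; this is not the API proposed in the KIP.
final class TimedBarrierSketch {
    private final LongAdder[] localStores;

    TimedBarrierSketch(final int instances) {
        localStores = new LongAdder[instances];
        for (int i = 0; i < instances; i++) {
            localStores[i] = new LongAdder();
        }
    }

    // Hot path: each instance only updates its own local partial aggregate.
    void record(final int instance, final long amount) {
        localStores[instance].add(amount);
    }

    // Barrier: meant to run on a timer; reads one value per instance.
    long consolidate() {
        long total = 0L;
        for (final LongAdder store : localStores) {
            total += store.sum();
        }
        return total;
    }

    public static void main(final String[] args) {
        final TimedBarrierSketch sketch = new TimedBarrierSketch(500);
        // One simulated second of traffic: 500 instances, 1,000 messages each.
        for (int instance = 0; instance < 500; instance++) {
            for (int message = 0; message < 1000; message++) {
                sketch.record(instance, 1L);
            }
        }
        System.out.println(sketch.consolidate()); // prints 500000
    }
}
```

The trade-off is freshness: the consolidated total is only as recent as the last barrier run.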
> > > > > >>
> > > > > > >> If we leverage this kind of built-in feature we could greatly enhance the
> > > > > > >> ability of KStreams to better handle the CAP Theorem characteristics, so
> > > > > > >> that one could choose to have Consistency over Availability when needed.
> > > > > > >>
> > > > > > >> We started this discussion with Matthias J. Sax here:
> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > > > > >>
> > > > > > >> If you want to see more, go to KIP-326 at:
> > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > > >>
> > > > > >> -Flávio Stutz
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
> 
