That's a lot of email exchanges for me to catch up on :)

My originally proposed alternative solution indeed relies on
pre-aggregating before sending to the single-partition topic, so that the
traffic on that single-partition topic would not be huge (I called it a
partial aggregate, but the intent was the same).

What I was thinking is that, given that such a scenario could be common,
if we've decided to go down this route, should we provide a new API that
wraps John's proposed topology? (Right now, with KIP-328, users still need
to apply this trick manually.) For example:


----------

final KStream<String, String> siteEvents = builder.stream("/site-events");

// generate KeyValue(partitionId, 1) for the pre-aggregate
final KStream<Integer, Integer> keyedByPartition =
    siteEvents.transform(/* re-key so that the new key is the partition id */);

// pre-aggregate: count per partition
final KTable<Integer, Long> countsByPartition =
    keyedByPartition.groupByKey().count();

// send the suppressed pre-aggregate values to the single-partition topic
final KGroupedTable<String, Long> singlePartition =
    countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));

// read from the single-partition topic and reduce the partial counts
final KTable<String, Long> totalCount =
    singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r);

----------

Note that if we wrap this all into a new operator, users would need to
provide two functions: one for the aggregate and one for the final "reduce"
(in my previous email I called it a merger function, but with the same
intent).
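
To make this concrete, here is a rough sketch of what such a wrapper could
look like. The name `countToOne` and its shape are hypothetical, purely for
illustration; it only composes existing DSL calls (assuming the Kafka 2.0
API with `Serialized`):

----------

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public final class PreAggregateExample {

    // Hypothetical wrapper (not in the current API): hides the re-key ->
    // pre-aggregate -> single-key merge pattern behind one call. A
    // generalized version would take the per-partition aggregator and the
    // final merger ("reduce") as the two user-supplied functions.
    public static <K, V> KTable<String, Long> countToOne(final KStream<K, V> input) {
        final KStream<Integer, Integer> keyedByPartition = input.transform(
            () -> new Transformer<K, V, KeyValue<Integer, Integer>>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<Integer, Integer> transform(final K key, final V value) {
                    // re-key by the task's partition id; the value is a unit count
                    return KeyValue.pair(context.partition(), 1);
                }

                @Override
                public void close() {}
            });

        return keyedByPartition
            .groupByKey(Serialized.with(Serdes.Integer(), Serdes.Integer()))
            .count()                                   // pre-aggregate per partition
            .groupBy((partition, count) -> KeyValue.pair("ALL", count),
                     Serialized.with(Serdes.String(), Serdes.Long()))
            .reduce((l, r) -> l + r, (l, r) -> l - r); // final merge on one key
    }
}

----------

With that, the whole snippet above collapses to a single line:
final KTable<String, Long> totalCount = PreAggregateExample.countToOne(siteEvents);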



Guozhang



On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <j...@confluent.io> wrote:

> Ok, I didn't get quite as far as I hoped, and several things are far from
> ready, but here's what I have so far:
> https://github.com/apache/kafka/pull/5337
>
> The "unit" test works, and is a good example of how you should expect it to
> behave:
> https://github.com/apache/kafka/pull/5337/files#diff-2fdec52b9cc3d0e564f0c12a199bed77
>
> I have one working integration test, but it's slow going getting the timing
> right, so no promises of any kind ;)
>
> Let me know what you think!
>
> Thanks,
> -John
>
> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <j...@confluent.io> wrote:
>
> > Hey Flávio,
> >
> > Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
> > hoping to push up my branch by the end of the day.
> >
> > I don't know if you've seen it, but Streams actually already has something
> > like this, in the form of caching on materialized stores. If you pass in a
> > "Materialized.withCachingEnabled()", you should be able to get a POC
> > working by setting the max cache size pretty high and setting the commit
> > interval for your desired rate:
> > https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html#streams-developer-guide-memory-management
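> >
> > For example, a minimal POC sketch (the store name "partial-counts" and
> > the sizes are placeholders, just for illustration):
> >
> > final Properties props = new Properties();
> > // buffer as much as possible, so updates are deduplicated before flushing
> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);
> > // flush (and therefore emit downstream) roughly every 30 seconds
> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);
> >
> > final KTable<Integer, Long> countsByPartition =
> >     keyedByPartition.groupByKey().count(
> >         Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as("partial-counts")
> >             .withCachingEnabled());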
> >
> > There are a couple of cases in joins and whatnot where it doesn't work,
> > but for the aggregations we discussed, it should. The reason for KIP-328
> > is to provide finer control and hopefully a more straightforward API.
> >
> > Let me know if that works, and I'll drop a message in here when I create
> > the draft PR for KIP-328. I'd really appreciate your feedback.
> >
> > Thanks,
> > -John
> >
> > On Wed, Jul 4, 2018 at 10:17 PM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
> >
> >> John, that was fantastic, man!
> >> Have you built any custom implementation of your KIP on your machine, so
> >> that I could test it out here? I'd love to try it out.
> >> If you need any help implementing this feature, please tell me.
> >>
> >> Thanks.
> >>
> >> -Flávio Stutz
> >>
> >>
> >>
> >>
> >> On 2018/07/03 18:04:52, John Roesler <j...@confluent.io> wrote:
> >> > Hi Flávio,
> >> > Thanks! I think that we can actually do this, but the API could be
> >> > better. I've included Java code below, but I'll copy and modify your
> >> > example so we're on the same page.
> >> >
> >> > EXERCISE 1:
> >> >   - The case is "total counting of events for a huge website"
> >> >   - Tasks from Application A will have something like:
> >> >          .stream(/site-events)
> >> >          .transform( re-key s.t. the new key is the partition id)
> >> >          .groupByKey() // you have to do this before count
> >> >          .count()
> >> >           // you explicitly published to a one-partition topic here, but
> >> >           // it's actually sufficient just to re-group onto one key. You
> >> >           // could name and pre-create the intermediate topic here, but
> >> >           // you don't need a separate application for the final
> >> >           // aggregation.
> >> >          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
> >> >                   partialCount))
> >> >          .aggregate(sum up the partialCounts)
> >> >          .publish(/counter-total)
> >> >
> >> > I've left out the suppressions, but they would go right after the
> >> > count() and the aggregate().
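> >> >
> >> > For example, if the KIP-328 API lands roughly as currently drafted, the
> >> > placement after count() might look like this (the values are
> >> > placeholders):
> >> >
> >> > final KTable<Integer, Long> suppressedCounts =
> >> >     countsByPartition.suppress(
> >> >         Suppressed.untilTimeLimit(Duration.ofSeconds(30),
> >> >                                   Suppressed.BufferConfig.maxRecords(1_000)));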
> >> >
> >> > With this program, you don't have to worry about the double-aggregation
> >> > you mentioned in the last email. The KTable produced by the first count()
> >> > will maintain the correct count per partition. If the value changes for
> >> > any partition, it'll emit a retraction of the old value and then the new
> >> > value downstream, so that the final aggregation can update itself
> >> > properly.
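> >> >
> >> > For example (illustrative values only): if partition 0's count moves
> >> > from 41 to 42, the first KTable emits a retraction of <0, 41> followed
> >> > by <0, 42>; the final reduce below then applies its subtractor
> >> > ((l, r) -> l - r) for the retraction and its adder ((l, r) -> l + r)
> >> > for the new value, so the grand total stays exact.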
> >> >
> >> > I think we can optimize both the execution and the programmability by
> >> > adding a "global aggregation" concept. But in principle, it seems like
> >> > this usage of the current API will support your use case.
> >> >
> >> > Once again, though, this is just to present an alternative. I haven't
> >> > done the math on whether your proposal would be more efficient.
> >> >
> >> > Thanks,
> >> > -John
> >> >
> >> > Here's the same algorithm written in Java:
> >> >
> >> > final KStream<String, String> siteEvents = builder.stream("/site-events");
> >> >
> >> > // here we re-key the events so that the key is actually the partition id.
> >> > // we don't need the value to do a count, so I just set it to "1".
> >> > final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(
> >> >     () -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
> >> >         private ProcessorContext context;
> >> >
> >> >         @Override
> >> >         public void init(final ProcessorContext context) {
> >> >             this.context = context;
> >> >         }
> >> >
> >> >         @Override
> >> >         public KeyValue<Integer, Integer> transform(final String key,
> >> >                                                     final String value) {
> >> >             return new KeyValue<>(context.partition(), 1);
> >> >         }
> >> >
> >> >         @Override
> >> >         public void close() {} // no-op, but needed to satisfy the interface
> >> >     });
> >> >
> >> > // Note that we can't do "count()" on a KStream; we have to group it
> >> > // first. I'm grouping by the key, so it will produce the count for each
> >> > // key. Since the key is actually the partition id, it will produce the
> >> > // pre-aggregated count per partition.
> >> > // Note that the result is a KTable<PartitionId,Count>. It'll always
> >> > // contain the most recent count for each partition.
> >> > final KTable<Integer, Long> countsByPartition =
> >> >     keyedByPartition.groupByKey().count();
> >> >
> >> > // Now we get ready for the final roll-up. We re-group all the
> >> > // constituent counts onto a single key.
> >> > final KGroupedTable<String, Long> singlePartition =
> >> >     countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
> >> >
> >> > final KTable<String, Long> totalCount =
> >> >     singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r);
> >> >
> >> > totalCount.toStream().foreach((k, v) -> {
> >> >     // k is always "ALL"
> >> >     // v is always the most recent total value
> >> >     System.out.println("The total event count is: " + v);
> >> > });
> >> >
> >> >
> >> > On Tue, Jul 3, 2018 at 9:21 AM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
> >> >
> >> > > Great feature you have there!
> >> > >
> >> > > I'll try to work through here how we would achieve the same
> >> > > functional objectives using your KIP:
> >> > >
> >> > > EXERCISE 1:
> >> > >   - The case is "total counting of events for a huge website"
> >> > >   - Tasks from Application A will have something like:
> >> > >          .stream(/site-events)
> >> > >          .count()
> >> > >          .publish(/single-partitioned-topic-with-count-partials)
> >> > >   - The published messages will be, for example:
> >> > >           ["counter-task1", 2345]
> >> > >           ["counter-task2", 8495]
> >> > >           ["counter-task3", 4839]
> >> > >   - Single Task from Application B will have something like:
> >> > >          .stream(/single-partitioned-topic-with-count-partials)
> >> > >          .aggregate(by messages whose key starts with "counter")
> >> > >          .publish(/counter-total)
> >> > >   - FAIL HERE. How would I know what the overall set of partitions
> >> > > is? Maybe two partials from the same task will arrive before the
> >> > > other tasks' and be aggregated twice.
> >> > >
> >> > > I tried to think about using GlobalKTables, but I didn't find an easy
> >> > > way to aggregate the keys from that table. Do you have any clue?
> >> > >
> >> > > Thanks.
> >> > >
> >> > > -Flávio Stutz
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
> >> > > > Hi Flávio,
> >> > > >
> >> > > > Thanks for the KIP. I'll apologize that I'm arriving late to the
> >> > > > discussion. I've tried to catch up, but I might have missed some
> >> > > > nuances.
> >> > > >
> >> > > > Regarding KIP-328, the idea is to add the ability to suppress
> >> > > > intermediate results from all KTables, not just windowed ones. I
> >> > > > think this could support your use case in combination with the
> >> > > > strategy that Guozhang proposed of having one or more
> >> > > > pre-aggregation steps that ultimately push into a single-partition
> >> > > > topic for final aggregation. Suppressing intermediate results would
> >> > > > solve the problem you noted, that today pre-aggregating doesn't do
> >> > > > much to staunch the flow of updates.
> >> > > >
> >> > > > I'm not sure if this would be good enough for you overall; I just
> >> > > > wanted to clarify the role of KIP-328.
> >> > > > In particular, the solution you mentioned is to have the downstream
> >> > > > KTables actually query the upstream ones to compute their results.
> >> > > > I'm not sure whether it's more efficient to do these queries on a
> >> > > > schedule, or to have the upstream tables emit their results on the
> >> > > > same schedule.
> >> > > >
> >> > > > What do you think?
> >> > > >
> >> > > > Thanks,
> >> > > > -John
> >> > > >
> >> > > > On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
> >> > > >
> >> > > > > From what I understood, that KIP is related to how KStreams will
> >> > > > > handle KTable updates in windowed scenarios to optimize resource
> >> > > > > usage. I couldn't see any specific relation to this KIP. Did you?
> >> > > > >
> >> > > > > -Flávio Stutz
> >> > > > >
> >> > > > >
> >> > > > > On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io> wrote:
> >> > > > > > Flavio,
> >> > > > > >
> >> > > > > > thanks for cleaning up the KIP number collision.
> >> > > > > >
> >> > > > > > With regard to KIP-328
> >> > > > > > (
> >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> >> > > > > > )
> >> > > > > > I am wondering how both relate to each other?
> >> > > > > >
> >> > > > > > Any thoughts?
> >> > > > > >
> >> > > > > >
> >> > > > > > -Matthias
> >> > > > > >
> >> > > > > > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> >> > > > > > > Just copying a follow-up from another thread to here (sorry
> >> > > > > > > about the mess):
> >> > > > > > >
> >> > > > > > > From: Guozhang Wang <wangg...@gmail.com>
> >> > > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph
> >> source
> >> > > > > > > Date: 2018/06/25 22:24:17
> >> > > > > > > List: dev@kafka.apache.org
> >> > > > > > >
> >> > > > > > > Flávio, thanks for creating this KIP.
> >> > > > > > >
> >> > > > > > > I think this "single-aggregation" use case is common enough
> >> > > > > > > that we should consider how to support it efficiently: for
> >> > > > > > > example, for KSQL, which is built on top of Streams, we've
> >> > > > > > > seen lots of query statements whose result is expected to be
> >> > > > > > > a single row indicating the "total aggregate", etc. See
> >> > > > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> >> > > > > > >
> >> > > > > > > I've not read through
> >> > > > > > > https://issues.apache.org/jira/browse/KAFKA-6953, but I'm
> >> > > > > > > wondering if we have discussed the option of supporting it in
> >> > > > > > > a "pre-aggregate" manner: that is, we do partial aggregates on
> >> > > > > > > parallel tasks, and then send the partially aggregated values
> >> > > > > > > via a single topic partition for the final aggregate, to
> >> > > > > > > reduce the traffic on that single partition and hence the
> >> > > > > > > final aggregate workload.
> >> > > > > > > Of course, for non-commutative aggregates we'd probably need
> >> > > > > > > to provide another API in addition to aggregate, like the
> >> > > > > > > `merge` function for session-based aggregates, to let users
> >> > > > > > > customize the operation of merging two partial aggregates into
> >> > > > > > > a single partial aggregate. What are its pros and cons
> >> > > > > > > compared with the current proposal?
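> >> > > > > > >
> >> > > > > > > (For reference, the existing session-window analogue is the
> >> > > > > > > Merger interface; a summing merger, shown purely for
> >> > > > > > > illustration, would be:
> >> > > > > > >
> >> > > > > > > final Merger<String, Long> merger =
> >> > > > > > >     (aggKey, left, right) -> left + right;
> >> > > > > > >
> >> > > > > > > A non-windowed pre-aggregate API could accept a similar second
> >> > > > > > > function alongside the aggregator.)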
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Guozhang
> >> > > > > > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
> >> > > > > > >> Hey, guys, I've just created a new KIP about a new DSL graph
> >> > > > > > >> source for realtime partitioned consolidations.
> >> > > > > > >>
> >> > > > > > >> We have faced the following scenario/problem in a lot of
> >> > > > > > >> situations with KStreams:
> >> > > > > > >>    - Huge incoming data being processed by numerous
> >> > > > > > >> application instances
> >> > > > > > >>    - The need to aggregate different fields whose records
> >> > > > > > >> span all topic partitions (something like "total amount spent
> >> > > > > > >> by people aged > 30 yrs" when processing a topic partitioned
> >> > > > > > >> by userid).
> >> > > > > > >>
> >> > > > > > >> The challenge here is to manage this kind of situation
> >> > > > > > >> without any bottlenecks. We don't need the "global
> >> > > > > > >> aggregation" to be processed for each incoming message. In a
> >> > > > > > >> scenario of 500 instances, each handling 1k messages/s, any
> >> > > > > > >> single point of aggregation (single-partitioned topics,
> >> > > > > > >> global tables, or external databases) would create a
> >> > > > > > >> bottleneck of 500k messages/s for single-threaded/CPU
> >> > > > > > >> elements.
> >> > > > > > >>
> >> > > > > > >> For this scenario, it is possible to store the partial
> >> > > > > > >> aggregations in local stores and, from time to time, query
> >> > > > > > >> those states and aggregate them into a single value, avoiding
> >> > > > > > >> bottlenecks. This is a way to create a "timed aggregation
> >> > > > > > >> barrier".
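> >> > > > > > >>
> >> > > > > > >> (A rough sketch of that barrier with today's Processor API,
> >> > > > > > >> purely for illustration, where "store" is an assumed local
> >> > > > > > >> KeyValueStore<Integer, Long> of partial counts: a punctuator
> >> > > > > > >> periodically scans the store and emits one consolidated
> >> > > > > > >> value:
> >> > > > > > >>
> >> > > > > > >> context.schedule(30_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
> >> > > > > > >>     long total = 0;
> >> > > > > > >>     try (final KeyValueIterator<Integer, Long> it = store.all()) {
> >> > > > > > >>         while (it.hasNext()) {
> >> > > > > > >>             total += it.next().value;
> >> > > > > > >>         }
> >> > > > > > >>     }
> >> > > > > > >>     context.forward("ALL", total);
> >> > > > > > >> });)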
> >> > > > > > >>
> >> > > > > > >> If we could leverage this kind of built-in feature, we could
> >> > > > > > >> greatly enhance the ability of KStreams to handle the
> >> > > > > > >> CAP-theorem trade-offs, so that one could choose to have
> >> > > > > > >> Consistency over Availability when needed.
> >> > > > > > >>
> >> > > > > > >> We started this discussion with Matthias J. Sax here:
> >> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> >> > > > > > >>
> >> > > > > > >> If you want to see more, go to KIP-326 at:
> >> > > > > > >>
> >> > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> >> > > > > > >>
> >> > > > > > >> -Flávio Stutz
> >> > > > > > >>
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>



-- 
-- Guozhang
