I think it would make a lot of sense to provide a simple DSL abstraction. Something like:
    KStream stream = ...
    KTable count = stream.count();

The missing groupBy() or groupByKey() call indicates a global counting operation. The JavaDocs should highlight the impact. One open question is what key we want to use for the result KTable. Also, the details about optional parameters like `Materialized` need to be discussed in detail.


-Matthias

On 7/6/18 2:43 PM, Guozhang Wang wrote:
> That's a lot of email exchanges for me to catch up on :)
>
> My original proposed alternative solution does indeed rely on pre-aggregating before sending to the single-partition topic, so that the traffic on that single-partition topic would not be huge (I called it a partial aggregate, but the intent was the same).
>
> What I was thinking is that, given such a scenario could be common, if we've decided to go down this route, should we provide a new API that wraps John's proposed topology (right now, with KIP-328, users still need to apply this trick manually):
>
> ----------
>
> final KStream<String, String> siteEvents = builder.stream("/site-events");
>
> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(/* generate KeyValue(key, 1) for the pre-aggregate */);
>
> final KTable<Integer, Long> countsByPartition = keyedByPartition.groupByKey().count(); /* pre-aggregate */
>
> final KGroupedTable<String, Long> singlePartition = countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value)); /* send the suppressed pre-aggregate values to the single-partition topic */
>
> final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r); /* read from the single-partition topic, do the reduce on the data */
>
> ----------
>
> Note that if we wrap this all into a new operator, users would need to provide two functions: one for the aggregate and one for the final "reduce" (in my previous email I called it a merger function, but the intent was the same).
>
>
> Guozhang
>
>
> On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <j...@confluent.io> wrote:
>
>> Ok, I didn't get quite as far as I hoped, and several things are far from ready, but here's what I have so far:
>> https://github.com/apache/kafka/pull/5337
>>
>> The "unit" test works and is a good example of how you should expect it to behave:
>> https://github.com/apache/kafka/pull/5337/files#diff-2fdec52b9cc3d0e564f0c12a199bed77
>>
>> I have one working integration test, but it's slow going getting the timing right, so no promises of any kind ;)
>>
>> Let me know what you think!
>>
>> Thanks,
>> -John
>>
>> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <j...@confluent.io> wrote:
>>
>>> Hey Flávio,
>>>
>>> Thanks! I haven't got anything usable yet, but I'm working on it now. I'm hoping to push up my branch by the end of the day.
>>>
>>> I don't know if you've seen it, but Streams actually already has something like this, in the form of caching on materialized stores. If you pass in a "Materialized.withCachingEnabled()", you should be able to get a POC working by setting the max cache size pretty high and setting the commit interval for your desired rate:
>>> https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html#streams-developer-guide-memory-management
>>>
>>> There are a couple of cases in joins and whatnot where it doesn't work, but for the aggregations we discussed, it should. The reason for KIP-328 is to provide finer control and, hopefully, a more straightforward API.
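A minimal sketch of the caching POC John describes above, against the 2.0-era Streams API. The topic, the store name "counts", the cache size, and the commit interval are illustrative choices, not values from the thread:

    import java.util.Properties;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class CachedCountPoc {
        static Properties pocConfig() {
            final Properties props = new Properties();
            // A large record cache plus a long commit interval means the store is
            // flushed (and downstream updates emitted) roughly once per commit
            // instead of once per input record.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);
            return props;
        }

        static KTable<String, Long> cachedCounts(final StreamsBuilder builder) {
            // Caching on the materialized count store swallows most intermediate updates.
            return builder.<String, String>stream("/site-events")
                    .groupByKey()
                    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts")
                            .withCachingEnabled());
        }
    }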
>>>
>>> Let me know if that works, and I'll drop a message in here when I create the draft PR for KIP-328. I'd really appreciate your feedback.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Wed, Jul 4, 2018 at 10:17 PM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
>>>
>>>> John, that was fantastic, man!
>>>> Have you built any custom implementation of your KIP on your machine so that I could test it out here? I wish I could test it out.
>>>> If you need any help implementing this feature, please tell me.
>>>>
>>>> Thanks.
>>>>
>>>> -Flávio Stutz
>>>>
>>>> On 2018/07/03 18:04:52, John Roesler <j...@confluent.io> wrote:
>>>>> Hi Flávio,
>>>>> Thanks! I think that we can actually do this, but the API could be better. I've included Java code below, but I'll copy and modify your example so we're on the same page.
>>>>>
>>>>> EXERCISE 1:
>>>>> - The case is "total counting of events for a huge website"
>>>>> - Tasks from Application A will have something like:
>>>>>     .stream(/site-events)
>>>>>     .transform( re-key s.t. the new key is the partition id)
>>>>>     .groupByKey() // you have to do this before count
>>>>>     .count()
>>>>>     // you explicitly published to a one-partition topic here, but it's actually
>>>>>     // sufficient just to re-group onto one key. You could name and pre-create the
>>>>>     // intermediate topic here, but you don't need a separate application for the
>>>>>     // final aggregation.
>>>>>     .groupBy((partitionId, partialCount) -> new KeyValue("ALL", partialCount))
>>>>>     .aggregate(sum up the partialCounts)
>>>>>     .publish(/counter-total)
>>>>>
>>>>> I've left out the suppressions, but they would go right after the count() and the aggregate().
>>>>>
>>>>> With this program, you don't have to worry about the double-aggregation you mentioned in the last email. The KTable produced by the first count() will maintain the correct count per partition. If the value changes for any partition, it'll emit a retraction of the old value and then the new value downstream, so that the final aggregation can update itself properly.
>>>>>
>>>>> I think we can optimize both the execution and the programmability by adding a "global aggregation" concept. But in principle, it seems like this usage of the current API will support your use case.
>>>>>
>>>>> Once again, though, this is just to present an alternative. I haven't done the math on whether your proposal would be more efficient.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> Here's the same algorithm written in Java:
>>>>>
>>>>> final KStream<String, String> siteEvents = builder.stream("/site-events");
>>>>>
>>>>> // here we re-key the events so that the key is actually the partition id.
>>>>> // we don't need the value to do a count, so I just set it to "1".
>>>>> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(()
>>>>>     -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>>>>>         private ProcessorContext context;
>>>>>
>>>>>         @Override
>>>>>         public void init(final ProcessorContext context) {
>>>>>             this.context = context;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public KeyValue<Integer, Integer> transform(final String key, final String value) {
>>>>>             return new KeyValue<>(context.partition(), 1);
>>>>>         }
>>>>>     });
>>>>>
>>>>> // Note that we can't do "count()" on a KStream; we have to group it first.
>>>>> // I'm grouping by the key, so it will produce the count for each key.
>>>>> // Since the key is actually the partition id, it will produce the pre-aggregated count per partition.
>>>>> // Note that the result is a KTable<PartitionId, Count>. It'll always contain the most recent count for each partition.
>>>>> final KTable<Integer, Long> countsByPartition = keyedByPartition.groupByKey().count();
>>>>>
>>>>> // Now we get ready for the final roll-up. We re-group all the constituent counts.
>>>>> final KGroupedTable<String, Long> singlePartition = countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>>>>>
>>>>> final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r);
>>>>>
>>>>> totalCount.toStream().foreach((k, v) -> {
>>>>>     // k is always "ALL"
>>>>>     // v is always the most recent total value
>>>>>     System.out.println("The total event count is: " + v);
>>>>> });
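John notes above that the suppressions would go right after the count() and the aggregate(). KIP-328 was still a draft at this point; as a sketch of where they would slot in, here is the Suppressed API in the form it eventually shipped, with arbitrary placeholder rates and buffer sizes:

    import java.time.Duration;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Suppressed;

    // Reuses keyedByPartition from the example above. Each suppress() holds
    // back intermediate updates for up to 30 seconds, so the single final
    // partition sees at most one update per upstream key per interval.
    final KTable<Integer, Long> countsByPartition = keyedByPartition
            .groupByKey()
            .count()
            .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30),
                    Suppressed.BufferConfig.maxRecords(10_000)));

    final KTable<String, Long> totalCount = countsByPartition
            .groupBy((key, value) -> new KeyValue<>("ALL", value))
            .reduce((l, r) -> l + r, (l, r) -> l - r)
            .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30),
                    Suppressed.BufferConfig.maxRecords(10)));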
>>>>>
>>>>> On Tue, Jul 3, 2018 at 9:21 AM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
>>>>>
>>>>>> Great feature you have there!
>>>>>>
>>>>>> I'll try to exercise here how we would achieve the same functional objectives using your KIP:
>>>>>>
>>>>>> EXERCISE 1:
>>>>>> - The case is "total counting of events for a huge website"
>>>>>> - Tasks from Application A will have something like:
>>>>>>     .stream(/site-events)
>>>>>>     .count()
>>>>>>     .publish(/single-partitioned-topic-with-count-partials)
>>>>>> - The published messages will be, for example:
>>>>>>     ["counter-task1", 2345]
>>>>>>     ["counter-task2", 8495]
>>>>>>     ["counter-task3", 4839]
>>>>>> - A single task from Application B will have something like:
>>>>>>     .stream(/single-partitioned-topic-with-count-partials)
>>>>>>     .aggregate(by messages whose key starts with "counter")
>>>>>>     .publish(/counter-total)
>>>>>> - FAIL HERE. How would I know the overall set of partitions? Two partials from the same task may arrive before those of the other tasks, and they may be aggregated twice.
>>>>>>
>>>>>> I tried to think about using GlobalKTables, but I didn't find an easy way to aggregate the keys from that table. Do you have any clue?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> -Flávio Stutz
>>>>>>
>>>>>> /partial-counters-to-single-partitioned-topic
>>>>>>
>>>>>> On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
>>>>>>> Hi Flávio,
>>>>>>>
>>>>>>> Thanks for the KIP. I'll apologize that I'm arriving late to the discussion. I've tried to catch up, but I might have missed some nuances.
>>>>>>>
>>>>>>> Regarding KIP-328, the idea is to add the ability to suppress intermediate results from all KTables, not just windowed ones. I think this could support your use case in combination with the strategy Guozhang proposed of having one or more pre-aggregation steps that ultimately push into a single-partition topic for final aggregation. Suppressing intermediate results would solve the problem you noted, that today pre-aggregating doesn't do much to staunch the flow of updates.
>>>>>>>
>>>>>>> I'm not sure if this would be good enough for you overall; I just wanted to clarify the role of KIP-328.
>>>>>>> In particular, the solution you mentioned is to have the downstream KTables actually query the upstream ones to compute their results. I'm not sure whether it's more efficient to do these queries on the schedule, or to have the upstream tables emit their results on the same schedule.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
>>>>>>>
>>>>>>>> From what I understood, that KIP is related to how KStreams will handle KTable updates in windowed scenarios to optimize resource usage. I couldn't see any specific relation to this KIP. Had you?
>>>>>>>>
>>>>>>>> -Flávio Stutz
>>>>>>>>
>>>>>>>> On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>>>>>> Flavio,
>>>>>>>>>
>>>>>>>>> thanks for cleaning up the KIP number collision.
>>>>>>>>>
>>>>>>>>> With regard to KIP-328
>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
>>>>>>>>> I am wondering how both relate to each other?
>>>>>>>>>
>>>>>>>>> Any thoughts?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
>>>>>>>>>> Just copying a follow-up from another thread to here (sorry about the mess):
>>>>>>>>>>
>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
>>>>>>>>>> Date: 2018/06/25 22:24:17
>>>>>>>>>> List: dev@kafka.apache.org
>>>>>>>>>>
>>>>>>>>>> Flávio, thanks for creating this KIP.
>>>>>>>>>>
>>>>>>>>>> I think this "single-aggregation" use case is common enough that we should consider how to support it efficiently: for example, for KSQL, which is built on top of Streams, we've seen lots of query statements whose return is expected to be a single row indicating the "total aggregate", etc. See https://github.com/confluentinc/ksql/issues/430 for details.
>>>>>>>>>>
>>>>>>>>>> I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but I'm wondering if we have discussed the option of supporting it in a "pre-aggregate" manner: that is, we do partial aggregates on parallel tasks, and then send the partially aggregated values via a single topic partition for the final aggregate, to reduce the traffic on that single partition and hence the final aggregate workload.
>>>>>>>>>> Of course, for non-commutative aggregates we'd probably need to provide another API in addition to aggregate, like the `merge` function for session-based aggregates, to let users customize the operation of merging two partial aggregates into a single partial aggregate. What are its pros and cons compared with the current proposal?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
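A hypothetical signature for the wrapped operator Guozhang describes; nothing like this exists in Kafka Streams, and the names are invented for illustration. It reuses the real Initializer, Aggregator, and Merger interfaces (Merger being the one session-window aggregations already use to combine two partial aggregates):

    import org.apache.kafka.streams.kstream.Aggregator;
    import org.apache.kafka.streams.kstream.Initializer;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Merger;

    // Hypothetical API sketch only. Each task builds a partial aggregate
    // locally; the partials funnel through a single partition, where the
    // merger combines them. A separate merger is what lets non-commutative
    // aggregates work: merging two partials is a distinct operation from
    // aggregating raw input values.
    interface GlobalAggregateSketch {
        <K, V, VA> KTable<String, VA> globalAggregate(
                KStream<K, V> stream,
                Initializer<VA> initializer,                      // creates an empty partial aggregate
                Aggregator<? super K, ? super V, VA> aggregator,  // per-task pre-aggregate
                Merger<? super String, VA> merger);               // combines two partials at the final task
    }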
>>>>>>>>>> On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
>>>>>>>>>>> Hey, guys, I've just created a new KIP about creating a new DSL graph source for realtime partitioned consolidations.
>>>>>>>>>>>
>>>>>>>>>>> We have faced the following scenario/problem in a lot of situations with KStreams:
>>>>>>>>>>> - Huge incoming data being processed by numerous application instances
>>>>>>>>>>> - The need to aggregate different fields whose records span all topic partitions (something like "total amount spent by people aged > 30 yrs" when processing a topic partitioned by userid).
>>>>>>>>>>>
>>>>>>>>>>> The challenge here is to manage this kind of situation without any bottlenecks. We don't need the "global aggregation" to be processed at each incoming message. In a scenario of 500 instances, each handling 1k messages/s, any single point of aggregation (single-partitioned topics, global tables or external databases) would create a bottleneck of 500k messages/s for single-threaded/CPU elements.
>>>>>>>>>>>
>>>>>>>>>>> For this scenario, it is possible to store the partial aggregations in local stores and, from time to time, query those states and aggregate them as a single value, avoiding bottlenecks. This is a way to create a "timed aggregation barrier".
>>>>>>>>>>>
>>>>>>>>>>> If we leverage this kind of built-in feature, we could greatly enhance the ability of KStreams to better handle the CAP theorem characteristics, so that one could choose to have Consistency over Availability when needed.
>>>>>>>>>>>
>>>>>>>>>>> We started this discussion with Matthias J. Sax here:
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953
>>>>>>>>>>>
>>>>>>>>>>> If you want to see more, go to KIP-326 at:
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
>>>>>>>>>>>
>>>>>>>>>>> -Flávio Stutz
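A minimal sketch of the "timed aggregation barrier" idea on the 2.0-era Processor API: count locally into a state store, and only on a wall-clock punctuation emit one partial per task. The store name "partials", the 30-second interval, and the key scheme are illustrative assumptions, not part of the KIP:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Assumes a store named "partials" was registered with
    // builder.addStateStore(...) and connected via stream.transform(..., "partials").
    class TimedBarrierCounter implements Transformer<String, String, KeyValue<String, Long>> {
        private ProcessorContext context;
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<String, Long>) context.getStateStore("partials");
            // Every 30 s of wall-clock time, forward this task's partial count.
            // The single downstream aggregator then sees roughly one record per
            // task per interval instead of the full input rate.
            context.schedule(30_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                final Long partial = store.get("local-count");
                if (partial != null) {
                    context.forward("task-" + context.taskId(), partial);
                }
            });
        }

        @Override
        public KeyValue<String, Long> transform(final String key, final String value) {
            // Aggregate locally; emit nothing per-record -- the punctuation emits.
            final Long current = store.get("local-count");
            store.put("local-count", current == null ? 1L : current + 1);
            return null;
        }

        @Override
        public void close() {}
    }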