I think it would make a lot of sense to provide a simple DSL abstraction.

Something like:

KStream stream = ...
KTable count = stream.count();

The missing groupBy() or grouByKey() class indicates a global counting
operation. The JavaDocs should highlight the impact.

One open question is, what key we want to use for the result KTable?

Also, the details about optional parameters like `Materialized` need to
be discussed in details.



-Matthias


On 7/6/18 2:43 PM, Guozhang Wang wrote:
> That's a lot of email exchanges for me to catch up :)
> 
> My original proposed alternative solution is indeed relying on
> pre-aggregate before sending to the single-partition topic, so that the
> traffic on that single-partition topic would not be huge (I called it
> partial-aggregate but the intent was the same).
> 
> What I was thinking is that, given such a scenario could be common, if
> we've decided to go down this route should we provide a new API that wrap's
> John's proposed topology (right now with KIP-328 users still need to
> leverage this trick manually):
> 
> 
> ----------
> 
> final KStream<String, String> siteEvents = builder.stream("/site-events");
> 
> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(/*
> generate KeyValue(key, 1) for the pre-aggregate*/);
> 
> final KTable<Integer, Long> countsByPartition =
> keyedByPartition.groupByKey().count();   /* pre-aggregate */
> 
> final KGroupedTable<String, Long> singlePartition =
> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>  /* sent the suppressed pre-aggregate values to the single partition topic
> */
> 
> final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l +
> r, (l, r) -> l - r);   /* read from the single partition topic, do reduce
> on the data*/
> 
> ----------
> 
> Note that if we wrap them all into a new operator, users would need to
> provide two functions, for the aggregate and for the final "reduce" (in my
> previous email I called it merger function, but for the same intent).
> 
> 
> 
> Guozhang
> 
> 
> 
> On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <j...@confluent.io> wrote:
> 
>> Ok, I didn't get quite as far as I hoped, and several things are far from
>> ready, but here's what I have so far:
>> https://github.com/apache/kafka/pull/5337
>>
>> The "unit" test works, and is a good example of how you should expect it to
>> behave:
>> https://github.com/apache/kafka/pull/5337/files#diff-
>> 2fdec52b9cc3d0e564f0c12a199bed77
>>
>> I have one working integration test, but it's slow going getting the timing
>> right, so no promises of any kind ;)
>>
>> Let me know what you think!
>>
>> Thanks,
>> -John
>>
>> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <j...@confluent.io> wrote:
>>
>>> Hey Flávio,
>>>
>>> Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
>>> hoping to push up my branch by the end of the day.
>>>
>>> I don't know if you've seen it but Streams actually already has something
>>> like this, in the form of caching on materialized stores. If you pass in
>> a
>>> "Materialized.withCachingEnabled()", you should be able to get a POC
>>> working by setting the max cache size pretty high and setting the commit
>>> interval for your desired rate:
>>> https://docs.confluent.io/current/streams/developer-
>> guide/memory-mgmt.html#streams-developer-guide-memory-management
>>> .
>>>
>>> There are a couple of cases in joins and whatnot where it doesn't work,
>>> but for the aggregations we discussed, it should. The reason for KIP-328
>> is
>>> to provide finer control and hopefully a more straightforward API.
>>>
>>> Let me know if that works, and I'll drop a message in here when I create
>>> the draft PR for KIP-328. I'd really appreciate your feedback.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Wed, Jul 4, 2018 at 10:17 PM flaviost...@gmail.com <
>>> flaviost...@gmail.com> wrote:
>>>
>>>> John, that was fantastic, man!
>>>> Have you built any custom implementation of your KIP in your machine so
>>>> that I could test it out here? I wish I could test it out.
>>>> If you need any help implementing this feature, please tell me.
>>>>
>>>> Thanks.
>>>>
>>>> -Flávio Stutz
>>>>
>>>>
>>>>
>>>>
>>>> On 2018/07/03 18:04:52, John Roesler <j...@confluent.io> wrote:
>>>>> Hi Flávio,
>>>>> Thanks! I think that we can actually do this, but the API could be
>>>> better.
>>>>> I've included Java code below, but I'll copy and modify your example
>> so
>>>>> we're on the same page.
>>>>>
>>>>> EXERCISE 1:
>>>>>   - The case is "total counting of events for a huge website"
>>>>>   - Tasks from Application A will have something like:
>>>>>          .stream(/site-events)
>>>>>          .transform( re-key s.t. the new key is the partition id)
>>>>>          .groupByKey() // you have to do this before count
>>>>>          .count()
>>>>>           // you explicitly published to a one-partition topic here,
>> but
>>>>> it's actually sufficient just
>>>>>           // to re-group onto one key. You could name and pre-create
>> the
>>>>> intermediate topic here,
>>>>>           // but you don't need a separate application for the final
>>>>> aggregation.
>>>>>          .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
>>>>> partialCount))
>>>>>          .aggregate(sum up the partialCounts)
>>>>>          .publish(/counter-total)
>>>>>
>>>>> I've left out the suppressions, but they would go right after the
>>>> count()
>>>>> and the aggregate().
>>>>>
>>>>> With this program, you don't have to worry about the
>> double-aggregation
>>>> you
>>>>> mentioned in the last email. The KTable produced by the first count()
>>>> will
>>>>> maintain the correct count per partition. If the value changes for any
>>>>> partition, it'll emit a retraction of the old value and then the new
>>>> value
>>>>> downstream, so that the final aggregation can update itself properly.
>>>>>
>>>>> I think we can optimize both the execution and the programability by
>>>> adding
>>>>> a "global aggregation" concept. But In principle, it seems like this
>>>> usage
>>>>> of the current API will support your use case.
>>>>>
>>>>> Once again, though, this is just to present an alternative. I haven't
>>>> done
>>>>> the math on whether your proposal would be more efficient.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> Here's the same algorithm written in Java:
>>>>>
>>>>> final KStream<String, String> siteEvents =
>>>> builder.stream("/site-events");
>>>>>
>>>>> // here we re-key the events so that the key is actually the partition
>>>> id.
>>>>> // we don't need the value to do a count, so I just set it to "1".
>>>>> final KStream<Integer, Integer> keyedByPartition =
>>>> siteEvents.transform(()
>>>>> -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>>>>>     private ProcessorContext context;
>>>>>
>>>>>     @Override
>>>>>     public void init(final ProcessorContext context) {
>>>>>         this.context = context;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public KeyValue<Integer, Integer> transform(final String key,
>> final
>>>>> String value) {
>>>>>         return new KeyValue<>(context.partition(), 1);
>>>>>     }
>>>>> });
>>>>>
>>>>> // Note that we can't do "count()" on a KStream, we have to group it
>>>> first.
>>>>> I'm grouping by the key, so it will produce the count for each key.
>>>>> // Since the key is actually the partition id, it will produce the
>>>>> pre-aggregated count per partition.
>>>>> // Note that the result is a KTable<PartitionId,Count>. It'll always
>>>>> contain the most recent count for each partition.
>>>>> final KTable<Integer, Long> countsByPartition =
>>>>> keyedByPartition.groupByKey().count();
>>>>>
>>>>> // Now we get ready for the final roll-up. We re-group all the
>>>> constituent
>>>>> counts
>>>>> final KGroupedTable<String, Long> singlePartition =
>>>>> countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL",
>> value));
>>>>>
>>>>> final KTable<String, Long> totalCount = singlePartition.reduce((l, r)
>>>> -> l
>>>>> + r, (l, r) -> l - r);
>>>>>
>>>>> totalCount.toStream().foreach((k, v) -> {
>>>>>     // k is always "ALL"
>>>>>     // v is always the most recent total value
>>>>>     System.out.println("The total event count is: " + v);
>>>>> });
>>>>>
>>>>>
>>>>> On Tue, Jul 3, 2018 at 9:21 AM flaviost...@gmail.com <
>>>> flaviost...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Great feature you have there!
>>>>>>
>>>>>> I'll try to exercise here how we would achieve the same functional
>>>>>> objectives using your KIP:
>>>>>>
>>>>>> EXERCISE 1:
>>>>>>   - The case is "total counting of events for a huge website"
>>>>>>   - Tasks from Application A will have something like:
>>>>>>          .stream(/site-events)
>>>>>>          .count()
>>>>>>          .publish(/single-partitioned-topic-with-count-partials)
>>>>>>   - The published messages will be, for example:
>>>>>>           ["counter-task1", 2345]
>>>>>>           ["counter-task2", 8495]
>>>>>>           ["counter-task3", 4839]
>>>>>>   - Single Task from Application B will have something like:
>>>>>>          .stream(/single-partitioned-topic-with-count-partials)
>>>>>>          .aggregate(by messages whose key starts with "counter")
>>>>>>          .publish(/counter-total)
>>>>>>   - FAIL HERE. How would I know what is the overall partitions?
>> Maybe
>>>> two
>>>>>> partials for the same task will arrive before other tasks and it
>> maybe
>>>>>> aggregated twice.
>>>>>>
>>>>>> I tried to think about using GlobalKTables, but I didn't get an easy
>>>> way
>>>>>> to aggregate the keys from that table. Do you have any clue?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> -Flávio Stutz
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> /partial-counters-to-single-partitioned-topic
>>>>>>
>>>>>> On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
>>>>>>> Hi Flávio,
>>>>>>>
>>>>>>> Thanks for the KIP. I'll apologize that I'm arriving late to the
>>>>>>> discussion. I've tried to catch up, but I might have missed some
>>>> nuances.
>>>>>>>
>>>>>>> Regarding KIP-328, the idea is to add the ability to suppress
>>>>>> intermediate
>>>>>>> results from all KTables, not just windowed ones. I think this
>> could
>>>>>>> support your use case in combination with the strategy that
>> Guozhang
>>>>>>> proposed of having one or more pre-aggregation steps that
>>>> ultimately push
>>>>>>> into a single-partition topic for final aggregation. Suppressing
>>>>>>> intermediate results would solve the problem you noted that today
>>>>>>> pre-aggregating doesn't do much to staunch the flow up updates.
>>>>>>>
>>>>>>> I'm not sure if this would be good enough for you overall; I just
>>>> wanted
>>>>>> to
>>>>>>> clarify the role of KIP-328.
>>>>>>> In particular, the solution you mentioned is to have the
>> downstream
>>>>>> KTables
>>>>>>> actually query the upstream ones to compute their results. I'm not
>>>> sure
>>>>>>> whether it's more efficient to do these queries on the schedule,
>> or
>>>> to
>>>>>> have
>>>>>>> the upstream tables emit their results, on the same schedule.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <
>>>>>> flaviost...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> For what I understood, that KIP is related to how KStreams will
>>>> handle
>>>>>>>> KTable updates in Windowed scenarios to optimize resource usage.
>>>>>>>> I couldn't see any specific relation to this KIP. Had you?
>>>>>>>>
>>>>>>>> -Flávio Stutz
>>>>>>>>
>>>>>>>>
>>>>>>>> On 2018/06/29 18:14:46, "Matthias J. Sax" <
>> matth...@confluent.io>
>>>>>> wrote:
>>>>>>>>> Flavio,
>>>>>>>>>
>>>>>>>>> thanks for cleaning up the KIP number collision.
>>>>>>>>>
>>>>>>>>> With regard to KIP-328
>>>>>>>>> (
>>>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 328%3A+Ability+to+suppress+updates+for+KTables
>>>>>>>> )
>>>>>>>>> I am wondering how both relate to each other?
>>>>>>>>>
>>>>>>>>> Any thoughts?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
>>>>>>>>>> Just copying a follow up from another thread to here (sorry
>>>> about
>>>>>> the
>>>>>>>> mess):
>>>>>>>>>>
>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph
>>>> source
>>>>>>>>>> Date: 2018/06/25 22:24:17
>>>>>>>>>> List: dev@kafka.apache.org
>>>>>>>>>>
>>>>>>>>>> Flávio, thanks for creating this KIP.
>>>>>>>>>>
>>>>>>>>>> I think this "single-aggregation" use case is common enough
>>>> that we
>>>>>>>> should
>>>>>>>>>> consider how to efficiently supports it: for example, for
>> KSQL
>>>>>> that's
>>>>>>>> built
>>>>>>>>>> on top of Streams, we've seen lots of query statements whose
>>>>>> return is
>>>>>>>>>> expected a single row indicating the "total aggregate" etc.
>>>> See
>>>>>>>>>> https://github.com/confluentinc/ksql/issues/430 for
>> details.
>>>>>>>>>>
>>>>>>>>>> I've not read through
>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953,
>>>>>>>> but
>>>>>>>>>> I'm wondering if we have discussed the option of supporting
>>>> it in a
>>>>>>>>>> "pre-aggregate" manner: that is we do partial aggregates on
>>>>>> parallel
>>>>>>>> tasks,
>>>>>>>>>> and then sends the partial aggregated value via a single
>> topic
>>>>>>>> partition
>>>>>>>>>> for the final aggregate, to reduce the traffic on that
>> single
>>>>>>>> partition and
>>>>>>>>>> hence the final aggregate workload.
>>>>>>>>>> Of course, for non-commutative aggregates we'd probably need
>>>> to
>>>>>> provide
>>>>>>>>>> another API in addition to aggregate, like the `merge`
>>>> function for
>>>>>>>>>> session-based aggregates, to let users customize the
>>>> operations of
>>>>>>>> merging
>>>>>>>>>> two partial aggregates into a single partial aggregate.
>>>> What's its
>>>>>>>> pros and
>>>>>>>>>> cons compared with the current proposal?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>> On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com
>>>
>>>>>> wrote:
>>>>>>>>>>> Hey, guys, I've just created a new KIP about creating a new
>>>> DSL
>>>>>> graph
>>>>>>>>>>> source for realtime partitioned consolidations.
>>>>>>>>>>>
>>>>>>>>>>> We have faced the following scenario/problem in a lot of
>>>>>> situations
>>>>>>>> with
>>>>>>>>>>> KStreams:
>>>>>>>>>>>    - Huge incoming data being processed by numerous
>>>> application
>>>>>>>> instances
>>>>>>>>>>>    - Need to aggregate different fields whose records span
>>>> all
>>>>>> topic
>>>>>>>>>>> partitions (something like “total amount spent by people
>>>> aged > 30
>>>>>>>> yrs”
>>>>>>>>>>> when processing a topic partitioned by userid).
>>>>>>>>>>>
>>>>>>>>>>> The challenge here is to manage this kind of situation
>>>> without any
>>>>>>>>>>> bottlenecks. We don't need the “global aggregation” to be
>>>>>> processed
>>>>>>>> at each
>>>>>>>>>>> incoming message. On a scenario of 500 instances, each
>>>> handling 1k
>>>>>>>>>>> messages/s, any single point of aggregation (single
>>>> partitioned
>>>>>>>> topics,
>>>>>>>>>>> global tables or external databases) would create a
>>>> bottleneck of
>>>>>> 500k
>>>>>>>>>>> messages/s for single threaded/CPU elements.
>>>>>>>>>>>
>>>>>>>>>>> For this scenario, it is possible to store the partial
>>>>>> aggregations on
>>>>>>>>>>> local stores and, from time to time, query those states and
>>>>>> aggregate
>>>>>>>> them
>>>>>>>>>>> as a single value, avoiding bottlenecks. This is a way to
>>>> create a
>>>>>>>> "timed
>>>>>>>>>>> aggregation barrier”.
>>>>>>>>>>>
>>>>>>>>>>> If we leverage this kind of built-in feature we could
>> greatly
>>>>>> enhance
>>>>>>>> the
>>>>>>>>>>> ability of KStreams to better handle the CAP Theorem
>>>>>> characteristics,
>>>>>>>> so
>>>>>>>>>>> that one could choose to have Consistency over Availability
>>>> when
>>>>>>>> needed.
>>>>>>>>>>>
>>>>>>>>>>> We started this discussion with Matthias J. Sax here:
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953
>>>>>>>>>>>
>>>>>>>>>>> If you want to see more, go to KIP-326 at:
>>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 326%3A+Schedulable+KTable+as+Graph+source
>>>>>>>>>>>
>>>>>>>>>>> -Flávio Stutz
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to