Hi Flávio,
Thanks! I think that we can actually do this, but the API could be better.
I've included Java code below, but I'll copy and modify your example so
we're on the same page.

EXERCISE 1:
  - The case is "total counting of events for a huge website"
  - Tasks from Application A will have something like:
         .stream(/site-events)
         .transform( re-key s.t. the new key is the partition id)
         .groupByKey() // you have to do this before count
         .count()
          // you explicitly published to a one-partition topic here, but
it's actually sufficient just
          // to re-group onto one key. You could name and pre-create the
intermediate topic here,
          // but you don't need a separate application for the final
aggregation.
         .groupBy((partitionId, partialCount) -> new KeyValue("ALL",
partialCount))
         .aggregate(sum up the partialCounts)
         .publish(/counter-total)

I've left out the suppressions, but they would go right after the count()
and the aggregate().

With this program, you don't have to worry about the double-aggregation you
mentioned in the last email. The KTable produced by the first count() will
maintain the correct count per partition. If the value changes for any
partition, it'll emit a retraction of the old value and then the new value
downstream, so that the final aggregation can update itself properly.

I think we can optimize both the execution and the programability by adding
a "global aggregation" concept. But In principle, it seems like this usage
of the current API will support your use case.

Once again, though, this is just to present an alternative. I haven't done
the math on whether your proposal would be more efficient.

Thanks,
-John

Here's the same algorithm written in Java:

final KStream<String, String> siteEvents = builder.stream("/site-events");

// here we re-key the events so that the key is actually the partition id.
// we don't need the value to do a count, so I just set it to "1".
final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(()
-> new Transformer<String, String, KeyValue<Integer, Integer>>() {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<Integer, Integer> transform(final String key, final
String value) {
        return new KeyValue<>(context.partition(), 1);
    }
});

// Note that we can't do "count()" on a KStream, we have to group it first.
I'm grouping by the key, so it will produce the count for each key.
// Since the key is actually the partition id, it will produce the
pre-aggregated count per partition.
// Note that the result is a KTable<PartitionId,Count>. It'll always
contain the most recent count for each partition.
final KTable<Integer, Long> countsByPartition =
keyedByPartition.groupByKey().count();

// Now we get ready for the final roll-up. We re-group all the constituent
counts
final KGroupedTable<String, Long> singlePartition =
countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));

final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l
+ r, (l, r) -> l - r);

totalCount.toStream().foreach((k, v) -> {
    // k is always "ALL"
    // v is always the most recent total value
    System.out.println("The total event count is: " + v);
});


On Tue, Jul 3, 2018 at 9:21 AM flaviost...@gmail.com <flaviost...@gmail.com>
wrote:

> Great feature you have there!
>
> I'll try to exercise here how we would achieve the same functional
> objectives using your KIP:
>
> EXERCISE 1:
>   - The case is "total counting of events for a huge website"
>   - Tasks from Application A will have something like:
>          .stream(/site-events)
>          .count()
>          .publish(/single-partitioned-topic-with-count-partials)
>   - The published messages will be, for example:
>           ["counter-task1", 2345]
>           ["counter-task2", 8495]
>           ["counter-task3", 4839]
>   - Single Task from Application B will have something like:
>          .stream(/single-partitioned-topic-with-count-partials)
>          .aggregate(by messages whose key starts with "counter")
>          .publish(/counter-total)
>   - FAIL HERE. How would I know what is the overall partitions? Maybe two
> partials for the same task will arrive before other tasks and it maybe
> aggregated twice.
>
> I tried to think about using GlobalKTables, but I didn't get an easy way
> to aggregate the keys from that table. Do you have any clue?
>
> Thanks.
>
> -Flávio Stutz
>
>
>
>
>
>
> /partial-counters-to-single-partitioned-topic
>
> On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
> > Hi Flávio,
> >
> > Thanks for the KIP. I'll apologize that I'm arriving late to the
> > discussion. I've tried to catch up, but I might have missed some nuances.
> >
> > Regarding KIP-328, the idea is to add the ability to suppress
> intermediate
> > results from all KTables, not just windowed ones. I think this could
> > support your use case in combination with the strategy that Guozhang
> > proposed of having one or more pre-aggregation steps that ultimately push
> > into a single-partition topic for final aggregation. Suppressing
> > intermediate results would solve the problem you noted that today
> > pre-aggregating doesn't do much to staunch the flow up updates.
> >
> > I'm not sure if this would be good enough for you overall; I just wanted
> to
> > clarify the role of KIP-328.
> > In particular, the solution you mentioned is to have the downstream
> KTables
> > actually query the upstream ones to compute their results. I'm not sure
> > whether it's more efficient to do these queries on the schedule, or to
> have
> > the upstream tables emit their results, on the same schedule.
> >
> > What do you think?
> >
> > Thanks,
> > -John
> >
> > On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <
> flaviost...@gmail.com>
> > wrote:
> >
> > > For what I understood, that KIP is related to how KStreams will handle
> > > KTable updates in Windowed scenarios to optimize resource usage.
> > > I couldn't see any specific relation to this KIP. Had you?
> > >
> > > -Flávio Stutz
> > >
> > >
> > > On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io>
> wrote:
> > > > Flavio,
> > > >
> > > > thanks for cleaning up the KIP number collision.
> > > >
> > > > With regard to KIP-328
> > > > (
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > > )
> > > > I am wondering how both relate to each other?
> > > >
> > > > Any thoughts?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> > > > > Just copying a follow up from another thread to here (sorry about
> the
> > > mess):
> > > > >
> > > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > > Date: 2018/06/25 22:24:17
> > > > > List: dev@kafka.apache.org
> > > > >
> > > > > Flávio, thanks for creating this KIP.
> > > > >
> > > > > I think this "single-aggregation" use case is common enough that we
> > > should
> > > > > consider how to efficiently supports it: for example, for KSQL
> that's
> > > built
> > > > > on top of Streams, we've seen lots of query statements whose
> return is
> > > > > expected a single row indicating the "total aggregate" etc. See
> > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > > >
> > > > > I've not read through
> https://issues.apache.org/jira/browse/KAFKA-6953,
> > > but
> > > > > I'm wondering if we have discussed the option of supporting it in a
> > > > > "pre-aggregate" manner: that is we do partial aggregates on
> parallel
> > > tasks,
> > > > > and then sends the partial aggregated value via a single topic
> > > partition
> > > > > for the final aggregate, to reduce the traffic on that single
> > > partition and
> > > > > hence the final aggregate workload.
> > > > > Of course, for non-commutative aggregates we'd probably need to
> provide
> > > > > another API in addition to aggregate, like the `merge` function for
> > > > > session-based aggregates, to let users customize the operations of
> > > merging
> > > > > two partial aggregates into a single partial aggregate. What's its
> > > pros and
> > > > > cons compared with the current proposal?
> > > > >
> > > > >
> > > > > Guozhang
> > > > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com>
> wrote:
> > > > >> Hey, guys, I've just created a new KIP about creating a new DSL
> graph
> > > > >> source for realtime partitioned consolidations.
> > > > >>
> > > > >> We have faced the following scenario/problem in a lot of
> situations
> > > with
> > > > >> KStreams:
> > > > >>    - Huge incoming data being processed by numerous application
> > > instances
> > > > >>    - Need to aggregate different fields whose records span all
> topic
> > > > >> partitions (something like “total amount spent by people aged > 30
> > > yrs”
> > > > >> when processing a topic partitioned by userid).
> > > > >>
> > > > >> The challenge here is to manage this kind of situation without any
> > > > >> bottlenecks. We don't need the “global aggregation” to be
> processed
> > > at each
> > > > >> incoming message. On a scenario of 500 instances, each handling 1k
> > > > >> messages/s, any single point of aggregation (single partitioned
> > > topics,
> > > > >> global tables or external databases) would create a bottleneck of
> 500k
> > > > >> messages/s for single threaded/CPU elements.
> > > > >>
> > > > >> For this scenario, it is possible to store the partial
> aggregations on
> > > > >> local stores and, from time to time, query those states and
> aggregate
> > > them
> > > > >> as a single value, avoiding bottlenecks. This is a way to create a
> > > "timed
> > > > >> aggregation barrier”.
> > > > >>
> > > > >> If we leverage this kind of built-in feature we could greatly
> enhance
> > > the
> > > > >> ability of KStreams to better handle the CAP Theorem
> characteristics,
> > > so
> > > > >> that one could choose to have Consistency over Availability when
> > > needed.
> > > > >>
> > > > >> We started this discussion with Matthias J. Sax here:
> > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > > >>
> > > > >> If you want to see more, go to KIP-326 at:
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > >>
> > > > >> -Flávio Stutz
> > > > >>
> > > >
> > > >
> > >
> >
>

Reply via email to