Hi Flávio,

Sure thing. And apologies in advance if I missed the point.

Below is some more-or-less realistic Java code to demonstrate how, given a
high-volume (heavily partitioned) stream of purchases, we can "step down"
the update rate with rate-limited intermediate aggregations.
Please bear in mind that the suppression API itself is still under debate,
so this is just for illustration purposes.

Basically, the "suppress" operator creates a processor whose job is just to
store the latest value for each key and not emit it until the configured
time.

So if key "X" gets updated 1000x/sec, we can use suppress to make sure it
doesn't get emitted to the next processor more than once per second.
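
To make that concrete, here is a rough sketch of what such a buffering
processor might look like if you wrote it by hand against the plain Processor
API. To be clear, this is NOT the proposed Suppression API; the class name is
made up, and I'm assuming the schedule(long, PunctuationType, Punctuator)
overload from 2.0:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Illustrative only: keeps just the latest value per key and flushes the
// buffer on a wall-clock schedule, so each key is forwarded downstream at
// most once per interval.
public class RateLimitingBufferProcessor implements Processor<Long, Integer> {
    private final long emitIntervalMs;
    private final Map<Long, Integer> latestByKey = new HashMap<>();
    private ProcessorContext context;

    public RateLimitingBufferProcessor(final long emitIntervalMs) {
        this.emitIntervalMs = emitIntervalMs;
    }

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        context.schedule(emitIntervalMs, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // Flush the buffer: each buffered key is emitted once with its latest value.
            for (final Map.Entry<Long, Integer> entry : latestByKey.entrySet()) {
                context.forward(entry.getKey(), entry.getValue());
            }
            latestByKey.clear();
        });
    }

    @Override
    public void process(final Long key, final Integer value) {
        // Overwrite any earlier value; intermediate updates within the interval are dropped.
        latestByKey.put(key, value);
    }

    @Override
    public void close() {
    }
}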

Does this make sense?

Thanks,
-John

public class KTableSuppressProcessorTest {
    private static class Purchase {
        final long customerId;
        final int value;

        private Purchase(final long customerId, final int value) {
            this.customerId = customerId;
            this.value = value;
        }
    }

    private static class PurchaseSerde implements Serde<Purchase> {...}

    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String purchases = "purchases";

        final KTable<Long, Purchase> input = builder.table(
            purchases,
            Consumed.with(Serdes.Long(), new PurchaseSerde())
        );

        // Fairly sloppy, but the idea is to "split" each customer id into one id per partition.
        // This way, we can first total their purchases inside each partition before aggregating them
        // across partitions.
        final KTable<Long, Purchase> purchasesWithPartitionedCustomers = input.transformValues(
            () -> new ValueTransformerWithKey<Long, Purchase, Purchase>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public Purchase transform(final Long readOnlyKey, final Purchase purchase) {
                    final int partition = context.partition();
                    return new Purchase(
                        purchase.customerId * 1000 + partition, // Assuming we have < 1k partitions...
                        purchase.value
                    );
                }
                @Override
                public void close() {
                    // Nothing to clean up.
                }
            });

        final KGroupedTable<Long, Integer> purchaseValueByPartitionedCustomer =
            purchasesWithPartitionedCustomers.groupBy(
                (id, purchase) -> new KeyValue<>(purchase.customerId, purchase.value)
            );

        final Suppression<Long, Integer> oncePerKeyPerSecond = Suppression.suppressIntermediateEvents(
            IntermediateSuppression
                .emitAfter(Duration.ofSeconds(1))
                .bufferKeys(5000)
                .bufferFullStrategy(EMIT)
        );

        // First level of aggregation. Each customer gets their purchases aggregated *just within each partition*.
        // The result of this aggregation is emitted at most once per second per customer per purchase-partition.
        final KTable<Long, Integer> totalValueByPartitionedCustomer =
            purchaseValueByPartitionedCustomer
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(oncePerKeyPerSecond);

        // This is where we reverse the partitioning of each customer and then aggregate
        // each customer's purchases across partitions.
        // The result of this aggregation is emitted at most once per second per customer.
        final KTable<Long, Integer> aggregatedTotalValueByPartitionedCustomer =
            totalValueByPartitionedCustomer
                .groupBy((key, value) -> new KeyValue<>(key / 1000, value))
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(oncePerKeyPerSecond);

        // Sending all the intermediate totals to a single key to get the final aggregation.
        // The result of this aggregation is emitted at most once per second.
        final KTable<String, Integer> total = aggregatedTotalValueByPartitionedCustomer
            .groupBy((key, value) -> new KeyValue<>("ALL", value))
            .reduce((l, r) -> l + r, (l, r) -> l - r)
            .suppress(Suppression.suppressIntermediateEvents(
                IntermediateSuppression.emitAfter(Duration.ofSeconds(1))
            ));

        // This topic will contain just one key ("ALL"), and the value will be
        // the ever-updating all-time purchase value.
        // Note that it'll be updated at most once per second.
        total.toStream().to("total-purchases-value");

        return builder.build();
    }
}
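
And, in case it's useful, a minimal sketch of wiring the topology above into a
running application. The application id and bootstrap servers are placeholders,
and the usual imports (java.util.Properties, org.apache.kafka.streams.KafkaStreams,
org.apache.kafka.streams.StreamsConfig) are assumed:

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "purchase-totals-example"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // placeholder

// Build and start the topology from the example class above.
final KafkaStreams streams = new KafkaStreams(new KTableSuppressProcessorTest().buildTopology(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));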

On Mon, Jul 2, 2018 at 3:38 PM flaviost...@gmail.com <flaviost...@gmail.com>
wrote:

> Thanks for clarifying the real usage of KIP-328. Now I understood a bit
> better.
> I didn't see how that feature would be used to minimize the number of
> publications to the single partitioned output topic. When it falls into
> suppression, does the graph stop going down? Could you explain better? If that
> is possible I think it would be great.
>
> Thanks for the intervention!
>
> -Flávio Stutz
>
>
>
>
> On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
> > Hi Flávio,
> >
> > Thanks for the KIP. I'll apologize that I'm arriving late to the
> > discussion. I've tried to catch up, but I might have missed some nuances.
> >
> > Regarding KIP-328, the idea is to add the ability to suppress intermediate
> > results from all KTables, not just windowed ones. I think this could
> > support your use case in combination with the strategy that Guozhang
> > proposed of having one or more pre-aggregation steps that ultimately push
> > into a single-partition topic for final aggregation. Suppressing
> > intermediate results would solve the problem you noted that today
> > pre-aggregating doesn't do much to staunch the flow of updates.
> >
> > I'm not sure if this would be good enough for you overall; I just wanted to
> > clarify the role of KIP-328.
> > In particular, the solution you mentioned is to have the downstream KTables
> > actually query the upstream ones to compute their results. I'm not sure
> > whether it's more efficient to do these queries on the schedule, or to have
> > the upstream tables emit their results, on the same schedule.
> >
> > What do you think?
> >
> > Thanks,
> > -John
> >
> > On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <flaviost...@gmail.com>
> > wrote:
> >
> > > From what I understood, that KIP is related to how KStreams will handle
> > > KTable updates in Windowed scenarios to optimize resource usage.
> > > I couldn't see any specific relation to this KIP. Had you?
> > >
> > > -Flávio Stutz
> > >
> > >
> > > On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io> wrote:
> > > > Flavio,
> > > >
> > > > thanks for cleaning up the KIP number collision.
> > > >
> > > > With regard to KIP-328
> > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
> > > > I am wondering how both relate to each other?
> > > >
> > > > Any thoughts?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> > > > > Just copying a follow up from another thread to here (sorry about the mess):
> > > > >
> > > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > > > Date: 2018/06/25 22:24:17
> > > > > List: dev@kafka.apache.org
> > > > >
> > > > > Flávio, thanks for creating this KIP.
> > > > >
> > > > > I think this "single-aggregation" use case is common enough that we
> > > should
> > > > > consider how to efficiently supports it: for example, for KSQL
> that's
> > > built
> > > > > on top of Streams, we've seen lots of query statements whose
> return is
> > > > > expected a single row indicating the "total aggregate" etc. See
> > > > > https://github.com/confluentinc/ksql/issues/430 for details.
> > > > >
> > > > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
> > > > > I'm wondering if we have discussed the option of supporting it in a
> > > > > "pre-aggregate" manner: that is, we do partial aggregates on parallel tasks,
> > > > > and then send the partially aggregated value via a single topic partition
> > > > > for the final aggregate, to reduce the traffic on that single partition and
> > > > > hence the final aggregate workload.
> > > > > Of course, for non-commutative aggregates we'd probably need to provide
> > > > > another API in addition to aggregate, like the `merge` function for
> > > > > session-based aggregates, to let users customize the operations of merging
> > > > > two partial aggregates into a single partial aggregate. What are its
> > > > > pros and cons compared with the current proposal?
> > > > >
> > > > >
> > > > > Guozhang
> > > > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
> > > > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > > > >> source for realtime partitioned consolidations.
> > > > >>
> > > > >> We have faced the following scenario/problem in a lot of situations with
> > > > >> KStreams:
> > > > >>    - Huge incoming data being processed by numerous application instances
> > > > >>    - Need to aggregate different fields whose records span all topic
> > > > >> partitions (something like “total amount spent by people aged > 30 yrs”
> > > > >> when processing a topic partitioned by userid).
> > > > >>
> > > > >> The challenge here is to manage this kind of situation without any
> > > > >> bottlenecks. We don't need the “global aggregation” to be processed at each
> > > > >> incoming message. In a scenario of 500 instances, each handling 1k
> > > > >> messages/s, any single point of aggregation (single partitioned topics,
> > > > >> global tables or external databases) would create a bottleneck of 500k
> > > > >> messages/s for single threaded/CPU elements.
> > > > >>
> > > > >> For this scenario, it is possible to store the partial aggregations on
> > > > >> local stores and, from time to time, query those states and aggregate them
> > > > >> as a single value, avoiding bottlenecks. This is a way to create a “timed
> > > > >> aggregation barrier”.
> > > > >>
> > > > >> If we leverage this kind of built-in feature we could greatly enhance the
> > > > >> ability of KStreams to better handle the CAP Theorem characteristics, so
> > > > >> that one could choose to have Consistency over Availability when needed.
> > > > >>
> > > > >> We started this discussion with Matthias J. Sax here:
> > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > > > >>
> > > > >> If you want to see more, go to KIP-326 at:
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > >>
> > > > >> -Flávio Stutz
> > > > >>
> > > >
> > > >
> > >
> >
>
