> So with exactly_once, it must roll-back commit(s) to the state store in a
> failure scenario?
Yes. Dirty writes into the stores are "cleaned up" if you enable
exactly-once processing semantics. Strictly speaking, though, a "commit" is
never rolled back, as a commit indicates successful processing :)

-Matthias

On 8/20/19 8:07 PM, Alex Brekken wrote:
> Thanks guys. I knew that re-processing messages was a possibility with
> at_least_once processing, but I guess I hadn't considered the potential
> impact on the state stores as far as aggregations are concerned. So with
> exactly_once, it must roll back commit(s) to the state store in a failure
> scenario? I haven't dug into the code to see how it works, but given the
> behavior I'm seeing, it must..
>
> Tim - I actually saw your related question from last week right after I
> sent mine. :)
>
> Alex
>
> On Tue, Aug 20, 2019 at 2:28 PM Bruno Cadonna <[email protected]> wrote:
>
>> Hi Alex,
>>
>> What you describe about failing before offsets are committed is one
>> reason why records are processed multiple times under the at-least-once
>> processing guarantee. That is a reality of life, as you stated. Kafka
>> Streams in exactly-once mode guarantees that these duplicate state
>> updates do not happen.
>>
>> The exactly-once processing guarantee was implemented in Kafka Streams
>> for use cases where correctness is of the highest importance.
>>
>> Best,
>> Bruno
>>
>> On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken <[email protected]> wrote:
>>>
>>> Hi all, I have a (relatively) simple streams topology that is producing
>>> some counts, and while testing this code I'm seeing some occasional
>>> incorrect aggregation results. This seems to happen when a rebalance
>>> occurs - typically due to a timeout or communication hiccup with the
>>> Kafka broker. The topology is built with the DSL and utilizes 2 KTables:
>>> the first is really just a de-dup table, and the second is the result of
>>> the aggregation. So at a high level, the topology consumes from a source
>>> topic, does a groupByKey() and then a reduce() where we always return
>>> the newValue. Then it does a groupBy() on a new key, and finally an
>>> aggregate() call with an adder and subtractor. Because our source topic
>>> frequently contains duplicate messages, this seemed like a good way to
>>> handle the dupes: the subtractor gets invoked any time we replace a
>>> value in the "upstream" KTable and removes the old value from the count,
>>> and the adder then adds the new value back in.
>>>
>>> In the happy-path scenario where we never see any exceptions or
>>> rebalances, this whole thing works great - the counts at the end are
>>> 100% correct. But when rebalancing is triggered, we sometimes get bad
>>> counts. My theory is that when a timeout or connectivity problem happens
>>> during that second aggregation, the data ends up getting saved to the
>>> state store but the offsets don't get committed, so the message(s) in
>>> the repartition topic that feed the aggregation get replayed after the
>>> stream task gets rebalanced, causing the counts to get incorrectly
>>> incremented or decremented (depending on whether the message was
>>> triggering the adder or the subtractor). I can simulate this problem (or
>>> something similar to it) when debugging the application in my IDE just
>>> by pausing execution on a breakpoint inside the aggregation's adder or
>>> subtractor method for a few seconds. The result of the adder or
>>> subtractor gets saved to the state store, which means that when the
>>> messages in the repartition topic get re-processed, the counts get
>>> doubled.
>>> If I enable "exactly_once" processing, I'm unable to recreate the
>>> problem and the counts are always accurate.
>>>
>>> My questions are:
>>>
>>> 1. Is this expected behavior? In a hostile application environment
>>> where connectivity problems and rebalances happen frequently, is some
>>> degree of incorrectly aggregated data just a reality of life?
>>>
>>> 2. Is exactly_once processing the right solution if correctness is of
>>> the highest importance? Or should I be looking at different ways of
>>> writing the topology?
>>>
>>> Thanks for any advice!
>>>
>>> Alex
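
[Editor's note] For readers skimming the archive, here is a minimal sketch of
the kind of DSL topology Alex describes: a reduce() that keeps the newest
value per key (the de-dup KTable), followed by a re-keyed aggregate() with an
adder and a subtractor. The topic names, serdes, String/Long types, class
name, and the re-keying function are placeholders, not taken from Alex's
actual code.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class CountTopology {

    // Sketch: reduce() keeps the newest value per key (de-dup), then a
    // re-keyed aggregate() counts values; the subtractor removes the old
    // value whenever the upstream KTable replaces it, the adder adds the
    // new value back in.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, String> latest = builder
            .stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .reduce((oldValue, newValue) -> newValue);   // always return the new value

        KTable<String, Long> counts = latest
            .groupBy(
                (key, value) -> KeyValue.pair(value, value),   // placeholder re-keying
                Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> 0L,                            // initializer
                (newKey, value, agg) -> agg + 1,     // adder
                (newKey, value, agg) -> agg - 1,     // subtractor
                Materialized.with(Serdes.String(), Serdes.Long()));

        counts.toStream().to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}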
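[Editor's note] And this is roughly how the exactly-once guarantee discussed
above is switched on - a sketch only, building on the CountTopology
placeholder from the previous snippet; the application id and bootstrap
servers are made up. The EXACTLY_ONCE constant matches the 2.x clients in
use at the time of this thread; newer Streams releases (3.0+) offer
"exactly_once_v2" instead. Note also that enabling EOS lowers the default
commit.interval.ms to 100 ms.

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

public class CountApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // Enable exactly-once processing semantics. Output/changelog writes and
        // offset commits then happen in a single transaction, and after a failure
        // the store is restored from the committed changelog, so the "dirty
        // writes" Matthias mentions are discarded instead of being double-counted.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        KafkaStreams streams = new KafkaStreams(CountTopology.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}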
