Thanks, Guozhang. I appreciate the explanation. That cleared up a lot for me and confirmed what I thought.
Based on the above, in my scenario the state machine could get into an incorrect state. (Just for whoever may be watching this thread.)

On Tue, Jul 24, 2018 at 6:20 PM, Guozhang Wang <[email protected]> wrote:

> Hello Adam,
>
> I figured that rather than answering your questions one by one, I'd give you a more general explanation of how consumer offset commits, the changelog, and the state store relate to one another.
>
> If you have a state store update processor, the state store maintenance workflow is this:
>
> 1) Updating the state store:
>
> 1.a) write to the state store.
> 1.b) write to the changelog topic.
>
> Note that 1.a) can be async: the state store may have caching enabled, and the state store itself may have a write buffer (e.g. RocksDB); 1.b) is also async with batching enabled, and the actual send request is done on another thread.
>
> So at the end of 1.b), any of the following is possible: the data has been written persistently to the local files of the state store but has not been sent to the changelog; the data has not been written persistently to the local files yet but has been sent to the changelog; both have happened; or neither has happened.
>
> 2) Committing the state store:
>
> 2.a) flush the state store (make sure all previous writes have been persisted).
> 2.b) flush the producer (make sure all previous writes to changelog topics have been acknowledged).
> 2.c) commit the offset.
>
> That is, if committing succeeds, by the end of 2.c) all of the above has been done and everything is consistent.
>
> Now if there is a crash after 1.b) and before 2), then as described above, any of those scenarios may happen; but note that the consumer's offset will definitely NOT have been committed yet (that only happens in 2.c)), so upon restarting, the data will be re-processed, and hence either the state store's image or the changelog may contain duplicated results, aka "at-least-once".
>
> 3) Finally, when exactly-once is enabled, if there are any crashes, the changelog topic / state store will be rewound (I omit the implementation details here; just assume that logically we can rewind them) to the previously successful commit, so `exactly-once` is guaranteed.
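For anyone following along: point 3) above is the crux for my scenario. Exactly-once is opt-in; the default is at-least-once, which is where the crash window between 1.b) and 2) applies. A minimal configuration sketch using Kafka 2.0-era constants, with placeholder application id and bootstrap servers:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-machine-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
// The default is StreamsConfig.AT_LEAST_ONCE; exactly-once must be enabled explicitly:
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

Without that last line, recovery follows the duplicated-results case described in 2) above.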
> Guozhang
>
> On Sun, Jul 22, 2018 at 5:29 PM, Adam Bellemare <[email protected]> wrote:
>
> > Hi Folks
> >
> > I have a quick question about a scenario that I would appreciate some insight on. This is related to a KIP I am working on, but I wanted to break it out into its own scenario to reach a wider audience. In this scenario, I am using builder.internalTopologyBuilder to create the following within the internals of Kafka Streams:
> >
> > 1) An internal topic source (builder.internalTopologyBuilder.addSource(...)).
> >
> > 2) A ProcessorSupplier with a StateStore, changelogging enabled. For the purpose of this question, this processor is a very simple state machine: all it does is alternately block every other event of a given key from propagating. For instance, given:
> > (A,1)
> > (A,2)
> > (A,3)
> > it would block the propagation of (A,2). The state of the system after processing each event is:
> > blockNext = true
> > blockNext = false
> > blockNext = true
> >
> > The expectation is that this component would always block the same event in any failure mode and subsequent recovery (i.e., it would ALWAYS block (A,2), but never (A,1) or (A,3)). In other words, it would maintain perfect state in accordance with the offsets of the upstream and downstream elements.
> >
> > 3) The third component is a KTable with a Materialized StateStore where I want to sink the remaining events. It is also backed by a changelog. The events arriving would be:
> > (A,1)
> > (A,3)
> >
> > The components are ordered as: 1 -> 2 -> 3
> >
> > Note that I am keeping the state machine in a separate state store. My main questions are:
> >
> > 1) Will this workflow be consistent in all manners of failure? For example, are the state stores' changelogs fully written to internal topics before the offset is updated for the consumer in #1?
> >
> > 2) Is it possible that one state store with changelogging is logged to Kafka safely (say component #3) but the other (#2) is not, prior to a sudden, hard termination of the node?
> >
> > 3) Is the opposite possible, where #2 is backed up to its Kafka topic but #3 is not? Does the ordering of the topology matter in this case?
> >
> > 4) Is it possible that state store #2 is updated and logged, but the source topic (#1) offset is not updated?
> >
> > In all of these cases, my main concern is keeping the state and the expected output consistent. For any failure mode, will I be able to recover to a fully consistent state given the requirements of the state machine in #2?
> >
> > Though this is a trivial example, I am not certain about the dynamics between maintaining state, recovering from internal changelog topics, and the order in which all of these things apply. Any words of wisdom or explanations would be helpful here. I have been looking through the code, but I wanted to get second opinions on this.
> >
> > Thanks,
> >
> > Adam
>
> --
> -- Guozhang
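As a postscript for the archive, here is roughly what the state machine in component #2 looks like. This is a minimal sketch against the 2018-era Processor API; the store name "block-state" is a placeholder, and a Long (0L/1L) stands in for the blockNext boolean flag so the changelogged store can use the stock Serdes.Long():

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AlternatingBlockProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> blockState;  // 1L = block the next event for this key

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // The store must be created with changelogging enabled and
        // connected to this processor when the topology is built.
        this.blockState = (KeyValueStore<String, Long>) context.getStateStore("block-state");
    }

    @Override
    public void process(final String key, final String value) {
        final Long blockNext = blockState.get(key);
        if (blockNext != null && blockNext == 1L) {
            blockState.put(key, 0L);        // block this event; let the next one through
        } else {
            blockState.put(key, 1L);        // forward this event; block the next one
            context.forward(key, value);
        }
    }

    @Override
    public void close() {}
}

With (A,1), (A,2), (A,3) as input, this forwards (A,1), blocks (A,2), and forwards (A,3). The consistency question is whether the "block-state" store, its changelog, and the source offset all recover to the same point after a crash.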

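For completeness, a sketch of the 1 -> 2 -> 3 wiring. The KIP work itself goes through builder.internalTopologyBuilder, but the shape is the same as the public Topology API; all names here are placeholders, and component #3 (the materialized KTable) is only indicated in a comment since it is wired via the DSL:

import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.Stores;

final Topology topology = new Topology();

// Component #1: the internal topic source.
topology.addSource("events-source", "internal-events-topic");

// Component #2: the blocking state machine.
topology.addProcessor("block-processor", AlternatingBlockProcessor::new, "events-source");

// Its changelog-backed state store (logging is on by default for this
// builder; shown explicitly here for emphasis).
topology.addStateStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("block-state"),
            Serdes.String(),
            Serdes.Long())
        .withLoggingEnabled(Collections.emptyMap()),
    "block-processor");

// Component #3, the materialized KTable sink, consumes whatever
// "block-processor" forwards; it is built with the DSL and omitted here.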