Thanks Guozhang, I appreciate the explanation. That cleared up a lot for me
and confirmed what I thought.

Based on the above, in my scenario the state machine could get into an
incorrect state: under at-least-once, a crash after the state store update
but before the offset commit means an event like (A,2) could be
re-processed against the already-flipped flag, propagating (A,2) and
blocking (A,3) instead. (Just noting this for whoever may be watching this
thread.)

On Tue, Jul 24, 2018 at 6:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Adam,
>
> I figured that rather than answering your questions one-by-one, I'd give
> you a more general explanation of the interplay between consumer offset
> commits, the changelog, and the state store.
>
> If you have a state store update processor, the state store maintenance
> workflow is this:
>
>
> 1) updating the state store:
>
> 1.a write to state store.
> 1.b write to changelog topic
>
>
> Note that 1.a) can be async: the state store may have caching enabled,
> and the state store itself may also have a write buffer (e.g. RocksDB);
> 1.b) is async with batching enabled as well, and the actual send request
> is done by another thread (the producer's sender thread).
>
> So at the end of 1.b) any of the following is possible: the data has been
> written persistently to the local files of the state store but has not
> been sent to the changelog; the data has not been written persistently to
> local files yet but has been sent to the changelog; both have happened; or
> neither has happened.
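>
> As a concrete (hypothetical) illustration of where 1.a) and 1.b) happen,
> here is a minimal processor using the Processor API; the store name
> "my-store" and the types are illustrative only:
>
>     import org.apache.kafka.streams.processor.AbstractProcessor;
>     import org.apache.kafka.streams.processor.ProcessorContext;
>     import org.apache.kafka.streams.state.KeyValueStore;
>
>     public class StoreUpdateProcessor extends AbstractProcessor<String, Long> {
>         private KeyValueStore<String, Long> store;
>
>         @SuppressWarnings("unchecked")
>         @Override
>         public void init(final ProcessorContext context) {
>             super.init(context);
>             store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
>         }
>
>         @Override
>         public void process(final String key, final Long value) {
>             // 1.a) This put may sit in the Streams cache and/or RocksDB's
>             //      write buffer; it is not guaranteed to be on disk yet.
>             // 1.b) The matching changelog record is handed to the producer,
>             //      which batches and sends it from a separate thread.
>             store.put(key, value);
>         }
>     }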
>
>
> 2) committing the state store:
>
> 2.a) flush the state store (make sure all previous writes have been
> persisted).
> 2.b) flush the producer (make sure all previous writes to changelog topics
> have been acknowledged).
> 2.c) commit the offsets.
>
> That is, if the commit succeeds, then by the end of 2.c) all of the above
> has completed and everything is consistent.
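>
> In Java, the commit path is logically the following (a sketch of the
> ordering only, not the actual StreamTask code; the StateManager interface
> here is a hypothetical stand-in for the task's state manager):
>
>     import java.util.Map;
>     import org.apache.kafka.clients.consumer.Consumer;
>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>     import org.apache.kafka.clients.producer.Producer;
>     import org.apache.kafka.common.TopicPartition;
>
>     interface StateManager { void flush(); }
>
>     final class CommitSketch {
>         static void commit(final StateManager stateManager,
>                            final Producer<byte[], byte[]> producer,
>                            final Consumer<byte[], byte[]> consumer,
>                            final Map<TopicPartition, OffsetAndMetadata> offsets) {
>             // 2.a) flush state stores: push cached/buffered writes to local
>             //      storage and send the corresponding changelog records
>             stateManager.flush();
>             // 2.b) flush the producer: block until all in-flight changelog
>             //      sends have been acknowledged by the brokers
>             producer.flush();
>             // 2.c) only now commit the consumed offsets; a crash before this
>             //      line means the input records are re-consumed on restart
>             consumer.commitSync(offsets);
>         }
>     }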
>
> Now if there is a crash after 1.b) and before 2), then as described above,
> any of those scenarios may have happened; but note that the consumer's
> offsets will definitely NOT have been committed yet (that only happens in
> 2.c) ), so upon restarting the data will be re-processed, and hence either
> the state store's image or the changelog may contain duplicated results,
> aka “at-least-once”.
>
> 3) Finally, when exactly-once is enabled, if there is any crash, the
> changelog topic / state store will be "rewound" (I omit the implementation
> details here; just assume that, logically, we can rewind them) to the
> previously successful commit, so `exactly-once` is guaranteed.
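>
> For reference, the guarantee is chosen via the processing.guarantee config
> on the Streams app:
>
>     import java.util.Properties;
>     import org.apache.kafka.streams.StreamsConfig;
>
>     final Properties props = new Properties();
>     // the default, giving the re-processing behavior described in 2) above:
>     props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
>     // or, for the rewind-to-last-commit behavior described in 3):
>     props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);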
>
>
> Guozhang
>
> On Sun, Jul 22, 2018 at 5:29 PM, Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Hi Folks
> >
> > I have a quick question about a scenario that I would appreciate some
> > insight on. This is related to a KIP I am working on, but I wanted to
> > break this out into its own scenario to reach a wider audience. In this
> > scenario, I am using builder.internalTopologyBuilder to create the
> > following within the internals of Kafka Streams:
> >
> > 1) Internal Topic Source (builder.internalTopologyBuilder.addSource(...))
> >
> > 2) ProcessorSupplier with a StateStore, changelogging enabled. For the
> > purpose of this question, this processor is a very simple state machine.
> > All it does is alternately block every other event of a given key from
> > propagating. For instance, given:
> > (A,1)
> > (A,2)
> > (A,3)
> > It would block the propagation of (A,2). The state of the system after
> > processing each event is:
> > blockNext = true
> > blockNext = false
> > blockNext = true
> >
> > The expectation is that this component would always block the same event
> > in any failure mode and subsequent recovery (i.e. it ALWAYS blocks (A,2),
> > but never (A,1) or (A,3)). In other words, it would maintain perfect
> > state in accordance with the offsets of the upstream and downstream
> > elements.
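> >
> > A minimal sketch of that state machine processor (the store name and
> > types are illustrative):
> >
> >     import org.apache.kafka.streams.processor.AbstractProcessor;
> >     import org.apache.kafka.streams.processor.ProcessorContext;
> >     import org.apache.kafka.streams.state.KeyValueStore;
> >
> >     public class AlternatingBlockProcessor extends AbstractProcessor<String, Integer> {
> >         private KeyValueStore<String, Boolean> blockNextStore;
> >
> >         @SuppressWarnings("unchecked")
> >         @Override
> >         public void init(final ProcessorContext context) {
> >             super.init(context);
> >             blockNextStore = (KeyValueStore<String, Boolean>)
> >                 context.getStateStore("block-next-store");
> >         }
> >
> >         @Override
> >         public void process(final String key, final Integer value) {
> >             final Boolean blockNext = blockNextStore.get(key);
> >             if (Boolean.TRUE.equals(blockNext)) {
> >                 // Block this event: flip the flag and do not forward.
> >                 blockNextStore.put(key, false);
> >             } else {
> >                 // Forward this event and block the next one for this key.
> >                 blockNextStore.put(key, true);
> >                 context().forward(key, value);
> >             }
> >         }
> >     }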
> >
> > 3) The third component is a KTable with a Materialized StateStore into
> > which I want to sink the remaining events. It is also backed by a
> > changelog. The events arriving would be:
> > (A,1)
> > (A,3)
> >
> > The components are ordered as:
> > 1 -> 2 -> 3
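> >
> > For anyone following along, the equivalent wiring with the public
> > Topology API would look roughly like this (topic and node names are
> > illustrative; my actual code uses builder.internalTopologyBuilder, and
> > the KTable materialization of #3 is hand-waved as a plain sink here):
> >
> >     import org.apache.kafka.streams.Topology;
> >
> >     final Topology topology = new Topology();
> >     topology.addSource("source", "some-internal-topic");           // #1
> >     topology.addProcessor("state-machine",
> >         AlternatingBlockProcessor::new, "source");                 // #2
> >     // addStateStore(...) with a changelogged "block-next-store" would
> >     // be attached to "state-machine" here.
> >     topology.addSink("output", "output-topic", "state-machine");  // #3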
> >
> >
> > Note that I am keeping the state machine in a separate state store. My
> > main questions are:
> >
> > 1) Will this workflow be consistent in all manners of failure? For
> > example, are the state stores' changelogs fully written to the internal
> > topics before the offset is updated for the consumer in #1?
> >
> > 2) Is it possible that one State Store with changelogging will be logged
> > to Kafka safely (say component #3) but the other (#2) will not be, prior
> > to a sudden, hard termination of the node?
> >
> > 3) Is the opposite possible, where #2 is backed up to its Kafka topic
> > but #3 is not? Does the ordering of the topology matter in this case?
> >
> > 4) Is it possible that the state store #2 is updated and logged, but the
> > source topic (#1) offset is not updated?
> >
> > In all of these cases, my main concern is keeping the state and the
> > expected output consistent. For any failure mode, will I be able to
> > recover to a fully consistent state given the requirements of the state
> > machine in #2?
> >
> > Though this is a trivial example, I am not certain about the dynamics
> > between maintaining state, recovering from internal changelog topics, and
> > the order in which all of these things apply. Any words of wisdom or
> > explanations would be helpful here. I have been looking through the code
> > but I wanted to get second opinions on this.
> >
> >
> >
> > Thanks,
> >
> > Adam
> >
>
>
>
> --
> -- Guozhang
>
