Re: Processor API StateStore and Recovery with State Machines question.

2018-07-26 Thread Adam Bellemare
Thanks Guozhang, I appreciate the explanation. That cleared up a lot for me
and confirmed what I thought.

Based on your explanation, in my scenario the state machine could get into an
incorrect state. (Just noting this for anyone following the thread.)


Re: Processor API StateStore and Recovery with State Machines question.

2018-07-24 Thread Guozhang Wang
Hello Adam,

I figured that rather than answering your questions one-by-one, I'd give
you a more general explanation of the relationship between consumer offset
commits, the changelog, and the state store.

If you have a processor that updates a state store, the state store
maintenance workflow is this:


1) updating the state store:

1.a) write to the state store.
1.b) write to the changelog topic.


Note that 1.a) could be async: the state store may have caching enabled,
and the store itself may have its own write buffer (e.g. RocksDB); 1.b) is
also async with batching enabled, and the actual send request is done by
another thread.

So at the end of 1.b) any of these is possible: the data has been written
persistently to the local files of the state store but has not yet been sent
to the changelog, or it has not yet been written persistently to the local
files but has been sent to the changelog, or both have happened, or neither
has happened.
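
To make 1.a) / 1.b) concrete: a store built with logging enabled writes each
put() to the local store files and also sends it to the store's changelog
topic. A minimal sketch is below; the class name, store name and serdes are
illustrative placeholders, not taken from any real topology:

import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class LoggedStoreSketch {
    static StoreBuilder<KeyValueStore<String, String>> loggedStore() {
        return Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("my-store"),
                        Serdes.String(),
                        Serdes.String())
                // With logging enabled, each put() is written to the local
                // RocksDB files (1.a) and also sent, asynchronously, to the
                // store's changelog topic via the producer (1.b).
                .withLoggingEnabled(Collections.emptyMap());
    }
}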


2) committing the state store:

2.a) flush the state store (make sure all previous writes have been persisted).
2.b) flush the producer (make sure all previous writes to the changelog topics
have been acknowledged).
2.c) commit the consumer offsets.

That is, if the commit succeeds, then by the end of 2.c) all three steps have
completed and everything is consistent.
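
To make the ordering concrete, here is a rough sketch of the three commit
steps. This is not the actual Streams task-commit code (the class and method
shape are illustrative), but StateStore#flush, Producer#flush and
Consumer#commitSync are the real calls involved:

import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

public class CommitOrderSketch {
    static void commit(StateStore store,
                       Producer<byte[], byte[]> producer,
                       Consumer<byte[], byte[]> consumer,
                       Map<TopicPartition, OffsetAndMetadata> offsets) {
        store.flush();                 // 2.a) persist all pending local writes
        producer.flush();              // 2.b) wait until all changelog sends are acknowledged
        consumer.commitSync(offsets);  // 2.c) only then commit the source offsets
    }
}

The key point is that 2.c) happens last: a crash before it means the input
will be re-read on restart.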

Now if there is a crash after 1.b) and before 2), then as described above any
of those scenarios may have happened, but note that the consumer's offsets
will definitely NOT have been committed yet (that only happens in 2.c) ), so
upon restart the data will be re-processed, and hence either the state
store's image or the changelog may contain duplicated results, aka
"at-least-once".

3) Finally, when exactly-once is enabled, if there is a crash the changelog
topic / state store will be "rewound" (I omit the implementation details
here; just assume that logically we can rewind them) to the previously
successful commit, so `exactly-once` is guaranteed.
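
For reference, exactly-once is enabled via the processing guarantee config.
A minimal sketch (application.id, bootstrap.servers and the other required
configs are omitted here):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfigSketch {
    static Properties eosProps() {
        Properties props = new Properties();
        // application.id, bootstrap.servers, etc. would also be required.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}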


Guozhang



-- 
-- Guozhang


Processor API StateStore and Recovery with State Machines question.

2018-07-22 Thread Adam Bellemare
Hi Folks

I have a quick question about a scenario that I would appreciate some
insight on. This is related to a KIP I am working on, but I wanted to break
this out into its own scenario to reach a wider audience. In this scenario,
I am using builder.internalTopologyBuilder to create the following within
the internals of Kafka Streams:

1) Internal Topic Source (builder.internalTopologyBuilder.addSource(...) )

2) ProcessorSupplier with a StateStore, with changelogging enabled. For the
purpose of this question, this processor is a very simple state machine:
all it does is alternately block every other event of a given key from
propagating. For instance:
(A,1)
(A,2)
(A,3)
It would block the propagation of (A,2). The state of the system after
processing each event is:
blockNext = true
blockNext = false
blockNext = true

The expectation is that this component would always block the same event, in
any failure mode and subsequent recovery (i.e. it ALWAYS blocks (A,2), but
not (A,1) or (A,3)). In other words, it would maintain perfect state in
accordance with the offsets of the upstream and downstream elements. (A rough
sketch of such a processor is included after the component list below.)

3) The third component is a KTable with a Materialized StateStore where I
want to sink the remaining events. It is also backed by a changelog. The
events arriving would be:
(A,1)
(A,3)

The components are ordered as:
1 -> 2 -> 3
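
As a rough sketch of what the state machine processor in #2 might look like
(the class name and the store name "block-state" are made-up placeholders;
this assumes a persistent key-value store with changelogging enabled is
connected to the processor):

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AlternatingBlockProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, Boolean> blockState;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        // "block-state" is a hypothetical store name registered on the topology.
        blockState = (KeyValueStore<String, Boolean>) context.getStateStore("block-state");
    }

    @Override
    public void process(String key, String value) {
        Boolean flag = blockState.get(key);
        boolean blockThis = flag != null && flag;
        if (!blockThis) {
            context().forward(key, value); // propagate downstream to component #3
        }
        // The next event for this key is blocked iff this one was forwarded,
        // which gives the blockNext = true/false/true sequence described above.
        blockState.put(key, !blockThis);
    }
}

The ProcessorSupplier in #2 would simply hand out new instances of this class
(e.g. AlternatingBlockProcessor::new).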


Note that I am keeping the state machine in a separate state store. My main
questions are:

1) Will this workflow be consistent in all manners of failure? For example,
are the state stores' changelogs fully written to their internal topics
before the offset is updated for the consumer in #1?

2) Is it possible that one State Store with changelogging will be logged to
Kafka safely (say component #3) but the other (#2) will not be, prior to a
sudden, hard termination of the node?

3) Is the reverse possible, where #2 is backed up to its Kafka topic but
#3 is not? Does the ordering of the topology matter in this case?

4) Is it possible that the state store #2 is updated and logged, but the
source topic (#1) offset is not updated?

In all of these cases, my main concern is keeping the state and the
expected output consistent. For any failure mode, will I be able to recover
to a fully consistent state given the requirements of the state machine in
#2?

Though this is a trivial example, I am not certain about the dynamics
between maintaining state, recovering from internal changelog topics, and
the order in which all of these things apply. Any words of wisdom or
explanations would be helpful here. I have been looking through the code
but I wanted to get second opinions on this.



Thanks,

Adam