This sounds inverted to me.

If we write to a LoggedStore during process and window and we write checkpoints 
in commit, then this should ensure the Ti <= Ts assertion.  Ts will move 
forward in time during process/window, and Ti will catch up on commit.  If the 
process dies between then at worst Ts is ahead of Ti.  That is good, because on 
recovery replayed messages between Ti and Ts.  So, I think this covers any 
problems between checkpoints and KV consistency.

I still think there is an issue with KV and output consistency.  If KV values 
are immediately committed to the changelog when you do a db.put are output 
messages immediately committed to the output stream when you call 
Collector.send()?  Looking through it, that may be the case.  In which case, 
the job can ensure that  output is send before state is updated to guarantee 
that on failure/recovery Ts is before To.

This code, however, has me worried:
https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171

this makes it look like Ts moves forward in time first (#160) then output 
(#166) then the input checkpoint (#170).  If we fail between 
storageManager.flush() and collector.flush() won't we have a recovery scenario 
where our storage contains side-effects of input messages that are not 
represented in the output stream or the input checkpoint.  That seems to create 
a situation where a naive process that produces output based on a combination 
of input and state may not produce the expected output.  Again, this may be 
something that can be addressed in the process method itself in some (maybe 
all) cases, but it is a caveat that I hadn't seen mentioned elsewhere.

The example is a naïve implementation of a task that outputs the first of a set 
of messages defined by a grouping field.  In the state, you would store whether 
you had seen a message from given group.  If the state is empty for a group 
then you emit that message and update the state to reflect that you have seen a 
message.  This is safe for at-least-once delivery semantics because you 
repeated messages after the first can also be dropped.  However, If we fail in 
the way described above, when we recover we will replay the first message, but 
our state will indicate that we have already seen a message for that group and 
we would not produce output.   This violates the implied "at least once" output 
semantics but not in a way that our process can be aware of easily.  Instead, 
this particular case could be solved by storing the offset of the first message 
in the state (not just the fact that we saw it), so that on replay of the first 
message you can determine that, while you have seen an input from this group, 
this is the replay of that first input and it should be re-emitted to the 
output stream.

-Bart




-----Original Message-----
From: Yan Fang [mailto:yanfang...@gmail.com]
Sent: Tuesday, April 7, 2015 4:12 PM
To: dev@samza.apache.org
Subject: Re: consistency between input, output and changelog streams

Hi Bart,

In terms of your assumption,

* Ts <= To , this is correction. The code backups this assumption is here:
in RunLoop
<https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala>
,
the commit is called after each process and window methods. E.g. process1
-> commit T1 -> process2 -> fail -> commit T2 will not happen. When
restarting the job, it consumes from T1. The messages between T1 and T2 will be 
reprocessed.

* Ti <= Ts is not always true. There are two scenarios ( we call db.put() in 
process/window ).

    1) process1 -> db.put1 success -> commit T1 -> process2 -> db.put2 success 
-> following process fails -> commit T2 will not happen. In this scenario, Ti 
<= Ts because the latest changelog happens later than the checkpoint. In this 
scenario, when we reprocess the stream from T1, same operation will happen 
twice because db.put2 already succeeds.* It is usually fine that putting the 
same data twice/deleting the same data twice.
It may have some issues if the db.put2 is accumulating based on its previous 
value. -- @Chris, is this true ?*

    2) process1 -> db.put1 -> commit T1 -> process2 -> fail . This is Ti >= Ts 
because the latest checkpoint happens after changelog.

    The changelog code is in the LoggedStore 
<https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala>
,
you can tell that, the changelog is written after each db operation 
(put/delete,etc). All the db operations are called in process or window method.

Hope this can help you.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Apr 7, 2015 at 7:27 AM, Bart Wyatt <bart.wy...@dsvolition.com>
wrote:

> We are trying to make sure that we are handling the proper edge cases
> in our stateful tasks so that our processes can survive failure well.
>
> Given the changelog will recreate the KV store (state) up to the point
> of time of the last durable changelog write(Ts), the checkpoint will
> start input from the point of time represented in the last durable
> checkpoint
> write(Ti) and the output will have messages from it at the 3rd point
> in time of the last durable output write(To), our current assumption
> is that in all recovery cases:
>
> Ti <= Ts <= To
>
> This means that some input may be "replayed" from the point of view of
> the KV store which is handled by normal at-least-once-delivery
> semantics processing and that we may duplicate output messages that
> would have been produced between Ts and To which is also consistent
> with at-least-once-delivery.
>
> However, I cannot find code that backs this assumptions and I'm hoping
> I've just missed it, because:
>
> If To < Ts, then we may drop output because the state assumed it was
> already written and due to timing of actual writes to kafka or
> durability concerns the output is not there.  This is important for a
> job, for example, that emits "session started @ X" messages on the
> first message for any given session key.  The state will see a
> repeated message as a duplicate and not emit the output.  I think this
> is solvable in the job as long as To >= Ti, but I am not certain the
> solution is generally applicable to tasks where side-effects of
> previous input exist in the state and have an effect on future output.
>
> If Ts < Ti, then our stateful task will effectively drop input, even
> though it may have produced some or all of the output for those
> messages in its previous incarnation, as the state used for all future
> processes will not have the side effects of processing the messages
> between Ts and Ti. We see no solution for this at the task level as it
> would require collusion between two backing system (checkpoints and
> changelogs) to correct, presumably by rewinding Ti to Ts.
>
> Perhaps my code search failed because I was expecting some colluding
> system that would wait for output to write out before writing
> changelog entries and then again before checkpoints and that was to
> presumptive.  Is there something about the code, the assumption or my
> edge analysis that I've missed to address this?
>
> -Bart
>
>
> ________________________________
> This e-mail may contain CONFIDENTIAL AND PROPRIETARY INFORMATION
> and/or PRIVILEGED AND CONFIDENTIAL COMMUNICATION intended solely for
> the recipient and, therefore, may not be retransmitted to any party
> outside of the recipient's organization without the prior written consent of 
> the sender.
> If you have received this e-mail in error please notify the sender
> immediately by telephone or reply e-mail and destroy the original
> message without making a copy. Deep Silver, Inc. accepts no liability
> for any losses or damages resulting from infected e-mail transmissions
> and viruses in e-mail attachments.
>


________________________________
This e-mail may contain CONFIDENTIAL AND PROPRIETARY INFORMATION and/or 
PRIVILEGED AND CONFIDENTIAL COMMUNICATION intended solely for the recipient 
and, therefore, may not be retransmitted to any party outside of the 
recipient's organization without the prior written consent of the sender. If 
you have received this e-mail in error please notify the sender immediately by 
telephone or reply e-mail and destroy the original message without making a 
copy. Deep Silver, Inc. accepts no liability for any losses or damages 
resulting from infected e-mail transmissions and viruses in e-mail attachments.

Reply via email to