Hi Bart,

I found the case you described in the Samza state-management doc
<http://samza.apache.org/learn/documentation/0.9/container/state-management.html>:

*"For many of the stateful processing use cases discussed above, this is
not a problem: if the effect of a message on state is idempotent, it is
safe for the same message to be processed more than once. For example, if
the store contains the ZIP code for each user, then processing the same
profile update twice has no effect, because the duplicate update does not
change the ZIP code.*

*However, for non-idempotent operations such as counting, at-least-once
delivery guarantees can give incorrect results. If a Samza task fails and
is restarted, it may double-count some messages that were processed shortly
before the failure. We are planning to address this limitation in a future
release of Samza."*


Samza has not yet addressed this limitation, so I think you will have to
handle this situation on the processing side.
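
To make the distinction concrete, here is a rough sketch of the two cases
(hypothetical ZipCodeTask/CountTask code, assuming the 0.9 StreamTask and
KeyValueStore APIs; the store is passed via the constructor for brevity,
whereas a real job would fetch it from the TaskContext in init()):

    import org.apache.samza.storage.kv.KeyValueStore
    import org.apache.samza.system.IncomingMessageEnvelope
    import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator}

    class ZipCodeTask(store: KeyValueStore[String, String]) extends StreamTask {
      override def process(envelope: IncomingMessageEnvelope,
                           collector: MessageCollector,
                           coordinator: TaskCoordinator): Unit = {
        val userId = envelope.getKey.asInstanceOf[String]
        // Idempotent: replaying the same profile update writes the same
        // ZIP code again, so at-least-once delivery is harmless here.
        store.put(userId, envelope.getMessage.asInstanceOf[String])
      }
    }

    class CountTask(store: KeyValueStore[String, java.lang.Long]) extends StreamTask {
      override def process(envelope: IncomingMessageEnvelope,
                           collector: MessageCollector,
                           coordinator: TaskCoordinator): Unit = {
        val userId = envelope.getKey.asInstanceOf[String]
        // Non-idempotent: a replayed message increments the counter a
        // second time -- exactly the double-counting the doc describes.
        val current = Option(store.get(userId)).map(_.longValue).getOrElse(0L)
        store.put(userId, java.lang.Long.valueOf(current + 1L))
      }
    }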

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Apr 8, 2015 at 11:05 AM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Bart,
>
> " If KV values are immediately committed to the changelog when you do a
> db.put are output messages immediately committed to the output stream when
> you call Collector.send()?"
>
> Yes.
>
> "this makes it look like Ts moves forward in time first (#160) then
> output (#166) then the input checkpoint (#170).  If we fail between
> storageManager.flush() and collector.flush() won't we have a recovery
> scenario where our storage contains side-effects of input messages that are
> not represented in the output stream or the input checkpoint?  That seems
> to create a situation where a naive process that produces output based on a
> combination of input and state may not produce the expected output.  Again,
> this may be something that can be addressed in the process method itself in
> some (maybe all) cases, but it is a caveat that I hadn't seen mentioned
> elsewhere."
>
> Yes, I noticed this caveat as well. Maybe we are missing something.
>
> -- @Chris, is this a caveat we can avoid other than by doing something in
> the process method?
>
> Cheers,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Apr 8, 2015 at 7:19 AM, Bart Wyatt <bart.wy...@dsvolition.com>
> wrote:
>
>> This sounds inverted to me.
>>
>> If we write to a LoggedStore during process and window, and we write
>> checkpoints in commit, then this should ensure the Ti <= Ts assertion.  Ts
>> will move forward in time during process/window, and Ti will catch up on
>> commit.  If the process dies in between, then at worst Ts is ahead of Ti.
>> That is good, because on recovery we replay the messages between Ti and
>> Ts.  So, I think this covers any problems between checkpoints and KV
>> consistency.
>>
>> I still think there is an issue with KV and output consistency.  If KV
>> values are immediately committed to the changelog when you do a db.put,
>> are output messages immediately committed to the output stream when you
>> call Collector.send()?  Looking through it, that may be the case.  In
>> which case, the job can ensure that output is sent before state is
>> updated, to guarantee that on failure/recovery Ts is before To.
>>
>> This code, however, has me worried:
>>
>> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
>>
>> this makes it look like Ts moves forward in time first (#160) then output
>> (#166) then the input checkpoint (#170).  If we fail between
>> storageManager.flush() and collector.flush() won't we have a recovery
>> scenario where our storage contains side-effects of input messages that are
>> not represented in the output stream or the input checkpoint?  That seems
>> to create a situation where a naive process that produces output based on a
>> combination of input and state may not produce the expected output.  Again,
>> this may be something that can be addressed in the process method itself in
>> some (maybe all) cases, but it is a caveat that I hadn't seen mentioned
>> elsewhere.
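>>
>> Condensed, the ordering I am describing looks roughly like this (my
>> paraphrase of TaskInstance.commit, not the actual source):
>>
>>     def commit(): Unit = {
>>       storageManager.flush()              // state/changelog durable (#160): Ts advances
>>       collector.flush()                   // output durable (#166): To advances
>>       offsetManager.checkpoint(taskName)  // input checkpoint written (#170): Ti advances
>>       // A crash between the first two calls leaves state ahead of both
>>       // the output stream and the input checkpoint.
>>     }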
>>
>> The example is a naïve implementation of a task that outputs the first of
>> a set of messages defined by a grouping field.  In the state, you would
>> store whether you had seen a message from a given group.  If the state is
>> empty for a group, then you emit that message and update the state to
>> reflect that you have seen a message.  This is safe under at-least-once
>> delivery semantics because repeated messages after the first can simply
>> be dropped.  However, if we fail in the way described above, then when we
>> recover we will replay the first message, but our state will indicate that
>> we have already seen a message for that group, so we would not produce
>> output.  This violates the implied "at least once" output semantics, but
>> not in a way that our process can easily be aware of.  Instead, this
>> particular case could be solved by storing the offset of the first message
>> in the state (not just the fact that we saw it), so that on replay of the
>> first message you can determine that, while you have seen an input from
>> this group, this is the replay of that first input and it should be
>> re-emitted to the output stream.
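>>
>> A rough sketch of that fix (hypothetical code, assuming the 0.9 KV API,
>> with a store: KeyValueStore[String, String] mapping group -> first offset
>> and an outputStream defined elsewhere):
>>
>>     override def process(envelope: IncomingMessageEnvelope,
>>                          collector: MessageCollector,
>>                          coordinator: TaskCoordinator): Unit = {
>>       val group  = envelope.getKey.asInstanceOf[String]
>>       val offset = envelope.getOffset   // offset of this input message
>>       val firstSeen = store.get(group)  // offset of the group's first message, or null
>>       if (firstSeen == null || firstSeen == offset) {
>>         // Either a genuinely new group, or a replay of that group's first
>>         // message after the failure described above: (re-)emit it.
>>         // Duplicate emits are fine under at-least-once output semantics.
>>         collector.send(new OutgoingMessageEnvelope(outputStream, group, envelope.getMessage))
>>         store.put(group, offset)
>>       }
>>       // Any other offset is a later message for a group whose first
>>       // message was already emitted, so it is dropped.
>>     }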
>>
>> -Bart
>>
>>
>>
>>
>> -----Original Message-----
>> From: Yan Fang [mailto:yanfang...@gmail.com]
>> Sent: Tuesday, April 7, 2015 4:12 PM
>> To: dev@samza.apache.org
>> Subject: Re: consistency between input, output and changelog streams
>>
>> Hi Bart,
>>
>> In terms of your assumptions,
>>
>> * Ts <= To : this is correct. The code backing this assumption is in
>> RunLoop
>> <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala>:
>> the commit is called after the process and window methods (see the sketch
>> below). E.g. process1 -> commit T1 -> process2 -> fail -> commit T2 will
>> not happen. When restarting the job, it consumes from T1. The messages
>> between T1 and T2 will be reprocessed.
>>
>> * Ti <= Ts is not always true. There are two scenarios (we call db.put()
>> in process/window).
>>
>>     1) process1 -> db.put1 succeeds -> commit T1 -> process2 -> db.put2
>> succeeds -> the following process fails -> commit T2 will not happen. In
>> this scenario, Ti <= Ts because the latest changelog write happens later
>> than the checkpoint. When we reprocess the stream from T1, the same
>> operation will happen twice because db.put2 already succeeded. *It is
>> usually fine to put the same data twice or delete the same data twice. It
>> may cause issues if db.put2 accumulates based on its previous value. --
>> @Chris, is this true?*
>>
>>     2) process1 -> db.put1 -> commit T1 -> process2 -> fail. This is Ti
>> >= Ts because the latest checkpoint happens after the changelog write.
>>
>>     The changelog code is in LoggedStore
>> <https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala>:
>> you can see that the changelog is written with each db operation
>> (put/delete, etc.), as sketched below. All the db operations are called in
>> the process or window method.
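>>
>> Condensed sketches of both pieces of code, as I read them (paraphrased,
>> not the actual source):
>>
>>     // RunLoop, roughly: commit only runs after process/window complete.
>>     while (!shutdownNow) {
>>       process()   // may call db.put() and collector.send()
>>       window()    // likewise, when the window timer fires
>>       commit()    // flush state and output, then write the checkpoint
>>     }
>>
>>     // LoggedStore.put, roughly: a changelog write accompanies every
>>     // store write, so the changelog advances on each db operation
>>     // rather than only at commit time.
>>     def put(key: K, value: V): Unit = {
>>       store.put(key, value)                                  // local write
>>       collector.send(/* changelog entry for (key, value) */) // changelog write
>>     }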
>>
>> Hope this helps.
>>
>> Thanks,
>>
>> Fang, Yan
>> yanfang...@gmail.com
>>
>> On Tue, Apr 7, 2015 at 7:27 AM, Bart Wyatt <bart.wy...@dsvolition.com>
>> wrote:
>>
>> > We are trying to make sure that we are handling the proper edge cases
>> > in our stateful tasks so that our processes can survive failure well.
>> >
>> > Given that the changelog will recreate the KV store (state) up to the
>> > time of the last durable changelog write (Ts), that the checkpoint will
>> > restart input from the time represented by the last durable checkpoint
>> > write (Ti), and that the output will contain messages up to the time of
>> > the last durable output write (To), our current assumption is that in
>> > all recovery cases:
>> >
>> > Ti <= Ts <= To
>> >
>> > This means that some input may be "replayed" from the point of view of
>> > the KV store, which is handled by normal at-least-once-delivery
>> > processing, and that we may duplicate output messages that would have
>> > been produced between Ts and To, which is also consistent with
>> > at-least-once delivery.
>> >
>> > However, I cannot find code that backs these assumptions, and I'm hoping
>> > I've just missed it, because:
>> >
>> > If To < Ts, then we may drop output, because the state assumes it was
>> > already written while, due to the timing of actual writes to Kafka or
>> > durability concerns, the output is not there.  This is important for a
>> > job, for example, that emits "session started @ X" messages on the
>> > first message for any given session key.  The state will see a
>> > repeated message as a duplicate and not emit the output.  I think this
>> > is solvable in the job as long as To >= Ti, but I am not certain the
>> > solution is generally applicable to tasks where side-effects of
>> > previous input exist in the state and have an effect on future output.
>> >
>> > If Ts < Ti, then our stateful task will effectively drop input, even
>> > though it may have produced some or all of the output for those
>> > messages in its previous incarnation, as the state used for all future
>> > processes will not have the side effects of processing the messages
>> > between Ts and Ti. We see no solution for this at the task level, as it
>> > would require collusion between two backing systems (checkpoints and
>> > changelogs) to correct, presumably by rewinding Ti to Ts.
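>> >
>> > To picture the two bad orderings as recovery timelines (my notation):
>> >
>> >     To < Ts:  state write durable -> crash -> output write lost.
>> >               On recovery the state says "already emitted", but the
>> >               output stream never received the message: output dropped.
>> >
>> >     Ts < Ti:  checkpoint durable -> crash -> state write lost.
>> >               On recovery, input before Ti is never replayed, so its
>> >               side effects are permanently missing from the state.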
>> >
>> > Perhaps my code search failed because I was expecting some colluding
>> > system that would wait for output to be written out before writing
>> > changelog entries, and then again before checkpoints, and that was too
>> > presumptive.  Is there something in the code, the assumptions, or my
>> > edge-case analysis that I've missed that addresses this?
>> >
>> > -Bart
>> >
>> >
>>
>>
>>
>
>
