Re: Exactly once processing

2016-04-20 Thread Sabarish Sasidharan
Great, thanks all!

Regards
Sab


Re: Exactly once processing

2016-04-18 Thread Yi Pan
Hi, Sabarish Sasidharan,

The key point is to make your KV-store update idempotent. So, if the offset
associated with the aggregated value is written in the same row in RocksDB
(i.e. atomicity is achieved here), I think that your approach would work.
As Robert mentioned, offsets are always committed last in Samza. Hence, any
failure recovery is guaranteed to replay some of the old messages. If the
flushed state store has the aggregated value together with the offset, you
can use the offset to de-dup the replayed old messages that have already
been applied to the aggregated results.

@Robert, yes, the order you listed would be maintained.

Thanks!

-Yi
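
For concreteness, a minimal sketch of the single-row layout Yi describes,
against Samza's Java task API. The store name "aggregates", the AggRow class
(and the serde you would have to register for it), and the numeric payload
are illustrative assumptions, not from the thread; the per-key guard also
relies on each task consuming a single input partition, so offsets for a
given key only increase.

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class IdempotentAggregateTask implements StreamTask, InitableTask {

      // One row holds both the running aggregate and the offset of the last
      // message folded into it, so the pair is flushed (and restored) together.
      public static class AggRow {
        public long sum;
        public long lastOffset = -1L;
      }

      private KeyValueStore<String, AggRow> store;

      @Override
      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) {
        store = (KeyValueStore<String, AggRow>) context.getStore("aggregates");
      }

      @Override
      public void process(IncomingMessageEnvelope envelope,
          MessageCollector collector, TaskCoordinator coordinator) {
        String key = (String) envelope.getKey();
        long offset = Long.parseLong(envelope.getOffset()); // Kafka offsets parse as longs

        AggRow row = store.get(key);
        if (row == null) {
          row = new AggRow();
        }
        // On replay after a failure, anything at or below the recorded offset
        // has already been folded into the aggregate -- skip it.
        if (offset <= row.lastOffset) {
          return;
        }
        row.sum += ((Number) envelope.getMessage()).longValue();
        row.lastOffset = offset; // value and offset updated in the same row
        store.put(key, row);
      }
    }

Because the value and the offset share one row, the store flush is
all-or-nothing for the pair, which is exactly the atomicity Yi calls out.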


Re: Exactly once processing

2016-04-15 Thread Robert Crim
Looking at:
https://github.com/apache/samza/blob/f02386464d31b5a496bb0578838f51a0331bfffa/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L171


The commit function, in order, does:
1. Flushes metrics
2. Flushes stores
3. Produces messages from the collectors
4. Writes offsets

So I would reason that it's OK to store the offset you've seen in the
store and use it to skip messages whose mutations you've already applied
-- but be aware that any of steps 2 (if you have multiple stores), 3, or 4
may not have happened, so you might need to do those again. You'd need to
be careful if your changes span multiple stores or keys, since multiple
writes to changelogs are not atomic (see the sketch below).

Question to maintainers: is it safe for Samza users to rely on this order?
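
To make the multi-store caveat concrete, a sketch of the unsafe shape (the
class and store roles are hypothetical): the offset guard lives in a
different store than the data, so the two changelog writes can survive a
crash independently, and the guard can end up skipping a replayed message
whose data write was lost.

    import org.apache.samza.storage.kv.KeyValueStore;

    // Anti-example: the offset guard and the data live in DIFFERENT stores.
    // Their changelog writes are not atomic with each other, so a failure can
    // persist the guard update but lose the data update; on replay the guard
    // then skips the message for good and the mutation is lost.
    public class UnsafeTwoStoreGuard {
      private final KeyValueStore<String, Long> offsets; // guard store
      private final KeyValueStore<String, Long> totals;  // data store

      public UnsafeTwoStoreGuard(KeyValueStore<String, Long> offsets,
          KeyValueStore<String, Long> totals) {
        this.offsets = offsets;
        this.totals = totals;
      }

      public void apply(String key, long offset, long value) {
        Long last = offsets.get(key);
        if (last != null && offset <= last) {
          return; // only safe if BOTH writes below survived together
        }
        Long total = totals.get(key);
        totals.put(key, (total == null ? 0L : total) + value);
        offsets.put(key, offset); // separate store: not atomic with the line above
      }
    }

Keeping the offset in the same row as the value it guards, as Yi describes
above, sidesteps this.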


Re: Exactly once processing

2016-04-15 Thread Sabarish Sasidharan
Hi Guozhang

Thanks. Assuming the checkpoint would typically be behind the offset
persisted in my store (+ changelog), when messages are replayed starting
from the checkpoint, I can simply skip them by comparing against the
offset in my store, right? So I don't understand why duplicates would
affect my state.

Regards
Sab


Re: Exactly once processing

2016-04-15 Thread Guozhang Wang
Hi Sab,

For stateful processing where you have persistent state stores, you need
to keep the checkpoint (which includes the committed offsets) and the
store flushes in sync, but right now these two operations are not done
atomically. Hence, if you fail in between, you can still get duplicates:
on restart you consume from the committed offsets even though some of
those messages have already updated the stores.

Guozhang
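
As a toy illustration of that window (plain Java, not Samza code): all ten
messages are applied and the store flush survives the crash, but the
checkpoint only made it to offset 5, so recovery replays offsets 6 through
10 into a store that already contains them.

    import java.util.HashMap;
    import java.util.Map;

    public class DuplicateWindowDemo {
      public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();

        // Ten messages at offsets 1..10, each contributing 1 to the sum.
        for (int offset = 1; offset <= 10; offset++) {
          store.merge("key", 1L, Long::sum);
        }
        long checkpoint = 5; // crash: store flush survived, checkpoint stuck at 5

        // Recovery replays from the checkpoint; offsets 6..10 hit the store again.
        for (long offset = checkpoint + 1; offset <= 10; offset++) {
          store.merge("key", 1L, Long::sum);
        }
        System.out.println(store.get("key")); // prints 15, not the correct 10
      }
    }

Tracking the last-applied offset alongside the value, as discussed elsewhere
in this thread, closes exactly this window.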



Exactly once processing

2016-04-15 Thread Sasidharan, Sabarish
Hi

To achieve exactly once processing for my aggregates, wouldn’t it be enough if 
I maintain the latest offset processed for the aggregate and check against that 
offset when messages are replayed on recovery? Am I missing something here?

Thanks

Regards
Sab
