Re: Implementation for exactly-once streaming sink

2018-12-06 Thread Eric Wohlstadter
Thanks Arun.

In our case, we only commit sink task output to the datastore by
coordinating with the driver.
Sink tasks write output to a "staging" area, and the driver only commits
the staging data to a datastore once all tasks for a micro-batch have
reported success back to the driver.
In the case of multiple attempts for the same task, each attempt first
cleans up any data left in the staging area by previous attempts of that
task.

So I think we have the case of repeated task attempts (within a
micro-batch) covered.
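
For illustration, the task-side part of that protocol is roughly the sketch
below (the StagingStore trait, Record type, and path layout are hypothetical
stand-ins, not our actual code):

case class Record(key: String, value: String)

trait StagingStore {
  def deleteRecursively(prefix: String): Unit
  def writeFile(path: String, rows: Iterator[Record]): Unit
}

class StagingTaskWriter(staging: StagingStore, batchId: Long, partitionId: Int) {

  // All attempts of the same (batchId, partition) share one staging prefix,
  // so a retried attempt can find and remove whatever an earlier attempt
  // left behind.
  private val stagingPrefix = s"staging/batch=$batchId/part=$partitionId"

  def write(rows: Iterator[Record]): Unit = {
    // 1. Clean up any data left by previous attempts of this task.
    staging.deleteRecursively(stagingPrefix)
    // 2. Write this attempt's output into the staging area.
    staging.writeFile(s"$stagingPrefix/data", rows)
    // 3. Success is then reported back to the driver; the driver promotes
    //    the staged data to the datastore only after every partition of the
    //    micro-batch has reported success.
  }
}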

I'm worried about the cases where an entire micro-batch is repeated (i.e.
two micro-batch executions are assigned the same batchId).

I know this can occur in the case of driver failure, because upon recovery,
the driver may replay a batchId already seen by the sink. So when the
driver is recovered from a checkpoint, we need to make sure Spark's view of
the current batchId is consistent with that in the datastore.
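
As a rough sketch (the Datastore trait here is a hypothetical stand-in for
our client), the recovery-time check I have in mind is something like:

trait Datastore {
  def lastCommittedBatchId(): Option[Long]
}

// Decide whether a batch handed to the sink after driver recovery should be
// committed or treated as a replay of something already in the datastore.
def shouldCommit(datastore: Datastore, batchId: Long): Boolean =
  datastore.lastCommittedBatchId() match {
    // The replayed batchId was already committed before the driver failed:
    // skip it so nothing is duplicated.
    case Some(last) if batchId <= last => false
    // Otherwise Spark's view and the datastore agree; commit as usual.
    case _ => true
  }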

In the case that the driver did not fail, but a batchId is repeated due to
micro-batch job failure (e.g. due to hitting max task attempt failures),
we wouldn't have committed anything to the datastore for that batch anyway.

This helps me to narrow down my question:
  Is it possible that, without the driver failing, the same batchId is
replayed after that batchId has already completed successfully at the driver?

I assume the answer would be no, but I wanted to double check.




On Thu, Dec 6, 2018 at 1:39 PM Arun M  wrote:

> Hi Eric,
>
> I think it will depend on how you implement the sink and when the data in
> the sink partitions are committed.
>
> I think the batch can be repeated during task retries, as well as if the
> driver fails before the batch id is committed in Spark's checkpoint. In the
> first case the sink may not have committed the batch (it was rolled back), so
> the batch id is not present and the sink can go ahead and write the data. In
> the second case the sink needs to check for duplicates and ignore that batch.
> I guess that, irrespective of the case, the sink just needs to ignore the
> batch if its batch id is already present in the datastore.
>
> Also, the discussion in the mail thread you posted assumes that each
> intermediate operation is idempotent; otherwise the data generated when the
> batch replays can be different.
>
> On Thu, 6 Dec 2018 at 11:55, Eric Wohlstadter  wrote:
>
>> Hi Arun, Gabor,
>>  Thanks for the feedback. We are using the "Exactly-once using
>> transactional writes" approach, so we don't rely on message keys for
>> idempotent writes.
>> So I should clarify that my question is specific to the "Exactly-once
>> using transactional writes" approach.
>>
>> We are following the approach mentioned from this previous list posting:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Easy-way-to-get-offset-metatada-with-Spark-Streaming-API-td22406.html
>>
>> Quoted from that posting: "I think the right way to look at this is the
>> batchId is just a proxy for offsets that is agnostic to what type of source
>> you are reading from (or how many sources their are).  We might call into a
>> custom sink with the same batchId more than once, but it will always
>> contain the same data (there is no race condition, since this is stored in
>> a write-ahead log).  As long as you check/commit the batch id in the same
>> transaction as the data you will get exactly once."
>>
>> We coordinate each micro-batch transaction from the driver, and
>> atomically commit the batchId with the data to a datastore.
>>
>> So my question is really specific to this part of the referenced posting "We
>> might call into a custom sink with the same batchId more than once".
>>
>> Before committing any micro-batch, we can check whether the batchId has
>> already been committed.
>> I know that this is required in the case of driver recovery, because the
>> batchId in the checkpoint file might be out-of-synch with the batchId that
>> was committed in a datastore transaction.
>>
>> Are there other cases where Spark might call into a custom sink with the
>> same batchId more than once?
>>
>> i.e. do we need to check Spark's current batchId against the datastore
>> for each micro-batch, or can we get away with only doing this check at the
>> time of driver recovery?
>>
>>
>>
>>
>>
>> On Thu, Dec 6, 2018 at 1:28 AM Gabor Somogyi 
>> wrote:
>>
>>> Hi Eric,
>>>
>>> In order to have exactly-once semantics, one needs a replayable source and
>>> an idempotent sink.
>>> The cases you've mentioned cover the two main groups of issues.
>>> Practically any kind of programming problem can end up in duplicated data
>>> (even in the code which feeds Kafka).
>>> I'm not sure why you're asking this, because if the sink sees an already
>>> processed key then it should just be skipped; it doesn't matter why it is
>>> duplicated.
>>> Cody has a really good write-up about semantics:
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md#delivery-semantics
>>>
>>> I think if you reach Continuous Processing, this is worth considering:
>>> "There are currently no automatic retries of failed tasks. Any failure
>>> will lead to the query being stopped and it needs to be manually restarted
>>> from the checkpoint."

Re: Implementation for exactly-once streaming sink

2018-12-06 Thread Arun M
Hi Eric,

I think it will depend on how you implement the sink and when the data in
the sink partitions are committed.

I think the batch can be repeated during task retries, as well as if the
driver fails before the batch id is committed in Spark's checkpoint. In the
first case the sink may not have committed the batch (it was rolled back), so
the batch id is not present and the sink can go ahead and write the data. In
the second case the sink needs to check for duplicates and ignore that batch.
I guess that, irrespective of the case, the sink just needs to ignore the
batch if its batch id is already present in the datastore.

Also, the discussion in the mail thread you posted assumes that each
intermediate operation is idempotent; otherwise the data generated when the
batch replays can be different.

On Thu, 6 Dec 2018 at 11:55, Eric Wohlstadter  wrote:

> Hi Arun, Gabor,
>  Thanks for the feedback. We are using the "Exactly-once using
> transactional writes" approach, so we don't rely on message keys for
> idempotent writes.
> So I should clarify that my question is specific to the "Exactly-once
> using transactional writes" approach.
>
> We are following the approach mentioned from this previous list posting:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Easy-way-to-get-offset-metatada-with-Spark-Streaming-API-td22406.html
>
> Quoted from that posting: "I think the right way to look at this is the
> batchId is just a proxy for offsets that is agnostic to what type of source
> you are reading from (or how many sources their are).  We might call into a
> custom sink with the same batchId more than once, but it will always
> contain the same data (there is no race condition, since this is stored in
> a write-ahead log).  As long as you check/commit the batch id in the same
> transaction as the data you will get exactly once."
>
> We coordinate each micro-batch transaction from the driver, and atomically
> commit the batchId with the data to a datastore.
>
> So my question is really specific to this part of the referenced posting "We
> might call into a custom sink with the same batchId more than once".
>
> Before committing any micro-batch, we can check whether the batchId has
> already been committed.
> I know that this is required in the case of driver recovery, because the
> batchId in the checkpoint file might be out-of-synch with the batchId that
> was committed in a datastore transaction.
>
> Are there other cases where Spark might call into a custom sink with the
> same batchId more than once?
>
> i.e. do we need to check Spark's current batchId against the datastore for
> each micro-batch, or can we get away with only doing this check at the time
> of driver recovery?
>
>
>
>
>
> On Thu, Dec 6, 2018 at 1:28 AM Gabor Somogyi 
> wrote:
>
>> Hi Eric,
>>
>> In order to have exactly-once semantics, one needs a replayable source and
>> an idempotent sink.
>> The cases you've mentioned cover the two main groups of issues.
>> Practically any kind of programming problem can end up in duplicated data
>> (even in the code which feeds Kafka).
>> I'm not sure why you're asking this, because if the sink sees an already
>> processed key then it should just be skipped; it doesn't matter why it is
>> duplicated.
>> Cody has a really good write-up about semantics:
>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md#delivery-semantics
>>
>> I think if you reach Continuous Processing, this is worth considering:
>> "There are currently no automatic retries of failed tasks. Any failure
>> will lead to the query being stopped and it needs to be manually restarted
>> from the checkpoint."
>>
>> BR,
>> G
>>
>>
>> On Wed, Dec 5, 2018 at 8:36 PM Eric Wohlstadter 
>> wrote:
>>
>>> Hi all,
>>>  We are working on implementing a streaming sink on 2.3.1 with the
>>> DataSourceV2 APIs.
>>>
>>> Can anyone help check if my understanding is correct, with respect to
>>> the failure modes which need to be covered?
>>>
>>> We are assuming that a Reliable Receiver (such as Kafka) is used as the
>>> stream source. And we only want to support micro-batch execution at this
>>> time (not yet Continuous Processing).
>>>
>>> I believe the possible failures that need to be covered are:
>>>
>>> 1. Task failure: If a task fails, it may have written data to the sink
>>> output before failure. Subsequent attempts for a failed task must be
>>> idempotent, so that no data is duplicated in the output.
>>> 2. Driver failure: If the driver fails, upon recovery, it might replay a
>>> micro-batch that was already seen by the sink (if a failure occurs after
>>> the sink has committed output but before the driver has updated the
>>> checkpoint). In this case, the sink must be idempotent when a micro-batch
>>> is replayed so that no data is duplicated in the output.
>>>
>>> Are there any other cases where data might be duplicated in the stream?
>>> i.e. if neither of these 2 failures occur, is there still a case where
>>> data can be duplicated?
>>>
>>> Thanks for any help to check if my understanding is correct.

Re: Implementation for exactly-once streaming sink

2018-12-06 Thread Eric Wohlstadter
Hi Arun, Gabor,
 Thanks for the feedback. We are using the "Exactly-once using
transactional writes" approach, so we don't rely on message keys for
idempotent writes.
So I should clarify that my question is specific to the "Exactly-once using
transactional writes" approach.

We are following the approach mentioned from this previous list posting:
http://apache-spark-developers-list.1001551.n3.nabble.com/Easy-way-to-get-offset-metatada-with-Spark-Streaming-API-td22406.html

Quoted from that posting: "I think the right way to look at this is the
batchId is just a proxy for offsets that is agnostic to what type of source
you are reading from (or how many sources their are).  We might call into a
custom sink with the same batchId more than once, but it will always
contain the same data (there is no race condition, since this is stored in
a write-ahead log).  As long as you check/commit the batch id in the same
transaction as the data you will get exactly once."

We coordinate each micro-batch transaction from the driver, and atomically
commit the batchId with the data to a datastore.

So my question is really specific to this part of the referenced posting "We
might call into a custom sink with the same batchId more than once".

Before committing any micro-batch, we can check whether the batchId has
already been committed.
I know that this is required in the case of driver recovery, because the
batchId in the checkpoint file might be out-of-synch with the batchId that
was committed in a datastore transaction.

Are there other cases where Spark might call into a custom sink with the
same batchId more than once?

i.e. do we need to check Spark's current batchId against the datastore for
each micro-batch, or can we get away with only doing this check at the time
of driver recovery?
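
For context, the check/commit-in-one-transaction pattern we follow looks
roughly like this (table names and the plain-JDBC details below are
illustrative, not our actual schema):

import java.sql.Connection

// Illustrative only: write the batch data and record its batchId in a single
// datastore transaction, skipping the batch if the id was already recorded.
def commitBatch(conn: Connection, batchId: Long, staged: Seq[(String, String)]): Unit = {
  conn.setAutoCommit(false)
  try {
    // Has this batchId already been committed (e.g. a replayed micro-batch)?
    val check = conn.prepareStatement(
      "SELECT 1 FROM committed_batches WHERE batch_id = ?")
    check.setLong(1, batchId)
    val alreadyCommitted = check.executeQuery().next()

    if (!alreadyCommitted) {
      // Data and batchId go in the same transaction, so either both become
      // visible or neither does.
      val insertData = conn.prepareStatement(
        "INSERT INTO sink_table (k, v) VALUES (?, ?)")
      staged.foreach { case (k, v) =>
        insertData.setString(1, k)
        insertData.setString(2, v)
        insertData.executeUpdate()
      }
      val insertBatch = conn.prepareStatement(
        "INSERT INTO committed_batches (batch_id) VALUES (?)")
      insertBatch.setLong(1, batchId)
      insertBatch.executeUpdate()
    }
    conn.commit()
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  }
}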





On Thu, Dec 6, 2018 at 1:28 AM Gabor Somogyi 
wrote:

> Hi Eric,
>
> In order to have exactly-once semantics, one needs a replayable source and
> an idempotent sink.
> The cases you've mentioned cover the two main groups of issues.
> Practically any kind of programming problem can end up in duplicated data
> (even in the code which feeds Kafka).
> I'm not sure why you're asking this, because if the sink sees an already
> processed key then it should just be skipped; it doesn't matter why it is
> duplicated.
> Cody has a really good write-up about semantics:
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md#delivery-semantics
>
> I think if you reach Continuous Processing, this is worth considering:
> "There are currently no automatic retries of failed tasks. Any failure
> will lead to the query being stopped and it needs to be manually restarted
> from the checkpoint."
>
> BR,
> G
>
>
> On Wed, Dec 5, 2018 at 8:36 PM Eric Wohlstadter 
> wrote:
>
>> Hi all,
>>  We are working on implementing a streaming sink on 2.3.1 with the
>> DataSourceV2 APIs.
>>
>> Can anyone help check if my understanding is correct, with respect to the
>> failure modes which need to be covered?
>>
>> We are assuming that a Reliable Receiver (such as Kafka) is used as the
>> stream source. And we only want to support micro-batch execution at this
>> time (not yet Continuous Processing).
>>
>> I believe the possible failures that need to be covered are:
>>
>> 1. Task failure: If a task fails, it may have written data to the sink
>> output before failure. Subsequent attempts for a failed task must be
>> idempotent, so that no data is duplicated in the output.
>> 2. Driver failure: If the driver fails, upon recovery, it might replay a
>> micro-batch that was already seen by the sink (if a failure occurs after
>> the sink has committed output but before the driver has updated the
>> checkpoint). In this case, the sink must be idempotent when a micro-batch
>> is replayed so that no data is duplicated in the output.
>>
>> Are there any other cases where data might be duplicated in the stream?
>> i.e. if neither of these 2 failures occur, is there still a case where
>> data can be duplicated?
>>
>> Thanks for any help to check if my understanding is correct.
>>
>>
>>
>>
>>


Re: Implementation for exactly-once streaming sink

2018-12-06 Thread Gabor Somogyi
Hi Eric,

In order to have exactly-once semantics, one needs a replayable source and
an idempotent sink.
The cases you've mentioned cover the two main groups of issues.
Practically any kind of programming problem can end up in duplicated data
(even in the code which feeds Kafka).
I'm not sure why you're asking this, because if the sink sees an already
processed key then it should just be skipped; it doesn't matter why it is
duplicated.
Cody has a really good write-up about semantics:
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md#delivery-semantics
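
To make the key-based idempotence point concrete, a minimal sketch (the
KeyedSink trait below is just a hypothetical stand-in) would be:

trait KeyedSink {
  def alreadyProcessed(key: String): Boolean
  def writeAndMarkProcessed(key: String, value: String): Unit // ideally atomic
}

// Skip duplicates regardless of why they were replayed: a task retry, a
// replayed micro-batch, or an upstream producer retry all look the same here.
def writeIdempotently(sink: KeyedSink, key: String, value: String): Unit =
  if (!sink.alreadyProcessed(key)) {
    sink.writeAndMarkProcessed(key, value)
  }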

I think if you reach Continuous Processing, this is worth considering:
"There are currently no automatic retries of failed tasks. Any failure will
lead to the query being stopped and it needs to be manually restarted from
the checkpoint."

BR,
G


On Wed, Dec 5, 2018 at 8:36 PM Eric Wohlstadter  wrote:

> Hi all,
>  We are working on implementing a streaming sink on 2.3.1 with the
> DataSourceV2 APIs.
>
> Can anyone help check if my understanding is correct, with respect to the
> failure modes which need to be covered?
>
> We are assuming that a Reliable Receiver (such as Kafka) is used as the
> stream source. And we only want to support micro-batch execution at this
> time (not yet Continuous Processing).
>
> I believe the possible failures that need to be covered are:
>
> 1. Task failure: If a task fails, it may have written data to the sink
> output before failure. Subsequent attempts for a failed task must be
> idempotent, so that no data is duplicated in the output.
> 2. Driver failure: If the driver fails, upon recovery, it might replay a
> micro-batch that was already seen by the sink (if a failure occurs after
> the sink has committed output but before the driver has updated the
> checkpoint). In this case, the sink must be idempotent when a micro-batch
> is replayed so that no data is duplicated in the output.
>
> Are there any other cases where data might be duplicated in the stream?
> i.e. if neither of these 2 failures occur, is there still a case where
> data can be duplicated?
>
> Thanks for any help to check if my understanding is correct.
>
>
>
>
>


Re: Implementation for exactly-once streaming sink

2018-12-05 Thread Arun Mahadevan
I guess that's roughly it.

As of now, there's no built-in support to coordinate the commits across the
executors in an atomic way. So you need to commit the batch (a global commit)
at the driver.
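
For illustration, the driver-side global commit could be shaped roughly like
this (TaskResult and Datastore are hypothetical placeholders, not Spark APIs):

case class TaskResult(partitionId: Int, stagedPath: String)

trait Datastore {
  def isCommitted(batchId: Long): Boolean
  def commitAtomically(batchId: Long, stagedPaths: Seq[String]): Unit
}

// Runs on the driver once every task of the micro-batch has reported success.
def globalCommit(datastore: Datastore, batchId: Long, results: Seq[TaskResult]): Unit =
  // If the batch id is already in the datastore (e.g. a replay), ignore it;
  // otherwise promote all the staged task output in one atomic commit.
  if (!datastore.isCommitted(batchId)) {
    datastore.commitAtomically(batchId, results.map(_.stagedPath))
  }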

And when the batch is replayed, if any of the intermediate operations are not
idempotent or can cause side effects, the result produced during replay may be
different from what you committed and would be ignored.

Thanks,
Arun

On Wed, 5 Dec 2018 at 11:36, Eric Wohlstadter  wrote:

> Hi all,
>  We are working on implementing a streaming sink on 2.3.1 with the
> DataSourceV2 APIs.
>
> Can anyone help check if my understanding is correct, with respect to the
> failure modes which need to be covered?
>
> We are assuming that a Reliable Receiver (such as Kafka) is used as the
> stream source. And we only want to support micro-batch execution at this
> time (not yet Continuous Processing).
>
> I believe the possible failures that need to be covered are:
>
> 1. Task failure: If a task fails, it may have written data to the sink
> output before failure. Subsequent attempts for a failed task must be
> idempotent, so that no data is duplicated in the output.
> 2. Driver failure: If the driver fails, upon recovery, it might replay a
> micro-batch that was already seen by the sink (if a failure occurs after
> the sink has committed output but before the driver has updated the
> checkpoint). In this case, the sink must be idempotent when a micro-batch
> is replayed so that no data is duplicated in the output.
>
> Are there any other cases where data might be duplicated in the stream?
> i.e. if neither of these 2 failures occur, is there still a case where
> data can be duplicated?
>
> Thanks for any help to check if my understanding is correct.
>
>
>
>
>


Implementation for exactly-once streaming sink

2018-12-05 Thread Eric Wohlstadter
Hi all,
 We are working on implementing a streaming sink on 2.3.1 with the
DataSourceV2 APIs.

Can anyone help check if my understanding is correct, with respect to the
failure modes which need to be covered?

We are assuming that a Reliable Receiver (such as Kafka) is used as the
stream source. And we only want to support micro-batch execution at this
time (not yet Continuous Processing).

I believe the possible failures that need to be covered are:

1. Task failure: If a task fails, it may have written data to the sink
output before failure. Subsequent attempts for a failed task must be
idempotent, so that no data is duplicated in the output.
2. Driver failure: If the driver fails, upon recovery, it might replay a
micro-batch that was already seen by the sink (if a failure occurs after
the sink has committed output but before the driver has updated the
checkpoint). In this case, the sink must be idempotent when a micro-batch
is replayed so that no data is duplicated in the output.

Are there any other cases where data might be duplicated in the stream?
i.e. if neither of these 2 failures occur, is there still a case where data
can be duplicated?

Thanks for any help to check if my understanding is correct.