Re: Implementation for exactly-once streaming sink

2018-12-06 Thread Eric Wohlstadter
Thanks Arun. In our case, we only commit sink task output to the datastore by coordinating with the driver. Sink tasks write output to a "staging" area, and the driver only commits the staging data to the datastore once all tasks for a micro-batch have reported success back to the driver. In the
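
A minimal sketch of that coordination pattern against the Spark 2.3.x DataSourceV2 writer API. The StagingStore trait and its methods are hypothetical stand-ins for the actual datastore; the point is the division of labor: executors only stage output and report its location in their commit messages, and the driver performs the single global commit in StreamWriter.commit, which Spark invokes only after every task for the micro-batch has succeeded.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

// Hypothetical interface to the datastore's staging area.
trait StagingStore extends Serializable {
  def newStagingPath(partitionId: Int, attempt: Int): String
  def append(path: String, record: Row): Unit
  def delete(path: String): Unit
  def promoteAtomically(epochId: Long, paths: Seq[String]): Unit // the global commit
}

// A task's commit message carries only the location of its staged output.
case class StagedPartition(partitionId: Int, stagingPath: String) extends WriterCommitMessage

class StagingDataWriter(partitionId: Int, attempt: Int, staging: StagingStore)
    extends DataWriter[Row] {
  private val path = staging.newStagingPath(partitionId, attempt)

  override def write(record: Row): Unit = staging.append(path, record)

  // Runs on the executor: publishes the staging location, commits nothing yet.
  override def commit(): WriterCommitMessage = StagedPartition(partitionId, path)

  // A failed or retried attempt discards only its own staged data.
  override def abort(): Unit = staging.delete(path)
}

class StagingWriterFactory(staging: StagingStore) extends DataWriterFactory[Row] {
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
    new StagingDataWriter(partitionId, attemptNumber, staging)
}

class StagingStreamWriter(staging: StagingStore) extends StreamWriter {
  override def createWriterFactory(): DataWriterFactory[Row] =
    new StagingWriterFactory(staging)

  // Runs on the driver only after every task reported success: the one global commit.
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit =
    staging.promoteAtomically(epochId,
      messages.collect { case s: StagedPartition => s.stagingPath })

  // On failure the staged (uncommitted) output is simply discarded.
  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit =
    messages.collect { case s: StagedPartition => s.stagingPath }.foreach(staging.delete)
}
```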

Re: Implementation for exactly-once streaming sink

2018-12-06 Thread Arun M
Hi Eric, I think it will depend on how you implement the sink and when the data in the sink partitions is committed. I think the batch can be repeated during task retries, as well as if the driver fails before the batch id is committed in Spark's checkpoint. In the first case maybe the sink had
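
One way to guard against both replay cases Arun describes (a sketch only; SinkTransaction is a hypothetical transactional handle into the target datastore): record the last committed batch id inside the sink itself, in the same transaction as the data, and make the driver-side commit a no-op when a batch it has already committed comes around again.

```scala
// Hypothetical transactional handle into the target datastore.
trait SinkTransaction {
  def lastCommittedEpoch: Long
  def promote(stagingPaths: Seq[String]): Unit
  def setLastCommittedEpoch(epochId: Long): Unit
}

// Driver-side commit that is safe to run more than once for the same epoch.
class IdempotentCommitter(inTransaction: (SinkTransaction => Unit) => Unit) {
  def commit(epochId: Long, stagingPaths: Seq[String]): Unit = inTransaction { txn =>
    if (txn.lastCommittedEpoch >= epochId) {
      // Replay of an already-committed batch: e.g. the driver died after this
      // transaction but before recording the batch id in Spark's checkpoint.
      // Do nothing.
    } else {
      // Promote the staged data and record the epoch in ONE transaction, so a
      // crash between the two steps cannot leave the sink in an ambiguous state.
      txn.promote(stagingPaths)
      txn.setLastCommittedEpoch(epochId)
    }
  }
}
```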

Re: Implementation for exactly-once streaming sink

2018-12-06 Thread Eric Wohlstadter
Hi Arun, Gabor, Thanks for the feedback. We are using the "Exactly-once using transactional writes" approach, so we don't rely on message keys for idempotent writes. To clarify, my question is specific to the "Exactly-once using transactional writes" approach. We are following the

Re: Implementation for exactly-once streaming sink

2018-12-06 Thread Gabor Somogyi
Hi Eric, In order to have exactly-once, one needs a replayable source and an idempotent sink. The cases you've mentioned cover the two main groups of issues. Practically any kind of programming problem can end up in duplicated data (even in the code which feeds Kafka). Don't know why you have
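
For the idempotent-sink half of that invariant, a minimal sketch (KeyValueStore.put is a hypothetical upsert into the target datastore): derive a deterministic key from each record's origin, so a replayed batch overwrites its earlier writes instead of duplicating them.

```scala
// Hypothetical upsert interface: writing the same key twice keeps one copy.
trait KeyValueStore {
  def put(key: String, value: Array[Byte]): Unit
}

final case class Record(epochId: Long, partitionId: Int, offset: Long, payload: Array[Byte])

class IdempotentSink(store: KeyValueStore) {
  def write(r: Record): Unit = {
    // The same input record maps to the same key on every replay,
    // so repeating the batch cannot produce duplicates.
    val key = s"${r.epochId}/${r.partitionId}/${r.offset}"
    store.put(key, r.payload)
  }
}
```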

Re: Implementation for exactly-once streaming sink

2018-12-05 Thread Arun Mahadevan
I guess that's roughly it. As of now there's no built-in support to coordinate the commits across the executors in an atomic way. So you need to commit the batch (global commit) at the driver. And when the batch is replayed, if any of the intermediate operations are not idempotent or can cause
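
Since Spark provides no atomic cross-executor commit, the driver-side global commit must be atomic on its own. One common way to get that with a filesystem-backed sink (an illustration, not something from this thread) is to stage the whole batch under a temporary directory and publish it with a single atomic rename:

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

object AtomicDirCommit {
  // Publishes a batch with one rename: after a crash, either nothing or the
  // whole batch is visible, never a partial one. On HDFS the equivalent is
  // FileSystem.rename; ATOMIC_MOVE here relies on a POSIX local filesystem.
  def commit(stagingDir: String, finalDir: String): Unit =
    Files.move(Paths.get(stagingDir), Paths.get(finalDir),
               StandardCopyOption.ATOMIC_MOVE)
}
```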

Implementation for exactly-once streaming sink

2018-12-05 Thread Eric Wohlstadter
Hi all, We are working on implementing a streaming sink on Spark 2.3.1 with the DataSourceV2 APIs. Can anyone help check whether my understanding is correct with respect to the failure modes that need to be covered? We are assuming that a Reliable Receiver (such as Kafka) is used as the stream source.
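
For reference, a bare skeleton of the entry point such a sink implements on Spark 2.3.x (interface names are from the 2.3 API; StagingStreamWriter is the sketch shown earlier in the thread, and the body here is illustrative only):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

class MySinkProvider extends DataSourceV2 with StreamWriteSupport {
  // Spark calls this when a streaming query starts writing to this sink.
  override def createStreamWriter(queryId: String, schema: StructType,
                                  mode: OutputMode,
                                  options: DataSourceOptions): StreamWriter = {
    // Spark then asks the returned writer for a DataWriterFactory (one
    // DataWriter per partition/attempt) and calls commit(epochId, messages)
    // or abort(epochId, messages) on the driver once every task for the
    // micro-batch has finished or failed.
    new StagingStreamWriter(??? /* build a StagingStore from `options` */)
  }
}
```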