Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Wei Liu
BTW, we heard that it is not so easy to set up and administer Kafka on AWS. If any of
you have had good or bad experiences, would you mind sharing them with us? Thanks.
I know this is off topic for the Spark user group, so feel free to reply just to my
email address. Thanks in advance.

Wei


Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Wei Liu
Thank you all for responding to my question. I am pleasantly surprised by how
many prompt responses I got. It shows the strength of the Spark community.

Kafka is still an option for us; I will check out the link provided by
Dibyendu.

Meanwhile, if someone out there has already figured this out with Kinesis,
please keep your suggestions coming. Thanks.

Thanks,
Wei


Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Dibyendu Bhattacharya
Dear All,

I have recently written a Spark Kafka consumer to solve this problem. We have
also seen issues with KafkaUtils, which uses the high-level Kafka consumer and
gives the consumer code no handle on offset management.

The code below solves this problem; it is being tested in our Spark cluster and
is working fine as of now.

https://github.com/dibbhatt/kafka-spark-consumer

This is a low-level Kafka consumer using the Kafka SimpleConsumer API.

Please have a look at it and let me know your opinion. It has been written to
eliminate data loss by committing the offset only after the data has been
written to the BlockManager. Also, the existing high-level KafkaUtils does not
have any feature to control data flow and gives an Out Of Memory error if there
is too much backlog in Kafka; this consumer solves that problem as well. The
code has been adapted from an earlier Storm Kafka consumer and has a lot of
other features, such as recovery from Kafka node failures, ZK failures,
recovery from offset errors, etc.
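
For illustration, here is a rough, much simplified sketch of the idea (this is
not the actual code in the repository above): a receiver that fetches one
partition through the SimpleConsumer API and commits the offset only after
store() has handed the batch to Spark. It assumes the batch-level store() does
not return before the block reaches the BlockManager, and commitOffset is a
placeholder for the real ZooKeeper write.

import scala.collection.mutable.ArrayBuffer

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Fetches a single Kafka partition with the SimpleConsumer API and commits the
// offset only after the whole batch has been handed to Spark via store().
class LowLevelKafkaReceiver(host: String, port: Int, topic: String, partition: Int,
                            startOffset: Long, clientId: String)
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    new Thread("low-level-kafka-receiver") {
      override def run(): Unit = fetchLoop()
    }.start()
  }

  def onStop(): Unit = { /* fetchLoop() exits when isStopped() becomes true */ }

  private def fetchLoop(): Unit = {
    val consumer = new SimpleConsumer(host, port, 10000, 64 * 1024, clientId)
    var offset = startOffset
    try {
      while (!isStopped()) {
        val request = new FetchRequestBuilder()
          .clientId(clientId)
          .addFetch(topic, partition, offset, 1024 * 1024)
          .build()
        val messages = consumer.fetch(request).messageSet(topic, partition)

        val batch = new ArrayBuffer[Array[Byte]]()
        var nextOffset = offset
        for (msgAndOffset <- messages) {
          val payload = msgAndOffset.message.payload
          val bytes = new Array[Byte](payload.limit())
          payload.get(bytes)
          batch += bytes
          nextOffset = msgAndOffset.nextOffset
        }

        if (batch.nonEmpty) {
          // Assumption: the batch-level store() does not return until the block
          // has been pushed to the BlockManager.
          store(batch)
          // Only now is it safe to advance the consumer's position.
          commitOffset(nextOffset)
          offset = nextOffset
        } else {
          Thread.sleep(200)   // nothing new in the partition, back off briefly
        }
      }
    } finally {
      consumer.close()
    }
  }

  // Placeholder: a real consumer would persist this offset in ZooKeeper.
  private def commitOffset(offset: Long): Unit = ()
}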

Regards,
Dibyendu


RE: Data loss - Spark streaming and network receiver

2014-08-18 Thread Shao, Saisai
I think Spark Streaming currently lacks a mechanism for acknowledging data once it
is stored and replicated in the BlockManager, so data can potentially be lost even
after it has been pulled from Kafka. Say the data is held only in the BlockGenerator
and not yet in the BlockManager, while in the meantime Kafka has already committed
the consumer offset, and at that point the node fails: from Kafka's point of view
this data has been fed into Spark Streaming, but it has not actually been processed,
so it will never be processed again unless you re-read the whole partition.

To solve this potential data loss problem, Spark Streaming needs to offer a data
acknowledgement mechanism, so that a custom Receiver can use the acknowledgement to
checkpoint or recover, as Storm does.
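
To make the pattern concrete, here is a minimal, generic sketch (not existing Spark
code) of a receiver skeleton that advances the source position only after the
batch-level store() has returned, treating that return as the acknowledgement.
fetchBatch, commitPosition and startPosition are placeholders for a real source
such as Kafka or Kinesis.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// The source position (Kafka offset, Kinesis sequence number, ...) is advanced
// only after store() has returned, i.e. after the block has been handed to the
// BlockManager, so a crash in between makes the data re-readable, not lost.
abstract class AckedReceiver[T](storageLevel: StorageLevel)
  extends Receiver[T](storageLevel) {

  /** Fetch the next batch starting at `position`; returns (records, nextPosition). */
  def fetchBatch(position: Long): (Seq[T], Long)

  /** Durably record that everything before `position` has been stored by Spark. */
  def commitPosition(position: Long): Unit

  /** Position to resume from, read from the durable store on (re)start. */
  def startPosition: Long

  def onStart(): Unit = {
    new Thread("acked-receiver") {
      override def run(): Unit = {
        var position = startPosition
        while (!isStopped()) {
          val (records, nextPosition) = fetchBatch(position)
          if (records.nonEmpty) {
            // Assumption: this batch-level store() only returns once the block
            // has reached the BlockManager -- this is the acknowledgement.
            store(ArrayBuffer(records: _*))
            commitPosition(nextPosition)
            position = nextPosition
          } else {
            Thread.sleep(200)
          }
        }
      }
    }.start()
  }

  def onStop(): Unit = {}
}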

Besides, driver failure is another story that needs to be considered carefully. So
currently it is hard to guarantee no data loss in Spark Streaming; it still needs
improvement in some areas ☺.

Thanks
Jerry


Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Tobias Pfeiffer
Hi Wei,

On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wrote:
>
> Since our application cannot tolerate losing customer data, I am wondering
> what is the best way for us to address this issue.
> 1) We are thinking of writing application-specific logic to address the data
> loss. To us, the problem seems to be caused by Kinesis receivers advancing
> their checkpoint before we know for sure the data is replicated. For example,
> we can do another checkpoint ourselves to remember the Kinesis sequence number
> for data that has been processed by Spark Streaming. When the Kinesis receiver
> is restarted due to a worker failure, we restart it from the checkpoint we
> tracked.
>

This sounds pretty much to me like the way Kafka does it. So, I am not saying
that the stock KafkaReceiver does what you want (it may or may not), but it
should be possible to update the "offset" (which corresponds to the "sequence
number") in ZooKeeper only after data has been replicated successfully. I guess
"replace Kinesis with Kafka" is not an option for you, but you may consider
pulling Kinesis data into Kafka before processing it with Spark?
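
For example, the Kafka side of such a bridge could look roughly like the sketch
below (illustrative only, old 0.8 producer API, broker addresses assumed):
whatever consumes the Kinesis shard, e.g. a KCL record processor, calls forward()
for each record and checkpoints the Kinesis sequence number only after it returns,
so Kafka keeps a durable, replayable copy in front of Spark Streaming.

import java.util.Properties

import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}

// Each Kinesis record is forwarded synchronously to Kafka; the caller should
// checkpoint the Kinesis sequence number only after forward() has returned, so
// nothing is marked consumed in Kinesis before Kafka has acknowledged it.
object KinesisToKafkaBridge {

  private val props = new Properties()
  props.put("metadata.broker.list", "broker1:9092,broker2:9092") // assumed brokers
  props.put("serializer.class", "kafka.serializer.DefaultEncoder") // byte[] payloads
  props.put("key.serializer.class", "kafka.serializer.StringEncoder")
  props.put("request.required.acks", "-1") // wait for all in-sync replicas
  props.put("producer.type", "sync")       // send() blocks until acknowledged

  private val producer = new Producer[String, Array[Byte]](new ProducerConfig(props))

  /** Forward one Kinesis record, keyed by its partition key. */
  def forward(topic: String, partitionKey: String, data: Array[Byte]): Unit =
    producer.send(new KeyedMessage[String, Array[Byte]](topic, partitionKey, data))

  def close(): Unit = producer.close()
}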

Tobias


Data loss - Spark streaming and network receiver

2014-08-18 Thread Wei Liu
We are prototyping an application with Spark Streaming and Kinesis. We use
Kinesis to accept incoming transaction data and then process it using Spark
Streaming. So far we have really liked both technologies, and we have seen both
maturing rapidly. We have almost settled on using these two technologies, but we
are a little scared by this paragraph in the programming guide:

"For network-based data sources like Kafka and Flume, the received input
data is replicated in memory between nodes of the cluster (default
replication factor is 2). So if a worker node fails, then the system can
recompute the lost from the the left over copy of the input data. However,
if the worker node where a network receiver was running fails, then a tiny
bit of data may be lost, that is, the data received by the system but not
yet replicated to other node(s). The receiver will be started on a
different node and it will continue to receive data."

Since our application cannot tolerate losing customer data, I am wondering
what is the best way for us to address this issue.
1) We are thinking of writing application-specific logic to address the data
loss. To us, the problem seems to be caused by Kinesis receivers advancing their
checkpoint before we know for sure the data is replicated. For example, we can
do another checkpoint ourselves to remember the Kinesis sequence number for data
that has been processed by Spark Streaming. When the Kinesis receiver is
restarted due to a worker failure, we restart it from the checkpoint we tracked.
We also worry that our driver program (or the whole cluster) might die because
of a bug in the application; the above logic would allow us to resume from our
last checkpoint. A rough sketch of this idea is included after item 3 below.

Are there any best practices out there for this issue? I suppose many folks
are using Spark Streaming with network receivers; any suggestion is welcome.
2) Write Kinesis data to S3 first, then either use it as a backup or read from
S3 in Spark Streaming. This is the safest approach but comes with a
performance/latency penalty. On the other hand, we may have to write data to S3
anyway, since Kinesis only stores up to 24 hours of data, in case we have a bad
day in our server infrastructure.
3) Wait for this issue to be addressed in Spark Streaming. I found this ticket
https://issues.apache.org/jira/browse/SPARK-1647, but it is not resolved yet.
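
A rough sketch of the application-level checkpoint idea from 1), for illustration
only: it assumes each record carries its shard id and Kinesis sequence number
(which the stock Kinesis receiver does not expose today), and saveShardPosition
stands in for whatever durable store we would use (DynamoDB, ZooKeeper, a
database, ...).

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Each element carries the Kinesis shard id and sequence number with the payload.
case class TxnRecord(shardId: String, sequenceNumber: String, payload: Array[Byte])

object AppCheckpoint {
  // Placeholder for a durable store (DynamoDB, ZooKeeper, a database, ...).
  def saveShardPosition(shardId: String, sequenceNumber: String): Unit = ()
}

object Pipeline {
  def process(stream: DStream[TxnRecord]): Unit = {
    stream.foreachRDD { rdd =>
      // Do the real work first; if it fails, our checkpoint is not advanced and
      // the batch can be re-read from Kinesis (or from the S3 backup).
      processBatch(rdd.map(_.payload))

      // Then remember the highest sequence number seen per shard. On restart,
      // the receiver would be pointed at these positions instead of the
      // (too eager) checkpoint kept by the Kinesis client itself.
      rdd.map(r => (r.shardId, r.sequenceNumber))
        .reduceByKey((a, b) => if (BigInt(a) > BigInt(b)) a else b)
        .collect()
        .foreach { case (shard, seq) => AppCheckpoint.saveShardPosition(shard, seq) }
    }
  }

  private def processBatch(payloads: RDD[Array[Byte]]): Unit = {
    // application-specific processing / output goes here
  }
}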

Thanks,
Wei