Just wanted to make sure one thing is really clear: the Kafka offsets are part
of the actual RDD. In every batch, Spark saves the offset ranges for each
partition, which in theory makes the data in each batch stable across
recovery.
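
For reference, here's a minimal sketch of how you can see those ranges yourself
with the direct stream (this assumes the spark-streaming-kafka artifact and an
existing StreamingContext ssc; the broker list and topic name are placeholders).
The cast has to happen on the stream returned by createDirectStream, before any
transformation that loses the underlying KafkaRDD:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder brokers
val topics = Set("events")                                       // placeholder topic

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
  // each KafkaRDD carries the exact per-partition offset range it will read
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} [${r.partition}] ${r.fromOffset} -> ${r.untilOffset}")
  }
}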

The other important thing is that with correct checkpointing on the DStreams
(mandatory on stateful ones) you will rarely (if ever!) need to start again
from zero. That's the point of checkpointing data.
If you checkpoint every 10 batches, then you will have to re-process at most
10 batches, and the new data will be merged into the state that's loaded from
the HDFS checkpoint.
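
A minimal sketch of that setup, assuming a 10s batch interval (the path, app
name, and input stream myInputStream are placeholders; getOrCreate is what
makes a restarted driver reload state from HDFS instead of starting over):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/myapp"  // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("myapp")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)

  // hypothetical stateful pipeline: a running count per key
  val updateFunc: (Seq[Long], Option[Long]) => Option[Long] =
    (values, state) => Some(values.sum + state.getOrElse(0L))
  val counts = myInputStream(ssc).map(key => (key, 1L)).updateStateByKey(updateFunc)

  // checkpoint the state every 10 batches (10 x 10s), so recovery replays at most 10
  counts.checkpoint(Seconds(100))
  counts.print()
  ssc
}

// on restart, the driver reloads metadata and state from the checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()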

Lastly, there are still issues with adding/removing nodes from a running
cluster. Most of the time it works; sometimes the job crashes or doesn't
re-deploy the executors. That being said, restarting the driver (with no
data loss, thanks to checkpointing) has always been a workaround that worked
for me.
In this spirit, you could test (I have it on my list) stopping a driver by
killing the process or with yarn application -kill, and resubmitting with a
larger number of executors (--num-executors). In theory it should work as
expected; I don't think the executor count is part of the checkpointed
metadata in the spark context.
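
Roughly what I have in mind (the application id, class, jar, and numbers are
all placeholders; the important bits are resubmitting the same jar against the
same checkpoint directory):

# kill the running driver (application id is a placeholder)
yarn application -kill application_1443000000000_0042

# resubmit the same jar, pointing at the same checkpoint dir, with more executors
spark-submit --master yarn-cluster \
  --num-executors 30 \
  --executor-cores 4 \
  --class com.example.MyStreamingApp myapp.jar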

-adrian


From: Cody Koeninger
Date: Tuesday, September 29, 2015 at 12:49 AM
To: Sourabh Chandak
Cc: Augustus Hong, user@spark.apache.org
Subject: Re: Adding / Removing worker nodes for Spark Streaming

If a node fails, the partition / offset range that it was working on will be
scheduled to run on another node.  This is generally true of Spark, regardless
of checkpointing.

The offset ranges for a given batch are stored in the checkpoint for that 
batch.  That's relevant if your entire job fails (driver failure, all workers 
fail, etc).

If you really can't afford to run from the smallest offset and can't afford to
lose data, don't rely on Spark checkpoints (because of the conditions under
which they can't be recovered).  Store the offset ranges yourself.
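
For example, something along these lines (readOffsets, saveOffsets, and
writeResults are hypothetical helpers backed by whatever store you trust,
e.g. ZooKeeper or a database, and ssc / kafkaParams are assumed to exist; the
fromOffsets variant of createDirectStream is the real hook that lets you
resume exactly where you left off):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// resume from offsets you stored yourself (readOffsets is hypothetical)
val fromOffsets: Map[TopicAndPartition, Long] = readOffsets()

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
    (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition(writeResults)  // write your output first... (hypothetical)
  saveOffsets(ranges)                 // ...then record offsets, only after success
}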


On Mon, Sep 28, 2015 at 4:34 PM, Sourabh Chandak <sourabh3...@gmail.com> wrote:
I also have the same use case as Augustus, and have some basic questions about
recovery from checkpoint. I have a 10-node Kafka cluster and a 30-node Spark
cluster running the streaming job; how is the (topic, partition) data handled
in checkpointing? The scenario I want to understand is: in case of node
failure, how will a new node know the checkpoint of the failed node?
The amount of data we have is huge, and we can't run from the smallest offset.

Thanks,
Sourabh

On Mon, Sep 28, 2015 at 11:43 AM, Augustus Hong <augus...@branchmetrics.io> wrote:
Got it, thank you!


On Mon, Sep 28, 2015 at 11:37 AM, Cody Koeninger <c...@koeninger.org> wrote:
Losing worker nodes without stopping is definitely possible.  I haven't had 
much success adding workers to a running job, but I also haven't spent much 
time on it.

If you're restarting with the same jar, you should be able to recover from
checkpoint without losing data (usual caveats apply, e.g. you need enough Kafka
retention).  Make sure to test it though, as the code paths taken during
recovery from checkpoint are not the same as on initial startup, and you can
run into unexpected issues (e.g. authentication).

On Mon, Sep 28, 2015 at 1:27 PM, Augustus Hong <augus...@branchmetrics.io> wrote:
Hey all,

I'm evaluating Spark Streaming with the Kafka direct stream, and I have a
couple of questions:

1.  Would it be possible to add / remove worker nodes without stopping and
restarting the Spark Streaming driver?

2.  I understand that we can enable checkpointing to recover from node
failures, and that it doesn't work across code changes.  What about the case
where worker nodes failed due to load, we added more worker nodes, and then
restarted Spark Streaming?  Would this incur data loss as well?


Best,
Augustus

--
Augustus Hong
Data Analytics | Branch Metrics
m 650-391-3369 | e augus...@branch.io





