Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-03 Thread Cody Koeninger
Do you believe that all exceptions (including catastrophic ones like out of
heap space) should be caught and silently discarded?

Do you believe that a database system that runs out of disk space should
silently continue to accept writes?

What I am trying to say is, when something is broken in a way that can't be
fixed without external intervention, the system shouldn't hide it from
you.  Systems fail, that's a fact of life.  Pretending that a system hasn't
failed when it in fact is broken... not a good plan.



On Wed, Dec 2, 2015 at 11:38 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> There are other ways to deal with the problem than shutting down the
> streaming job. You can monitor the lag in your consumer to see if the
> consumer is falling behind. If the lag gets high enough that
> OffsetOutOfRange can happen, you either increase the retention period or
> increase the consumer rate, or do both.
>
> What I am trying to say is, the streaming job should not fail in any case.
>
> Dibyendu
>
> On Thu, Dec 3, 2015 at 9:40 AM, Cody Koeninger  wrote:
>
>> I believe that what differentiates reliable systems is individual
>> components should fail fast when their preconditions aren't met, and other
>> components should be responsible for monitoring them.
>>
>> If a user of the direct stream thinks that your approach of restarting
>> and ignoring data loss is the right thing to do, they can monitor the job
>> (which they should be doing in any case) and restart.
>>
>> If a user of your library thinks that my approach of failing (so they
>> KNOW there was data loss and can adjust their system) is the right thing to
>> do, how do they do that?
>>
>> On Wed, Dec 2, 2015 at 9:49 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Well, even if you do correct retention and increase speed,
>>> OffsetOutOfRange can still come depends on how your downstream processing
>>> is. And if that happen , there is No Other way to recover old messages . So
>>> best bet here from Streaming Job point of view  is to start from earliest
>>> offset rather bring down the streaming job . In many cases goal for a
>>> streaming job is not to shut down and exit in case of any failure. I
>>> believe that is what differentiate a always running streaming job.
>>>
>>> Dibyendu
>>>
>>> On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger 
>>> wrote:
>>>
 No, silently restarting from the earliest offset in the case of offset
 out of range exceptions during a streaming job is not the "correct way of
 recovery".

 If you do that, your users will be losing data without knowing why.
 It's more like  a "way of ignoring the problem without actually addressing
 it".

 The only really correct way to deal with that situation is to recognize
 why it's happening, and either increase your Kafka retention or increase
 the speed at which you are consuming.

 On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
 dibyendu.bhattach...@gmail.com> wrote:

> This consumer which I mentioned does not silently throw away data. If
> offset out of range it start for earliest offset and that is correct way 
> of
> recovery from this error.
>
> Dibyendu
> On Dec 2, 2015 9:56 PM, "Cody Koeninger"  wrote:
>
>> Again, just to be clear, silently throwing away data because your
>> system isn't working right is not the same as "recover from any Kafka
>> leader changes and offset out of ranges issue".
>>
>>
>>
>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi, if you use Receiver based consumer which is available in
>>> spark-packages (
>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) ,
>>> this has all built in failure recovery and it can recover from any Kafka
>>> leader changes and offset out of ranges issue.
>>>
>>> Here is the package form github :
>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>
>>>
>>> Dibyendu
>>>
>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
 How to avoid those Errors with receiver based approach? Suppose we
 are OK with at least once processing and use receiver based approach 
 which
 uses ZooKeeper but not query Kafka directly, would these 
 errors(Couldn't
 find leader offsets for
 Set([test_stream,5])))be avoided?

 On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
 wrote:

> KafkaRDD.scala , handleFetchErr
>
> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> How to look at Option 2(see the following)? Which portion of 

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
Again, just to be clear, silently throwing away data because your system
isn't working right is not the same as "recover from any Kafka leader
changes and offset out of ranges issue".



On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi, if you use Receiver based consumer which is available in
> spark-packages (
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) , this
> has all built in failure recovery and it can recover from any Kafka leader
> changes and offset out of ranges issue.
>
> Here is the package form github :
> https://github.com/dibbhatt/kafka-spark-consumer
>
>
> Dibyendu
>
> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> How to avoid those Errors with receiver based approach? Suppose we are OK
>> with at least once processing and use receiver based approach which uses
>> ZooKeeper but not query Kafka directly, would these errors(Couldn't find
>> leader offsets for
>> Set([test_stream,5])))be avoided?
>>
>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
>> wrote:
>>
>>> KafkaRDD.scala , handleFetchErr
>>>
>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
 Hi Cody,

 How to look at Option 2(see the following)? Which portion of the code
 in Spark Kafka Direct to look at to handle this issue specific to our
 requirements.


 2.Catch that exception and somehow force things to "reset" for that
 partition And how would it handle the offsets already calculated in the
 backlog (if there is one)?

 On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
 wrote:

> If you're consistently getting offset out of range exceptions, it's
> probably because messages are getting deleted before you've processed 
> them.
>
> The only real way to deal with this is give kafka more retention,
> consume faster, or both.
>
> If you're just looking for a quick "fix" for an infrequent issue,
> option 4 is probably easiest.  I wouldn't do that automatically / 
> silently,
> because you're losing data.
>
> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
> wrote:
>
>> Hi,
>>
>> So, our Streaming Job fails with the following errors. If you see the
>> errors
>> below, they are all related to Kafka losing offsets and
>> OffsetOutOfRangeException.
>>
>> What are the options we have other than fixing Kafka? We would like
>> to do
>> something like the following. How can we achieve 1 and 2 with Spark
>> Kafka
>> Direct?
>>
>> 1.Need to see a way to skip some offsets if they are not available
>> after the
>> max retries are reached..in that case there might be data loss.
>>
>> 2.Catch that exception and somehow force things to "reset" for that
>> partition And how would it handle the offsets already calculated in
>> the
>> backlog (if there is one)?
>>
>> 3.Track the offsets separately, restart the job by providing the
>> offsets.
>>
>> 4.Or a straightforward approach would be to monitor the log for this
>> error,
>> and if it occurs more than X times, kill the job, remove the
>> checkpoint
>> directory, and restart.
>>
>> ERROR DirectKafkaInputDStream:
>> ArrayBuffer(kafka.common.UnknownException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([test_stream,5]))
>>
>>
>>
>> java.lang.ClassNotFoundException:
>> kafka.common.NotLeaderForPartitionException
>>
>> at
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> java.util.concurrent.RejectedExecutionException: Task
>>
>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>> [Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>> 12112]
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 10
>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>> stage
>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>
>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>> java.lang.InterruptedException
>>
>> Caused by: java.lang.InterruptedException
>>
>> java.lang.ClassNotFoundException:
>> kafka.common.OffsetOutOfRangeException
>>
>> at
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 7 in
>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>> stage 33.0
>> (TID 283, 

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
This consumer which I mentioned does not silently throw away data. If the
offset is out of range, it starts from the earliest offset, and that is the
correct way to recover from this error.

Dibyendu
On Dec 2, 2015 9:56 PM, "Cody Koeninger"  wrote:

> Again, just to be clear, silently throwing away data because your system
> isn't working right is not the same as "recover from any Kafka leader
> changes and offset out of ranges issue".
>
>
>
> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi, if you use Receiver based consumer which is available in
>> spark-packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) , this
>> has all built in failure recovery and it can recover from any Kafka leader
>> changes and offset out of ranges issue.
>>
>> Here is the package form github :
>> https://github.com/dibbhatt/kafka-spark-consumer
>>
>>
>> Dibyendu
>>
>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> How to avoid those Errors with receiver based approach? Suppose we are
>>> OK with at least once processing and use receiver based approach which uses
>>> ZooKeeper but not query Kafka directly, would these errors(Couldn't
>>> find leader offsets for
>>> Set([test_stream,5])))be avoided?
>>>
>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
>>> wrote:
>>>
 KafkaRDD.scala , handleFetchErr

 On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
 swethakasire...@gmail.com> wrote:

> Hi Cody,
>
> How to look at Option 2(see the following)? Which portion of the code
> in Spark Kafka Direct to look at to handle this issue specific to our
> requirements.
>
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
> wrote:
>
>> If you're consistently getting offset out of range exceptions, it's
>> probably because messages are getting deleted before you've processed 
>> them.
>>
>> The only real way to deal with this is give kafka more retention,
>> consume faster, or both.
>>
>> If you're just looking for a quick "fix" for an infrequent issue,
>> option 4 is probably easiest.  I wouldn't do that automatically / 
>> silently,
>> because you're losing data.
>>
>> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
>> wrote:
>>
>>> Hi,
>>>
>>> So, our Streaming Job fails with the following errors. If you see
>>> the errors
>>> below, they are all related to Kafka losing offsets and
>>> OffsetOutOfRangeException.
>>>
>>> What are the options we have other than fixing Kafka? We would like
>>> to do
>>> something like the following. How can we achieve 1 and 2 with Spark
>>> Kafka
>>> Direct?
>>>
>>> 1.Need to see a way to skip some offsets if they are not available
>>> after the
>>> max retries are reached..in that case there might be data loss.
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in
>>> the
>>> backlog (if there is one)?
>>>
>>> 3.Track the offsets separately, restart the job by providing the
>>> offsets.
>>>
>>> 4.Or a straightforward approach would be to monitor the log for this
>>> error,
>>> and if it occurs more than X times, kill the job, remove the
>>> checkpoint
>>> directory, and restart.
>>>
>>> ERROR DirectKafkaInputDStream:
>>> ArrayBuffer(kafka.common.UnknownException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([test_stream,5]))
>>>
>>>
>>>
>>> java.lang.ClassNotFoundException:
>>> kafka.common.NotLeaderForPartitionException
>>>
>>> at
>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> java.util.concurrent.RejectedExecutionException: Task
>>>
>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>> [Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks
>>> =
>>> 12112]
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 10
>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>> stage
>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>
>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>> java.lang.InterruptedException
>>>
>>> Caused by: java.lang.InterruptedException
>>>
>>> 

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
I believe that what differentiates reliable systems is that individual
components fail fast when their preconditions aren't met, and other
components are responsible for monitoring them.

If a user of the direct stream thinks that your approach of restarting and
ignoring data loss is the right thing to do, they can monitor the job
(which they should be doing in any case) and restart.

If a user of your library thinks that my approach of failing (so they KNOW
there was data loss and can adjust their system) is the right thing to do,
how do they do that?
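
For what it's worth, a minimal sketch of the "fail fast and let something
else restart you" side of this, assuming the driver is launched by an
external supervisor (cron, Marathon, a YARN restart policy, etc.) that reacts
to a non-zero exit; the job skeleton and names here are placeholders, not
anyone's actual code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FailFastDriver {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("direct-stream-job"), Seconds(10))

    // ... create the direct stream and register output operations here ...

    try {
      ssc.start()
      // awaitTermination rethrows the error that stopped the context, so an
      // unrecoverable problem (e.g. OffsetOutOfRange) surfaces here instead of
      // being hidden inside the job.
      ssc.awaitTermination()
    } catch {
      case e: Throwable =>
        System.err.println(s"Streaming job died: $e") // log / page on it
        sys.exit(1) // let the external supervisor decide whether to restart
    }
  }
}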

On Wed, Dec 2, 2015 at 9:49 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Well, even if you do correct retention and increase speed,
> OffsetOutOfRange can still come depends on how your downstream processing
> is. And if that happen , there is No Other way to recover old messages . So
> best bet here from Streaming Job point of view  is to start from earliest
> offset rather bring down the streaming job . In many cases goal for a
> streaming job is not to shut down and exit in case of any failure. I
> believe that is what differentiate a always running streaming job.
>
> Dibyendu
>
> On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger  wrote:
>
>> No, silently restarting from the earliest offset in the case of offset
>> out of range exceptions during a streaming job is not the "correct way of
>> recovery".
>>
>> If you do that, your users will be losing data without knowing why.  It's
>> more like  a "way of ignoring the problem without actually addressing it".
>>
>> The only really correct way to deal with that situation is to recognize
>> why it's happening, and either increase your Kafka retention or increase
>> the speed at which you are consuming.
>>
>> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> This consumer which I mentioned does not silently throw away data. If
>>> offset out of range it start for earliest offset and that is correct way of
>>> recovery from this error.
>>>
>>> Dibyendu
>>> On Dec 2, 2015 9:56 PM, "Cody Koeninger"  wrote:
>>>
 Again, just to be clear, silently throwing away data because your
 system isn't working right is not the same as "recover from any Kafka
 leader changes and offset out of ranges issue".



 On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
 dibyendu.bhattach...@gmail.com> wrote:

> Hi, if you use Receiver based consumer which is available in
> spark-packages (
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) ,
> this has all built in failure recovery and it can recover from any Kafka
> leader changes and offset out of ranges issue.
>
> Here is the package form github :
> https://github.com/dibbhatt/kafka-spark-consumer
>
>
> Dibyendu
>
> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> How to avoid those Errors with receiver based approach? Suppose we
>> are OK with at least once processing and use receiver based approach 
>> which
>> uses ZooKeeper but not query Kafka directly, would these errors(Couldn't
>> find leader offsets for
>> Set([test_stream,5])))be avoided?
>>
>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
>> wrote:
>>
>>> KafkaRDD.scala , handleFetchErr
>>>
>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
 Hi Cody,

 How to look at Option 2(see the following)? Which portion of the
 code in Spark Kafka Direct to look at to handle this issue specific to 
 our
 requirements.


 2.Catch that exception and somehow force things to "reset" for that
 partition And how would it handle the offsets already calculated in
 the
 backlog (if there is one)?

 On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
 wrote:

> If you're consistently getting offset out of range exceptions,
> it's probably because messages are getting deleted before you've 
> processed
> them.
>
> The only real way to deal with this is give kafka more retention,
> consume faster, or both.
>
> If you're just looking for a quick "fix" for an infrequent issue,
> option 4 is probably easiest.  I wouldn't do that automatically / 
> silently,
> because you're losing data.
>
> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
> wrote:
>
>> Hi,
>>
>> So, our Streaming Job fails with the following errors. If you see
>> the errors
>> below, they are all related to Kafka losing offsets and
>> 

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
No, silently restarting from the earliest offset in the case of offset out
of range exceptions during a streaming job is not the "correct way of
recovery".

If you do that, your users will be losing data without knowing why.  It's
more like  a "way of ignoring the problem without actually addressing it".

The only really correct way to deal with that situation is to recognize why
it's happening, and either increase your Kafka retention or increase the
speed at which you are consuming.
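
Concretely, those two knobs look something like the following (a sketch only;
the topic name, retention value and rate value are placeholders, and the rate
setting applies to the Spark 1.x direct stream):

import org.apache.spark.SparkConf

// 1. Give Kafka more retention, so slow batches don't fall off the log.
//    Run against the Kafka cluster (172800000 ms = 48 hours, placeholder):
//    kafka-topics.sh --zookeeper zk1:2181 --alter --topic test_stream \
//      --config retention.ms=172800000
//
// 2. Bound how much each batch pulls per partition, so the job consumes at a
//    rate it can actually sustain instead of building an ever-growing backlog.
val conf = new SparkConf()
  .setAppName("direct-stream-job")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000") // msgs/sec/partition, placeholder
// then pass conf to the StreamingContext as usual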

On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> This consumer which I mentioned does not silently throw away data. If
> offset out of range it start for earliest offset and that is correct way of
> recovery from this error.
>
> Dibyendu
> On Dec 2, 2015 9:56 PM, "Cody Koeninger"  wrote:
>
>> Again, just to be clear, silently throwing away data because your system
>> isn't working right is not the same as "recover from any Kafka leader
>> changes and offset out of ranges issue".
>>
>>
>>
>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi, if you use Receiver based consumer which is available in
>>> spark-packages (
>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) , this
>>> has all built in failure recovery and it can recover from any Kafka leader
>>> changes and offset out of ranges issue.
>>>
>>> Here is the package form github :
>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>
>>>
>>> Dibyendu
>>>
>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
 How to avoid those Errors with receiver based approach? Suppose we are
 OK with at least once processing and use receiver based approach which uses
 ZooKeeper but not query Kafka directly, would these errors(Couldn't
 find leader offsets for
 Set([test_stream,5])))be avoided?

 On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
 wrote:

> KafkaRDD.scala , handleFetchErr
>
> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> How to look at Option 2(see the following)? Which portion of the code
>> in Spark Kafka Direct to look at to handle this issue specific to our
>> requirements.
>>
>>
>> 2.Catch that exception and somehow force things to "reset" for that
>> partition And how would it handle the offsets already calculated in
>> the
>> backlog (if there is one)?
>>
>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>> wrote:
>>
>>> If you're consistently getting offset out of range exceptions, it's
>>> probably because messages are getting deleted before you've processed 
>>> them.
>>>
>>> The only real way to deal with this is give kafka more retention,
>>> consume faster, or both.
>>>
>>> If you're just looking for a quick "fix" for an infrequent issue,
>>> option 4 is probably easiest.  I wouldn't do that automatically / 
>>> silently,
>>> because you're losing data.
>>>
>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
>>> wrote:
>>>
 Hi,

 So, our Streaming Job fails with the following errors. If you see
 the errors
 below, they are all related to Kafka losing offsets and
 OffsetOutOfRangeException.

 What are the options we have other than fixing Kafka? We would like
 to do
 something like the following. How can we achieve 1 and 2 with Spark
 Kafka
 Direct?

 1.Need to see a way to skip some offsets if they are not available
 after the
 max retries are reached..in that case there might be data loss.

 2.Catch that exception and somehow force things to "reset" for that
 partition And how would it handle the offsets already calculated in
 the
 backlog (if there is one)?

 3.Track the offsets separately, restart the job by providing the
 offsets.

 4.Or a straightforward approach would be to monitor the log for
 this error,
 and if it occurs more than X times, kill the job, remove the
 checkpoint
 directory, and restart.

 ERROR DirectKafkaInputDStream:
 ArrayBuffer(kafka.common.UnknownException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([test_stream,5]))



 java.lang.ClassNotFoundException:
 kafka.common.NotLeaderForPartitionException

 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)



 java.util.concurrent.RejectedExecutionException: Task

 

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
Well, even if you set the retention correctly and increase the consumer
speed, OffsetOutOfRange can still happen, depending on how your downstream
processing behaves. And if that happens, there is no other way to recover the
old messages. So the best bet here, from the streaming job's point of view,
is to start from the earliest offset rather than bring down the streaming
job. In many cases the goal for a streaming job is not to shut down and exit
on any failure. I believe that is what differentiates an always-running
streaming job.
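
One way to see this coming, instead of waiting for OffsetOutOfRange to fire,
is to compare the offset the job has reached with the broker's earliest and
latest offsets for each partition. A rough sketch, assuming the old Kafka 0.8
SimpleConsumer API; the host, port, topic and currentOffset values are
placeholders (currentOffset would come from the job's own bookkeeping, e.g.
the offset ranges of the last completed batch):

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer
import scala.collection.JavaConverters._

object LagCheck {
  // Ask one broker for the earliest (time = OffsetRequest.EarliestTime) or
  // latest (time = OffsetRequest.LatestTime) offset of a single partition.
  def brokerOffset(host: String, port: Int, topic: String, partition: Int, time: Long): Long = {
    val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "lag-check")
    try {
      val req = new kafka.javaapi.OffsetRequest(
        Map(TopicAndPartition(topic, partition) -> PartitionOffsetRequestInfo(time, 1)).asJava,
        OffsetRequest.CurrentVersion, "lag-check")
      consumer.getOffsetsBefore(req).offsets(topic, partition).head
    } finally consumer.close()
  }

  def reportLag(host: String, port: Int, topic: String, partition: Int, currentOffset: Long): Unit = {
    val earliest = brokerOffset(host, port, topic, partition, OffsetRequest.EarliestTime)
    val latest = brokerOffset(host, port, topic, partition, OffsetRequest.LatestTime)
    val lag = latest - currentOffset        // how far behind the head of the log
    val headroom = currentOffset - earliest // how far from falling off retention
    println(s"$topic-$partition lag=$lag headroom=$headroom")
    // Alert (and add retention / consume faster) well before headroom hits 0.
  }
}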

Dibyendu

On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger  wrote:

> No, silently restarting from the earliest offset in the case of offset out
> of range exceptions during a streaming job is not the "correct way of
> recovery".
>
> If you do that, your users will be losing data without knowing why.  It's
> more like  a "way of ignoring the problem without actually addressing it".
>
> The only really correct way to deal with that situation is to recognize
> why it's happening, and either increase your Kafka retention or increase
> the speed at which you are consuming.
>
> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> This consumer which I mentioned does not silently throw away data. If
>> offset out of range it start for earliest offset and that is correct way of
>> recovery from this error.
>>
>> Dibyendu
>> On Dec 2, 2015 9:56 PM, "Cody Koeninger"  wrote:
>>
>>> Again, just to be clear, silently throwing away data because your system
>>> isn't working right is not the same as "recover from any Kafka leader
>>> changes and offset out of ranges issue".
>>>
>>>
>>>
>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
 Hi, if you use Receiver based consumer which is available in
 spark-packages (
 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) ,
 this has all built in failure recovery and it can recover from any Kafka
 leader changes and offset out of ranges issue.

 Here is the package form github :
 https://github.com/dibbhatt/kafka-spark-consumer


 Dibyendu

 On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
 swethakasire...@gmail.com> wrote:

> How to avoid those Errors with receiver based approach? Suppose we are
> OK with at least once processing and use receiver based approach which 
> uses
> ZooKeeper but not query Kafka directly, would these errors(Couldn't
> find leader offsets for
> Set([test_stream,5])))be avoided?
>
> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
> wrote:
>
>> KafkaRDD.scala , handleFetchErr
>>
>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> How to look at Option 2(see the following)? Which portion of the
>>> code in Spark Kafka Direct to look at to handle this issue specific to 
>>> our
>>> requirements.
>>>
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in
>>> the
>>> backlog (if there is one)?
>>>
>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>>> wrote:
>>>
 If you're consistently getting offset out of range exceptions, it's
 probably because messages are getting deleted before you've processed 
 them.

 The only real way to deal with this is give kafka more retention,
 consume faster, or both.

 If you're just looking for a quick "fix" for an infrequent issue,
 option 4 is probably easiest.  I wouldn't do that automatically / 
 silently,
 because you're losing data.

 On Mon, Nov 30, 2015 at 6:22 PM, SRK 
 wrote:

> Hi,
>
> So, our Streaming Job fails with the following errors. If you see
> the errors
> below, they are all related to Kafka losing offsets and
> OffsetOutOfRangeException.
>
> What are the options we have other than fixing Kafka? We would
> like to do
> something like the following. How can we achieve 1 and 2 with
> Spark Kafka
> Direct?
>
> 1.Need to see a way to skip some offsets if they are not available
> after the
> max retries are reached..in that case there might be data loss.
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated
> in the
> backlog (if there is one)?
>
> 3.Track the offsets separately, restart the job by providing the
> offsets.
>
> 4.Or a straightforward approach 

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
Hi Cody,

How should I look at Option 2 (see the following)? Which portion of the code
in Spark Kafka Direct should I look at to handle this issue, specific to our
requirements?


2.Catch that exception and somehow force things to "reset" for that
partition And how would it handle the offsets already calculated in the
backlog (if there is one)?

On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger  wrote:

> If you're consistently getting offset out of range exceptions, it's
> probably because messages are getting deleted before you've processed them.
>
> The only real way to deal with this is give kafka more retention, consume
> faster, or both.
>
> If you're just looking for a quick "fix" for an infrequent issue, option 4
> is probably easiest.  I wouldn't do that automatically / silently, because
> you're losing data.
>
> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>
>> Hi,
>>
>> So, our Streaming Job fails with the following errors. If you see the
>> errors
>> below, they are all related to Kafka losing offsets and
>> OffsetOutOfRangeException.
>>
>> What are the options we have other than fixing Kafka? We would like to do
>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>> Direct?
>>
>> 1.Need to see a way to skip some offsets if they are not available after
>> the
>> max retries are reached..in that case there might be data loss.
>>
>> 2.Catch that exception and somehow force things to "reset" for that
>> partition And how would it handle the offsets already calculated in the
>> backlog (if there is one)?
>>
>> 3.Track the offsets separately, restart the job by providing the offsets.
>>
>> 4.Or a straightforward approach would be to monitor the log for this
>> error,
>> and if it occurs more than X times, kill the job, remove the checkpoint
>> directory, and restart.
>>
>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([test_stream,5]))
>>
>>
>>
>> java.lang.ClassNotFoundException:
>> kafka.common.NotLeaderForPartitionException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> java.util.concurrent.RejectedExecutionException: Task
>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>> [Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>> 12112]
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>
>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>> java.lang.InterruptedException
>>
>> Caused by: java.lang.InterruptedException
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 7
>> in
>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>> 33.0
>> (TID 283, 172.16.97.103): UnknownReason
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
How do we avoid those errors with the receiver-based approach? Suppose we are
OK with at-least-once processing and use the receiver-based approach, which
uses ZooKeeper and does not query Kafka directly; would these errors
(Couldn't find leader offsets for Set([test_stream,5])) be avoided?
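
For reference, the stock receiver-based stream being asked about is created
roughly like this (a minimal sketch, Spark 1.x API, with the ZooKeeper
quorum, group id and topic map as placeholders; whether it actually avoids
the "Couldn't find leader offsets" error is exactly the question here):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverBasedSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("receiver-based"), Seconds(10))

    val stream = KafkaUtils.createStream(
      ssc,
      "zk1:2181,zk2:2181",     // zkQuorum
      "my-consumer-group",     // groupId; offsets are tracked in ZooKeeper
      Map("test_stream" -> 1), // topic -> number of receiver threads
      StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).count().print() // stream of (key, message) pairs
    ssc.start()
    ssc.awaitTermination()
  }
}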

On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger  wrote:

> KafkaRDD.scala , handleFetchErr
>
> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> How to look at Option 2(see the following)? Which portion of the code in
>> Spark Kafka Direct to look at to handle this issue specific to our
>> requirements.
>>
>>
>> 2.Catch that exception and somehow force things to "reset" for that
>> partition And how would it handle the offsets already calculated in the
>> backlog (if there is one)?
>>
>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>> wrote:
>>
>>> If you're consistently getting offset out of range exceptions, it's
>>> probably because messages are getting deleted before you've processed them.
>>>
>>> The only real way to deal with this is give kafka more retention,
>>> consume faster, or both.
>>>
>>> If you're just looking for a quick "fix" for an infrequent issue, option
>>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>>> because you're losing data.
>>>
>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>>>
 Hi,

 So, our Streaming Job fails with the following errors. If you see the
 errors
 below, they are all related to Kafka losing offsets and
 OffsetOutOfRangeException.

 What are the options we have other than fixing Kafka? We would like to
 do
 something like the following. How can we achieve 1 and 2 with Spark
 Kafka
 Direct?

 1.Need to see a way to skip some offsets if they are not available
 after the
 max retries are reached..in that case there might be data loss.

 2.Catch that exception and somehow force things to "reset" for that
 partition And how would it handle the offsets already calculated in the
 backlog (if there is one)?

 3.Track the offsets separately, restart the job by providing the
 offsets.

 4.Or a straightforward approach would be to monitor the log for this
 error,
 and if it occurs more than X times, kill the job, remove the checkpoint
 directory, and restart.

 ERROR DirectKafkaInputDStream:
 ArrayBuffer(kafka.common.UnknownException,
 org.apache.spark.SparkException: Couldn't find leader offsets for
 Set([test_stream,5]))



 java.lang.ClassNotFoundException:
 kafka.common.NotLeaderForPartitionException

 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)



 java.util.concurrent.RejectedExecutionException: Task

 org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
 rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
 [Terminated,
 pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
 12112]



 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 10
 in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
 stage
 52.0 (TID 255, 172.16.97.97): UnknownReason

 Exception in thread "streaming-job-executor-0" java.lang.Error:
 java.lang.InterruptedException

 Caused by: java.lang.InterruptedException

 java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)



 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 7 in
 stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
 33.0
 (TID 283, 172.16.97.103): UnknownReason

 java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

 java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Cody Koeninger
KafkaRDD.scala, handleFetchErr
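
To make that pointer a bit more concrete: handleFetchErr in KafkaRDD.scala is
where a fetch error code (OffsetOutOfRange, leader changes, etc.) becomes a
thrown exception and therefore a failed task. The following is a standalone
illustration of the decision involved, not the Spark source itself, assuming
the Kafka 0.8 javaapi types and a hypothetical resetToEarliest callback;
"option 2" would amount to branching on the offset-out-of-range code and
re-fetching from a valid offset instead of throwing:

import kafka.common.ErrorMapping
import kafka.javaapi.FetchResponse

object FetchErrCheck {
  // Returns Some(newStartingOffset) if the caller should re-issue the fetch
  // from a reset position, None if the response is fine, and throws otherwise
  // (which is what the stock direct stream code does for every error code).
  def check(resp: FetchResponse, topic: String, partition: Int,
            resetToEarliest: () => Long): Option[Long] = {
    if (!resp.hasError) None
    else {
      val err = resp.errorCode(topic, partition)
      if (err == ErrorMapping.OffsetOutOfRangeCode) {
        // "Option 2": knowingly skip the lost range and continue from a valid
        // offset (e.g. the earliest one still on the broker).
        Some(resetToEarliest())
      } else {
        // Leader changes and other errors: rethrow and let the normal Spark
        // task retry deal with reconnecting.
        throw ErrorMapping.exceptionFor(err)
      }
    }
  }
}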

On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy 
wrote:

> Hi Cody,
>
> How to look at Option 2(see the following)? Which portion of the code in
> Spark Kafka Direct to look at to handle this issue specific to our
> requirements.
>
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger  wrote:
>
>> If you're consistently getting offset out of range exceptions, it's
>> probably because messages are getting deleted before you've processed them.
>>
>> The only real way to deal with this is give kafka more retention, consume
>> faster, or both.
>>
>> If you're just looking for a quick "fix" for an infrequent issue, option
>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>> because you're losing data.
>>
>> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>>
>>> Hi,
>>>
>>> So, our Streaming Job fails with the following errors. If you see the
>>> errors
>>> below, they are all related to Kafka losing offsets and
>>> OffsetOutOfRangeException.
>>>
>>> What are the options we have other than fixing Kafka? We would like to do
>>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>>> Direct?
>>>
>>> 1.Need to see a way to skip some offsets if they are not available after
>>> the
>>> max retries are reached..in that case there might be data loss.
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> 3.Track the offsets separately, restart the job by providing the offsets.
>>>
>>> 4.Or a straightforward approach would be to monitor the log for this
>>> error,
>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>> directory, and restart.
>>>
>>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([test_stream,5]))
>>>
>>>
>>>
>>> java.lang.ClassNotFoundException:
>>> kafka.common.NotLeaderForPartitionException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> java.util.concurrent.RejectedExecutionException: Task
>>>
>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>> [Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>> 12112]
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 10
>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>> stage
>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>
>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>> java.lang.InterruptedException
>>>
>>> Caused by: java.lang.InterruptedException
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 7 in
>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>> 33.0
>>> (TID 283, 172.16.97.103): UnknownReason
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
Following is the Option 2 that I was talking about:

2.Catch that exception and somehow force things to "reset" for that
partition And how would it handle the offsets already calculated in the
backlog (if there is one)?

On Tue, Dec 1, 2015 at 1:39 PM, swetha kasireddy 
wrote:

> Hi Cody,
>
> How to look at Option 2(see the following)? Which portion of the code in
> Spark Kafka Direct to look at to handle this issue specific to our
> requirements.
>
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger  wrote:
>
>> If you're consistently getting offset out of range exceptions, it's
>> probably because messages are getting deleted before you've processed them.
>>
>> The only real way to deal with this is give kafka more retention, consume
>> faster, or both.
>>
>> If you're just looking for a quick "fix" for an infrequent issue, option
>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>> because you're losing data.
>>
>> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>>
>>> Hi,
>>>
>>> So, our Streaming Job fails with the following errors. If you see the
>>> errors
>>> below, they are all related to Kafka losing offsets and
>>> OffsetOutOfRangeException.
>>>
>>> What are the options we have other than fixing Kafka? We would like to do
>>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>>> Direct?
>>>
>>> 1.Need to see a way to skip some offsets if they are not available after
>>> the
>>> max retries are reached..in that case there might be data loss.
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> 3.Track the offsets separately, restart the job by providing the offsets.
>>>
>>> 4.Or a straightforward approach would be to monitor the log for this
>>> error,
>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>> directory, and restart.
>>>
>>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([test_stream,5]))
>>>
>>>
>>>
>>> java.lang.ClassNotFoundException:
>>> kafka.common.NotLeaderForPartitionException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> java.util.concurrent.RejectedExecutionException: Task
>>>
>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>> [Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>> 12112]
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 10
>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>> stage
>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>
>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>> java.lang.InterruptedException
>>>
>>> Caused by: java.lang.InterruptedException
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 7 in
>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>> 33.0
>>> (TID 283, 172.16.97.103): UnknownReason
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Dibyendu Bhattacharya
Hi, if you use the receiver-based consumer which is available in
spark-packages ( http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ),
it has all the failure recovery built in and can recover from any Kafka
leader changes and offset-out-of-range issues.

Here is the package on github:
https://github.com/dibbhatt/kafka-spark-consumer


Dibyendu

On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy 
wrote:

> How to avoid those Errors with receiver based approach? Suppose we are OK
> with at least once processing and use receiver based approach which uses
> ZooKeeper but not query Kafka directly, would these errors(Couldn't find
> leader offsets for
> Set([test_stream,5])))be avoided?
>
> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger  wrote:
>
>> KafkaRDD.scala , handleFetchErr
>>
>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> How to look at Option 2(see the following)? Which portion of the code in
>>> Spark Kafka Direct to look at to handle this issue specific to our
>>> requirements.
>>>
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>>> wrote:
>>>
 If you're consistently getting offset out of range exceptions, it's
 probably because messages are getting deleted before you've processed them.

 The only real way to deal with this is give kafka more retention,
 consume faster, or both.

 If you're just looking for a quick "fix" for an infrequent issue,
 option 4 is probably easiest.  I wouldn't do that automatically / silently,
 because you're losing data.

 On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:

> Hi,
>
> So, our Streaming Job fails with the following errors. If you see the
> errors
> below, they are all related to Kafka losing offsets and
> OffsetOutOfRangeException.
>
> What are the options we have other than fixing Kafka? We would like to
> do
> something like the following. How can we achieve 1 and 2 with Spark
> Kafka
> Direct?
>
> 1.Need to see a way to skip some offsets if they are not available
> after the
> max retries are reached..in that case there might be data loss.
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> 3.Track the offsets separately, restart the job by providing the
> offsets.
>
> 4.Or a straightforward approach would be to monitor the log for this
> error,
> and if it occurs more than X times, kill the job, remove the checkpoint
> directory, and restart.
>
> ERROR DirectKafkaInputDStream:
> ArrayBuffer(kafka.common.UnknownException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([test_stream,5]))
>
>
>
> java.lang.ClassNotFoundException:
> kafka.common.NotLeaderForPartitionException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> java.util.concurrent.RejectedExecutionException: Task
>
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
> [Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
> 12112]
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 10
> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
> stage
> 52.0 (TID 255, 172.16.97.97): UnknownReason
>
> Exception in thread "streaming-job-executor-0" java.lang.Error:
> java.lang.InterruptedException
>
> Caused by: java.lang.InterruptedException
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 7 in
> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
> 33.0
> (TID 283, 172.16.97.103): UnknownReason
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> --
> View this message in context:
> 

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Cody Koeninger
If you're consistently getting offset out of range exceptions, it's
probably because messages are getting deleted before you've processed them.

The only real way to deal with this is to give Kafka more retention, consume
faster, or both.

If you're just looking for a quick "fix" for an infrequent issue, option 4
is probably easiest.  I wouldn't do that automatically / silently, because
you're losing data.
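
If you do go further and track the offsets yourself (option 3 below), the
direct stream already exposes what's needed; a minimal sketch, assuming Spark
1.x with the Kafka 0.8 direct API, where the broker list, topic, partition
and stored offset are placeholders and the offset store itself is up to you:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object TrackedOffsetsJob {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("tracked-offsets"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    // Offsets loaded from your own store. After an OffsetOutOfRange failure you
    // can inspect and adjust these before restarting, so any skipping of lost
    // data is an explicit, visible decision rather than a silent one.
    val fromOffsets = Map(TopicAndPartition("test_stream", 5) -> 12345L)

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd, then persist each (topic, partition, untilOffset) to
      // your offset store to use as fromOffsets on the next start ...
      ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}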

On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:

> Hi,
>
> So, our Streaming Job fails with the following errors. If you see the
> errors
> below, they are all related to Kafka losing offsets and
> OffsetOutOfRangeException.
>
> What are the options we have other than fixing Kafka? We would like to do
> something like the following. How can we achieve 1 and 2 with Spark Kafka
> Direct?
>
> 1.Need to see a way to skip some offsets if they are not available after
> the
> max retries are reached..in that case there might be data loss.
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> 3.Track the offsets separately, restart the job by providing the offsets.
>
> 4.Or a straightforward approach would be to monitor the log for this error,
> and if it occurs more than X times, kill the job, remove the checkpoint
> directory, and restart.
>
> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([test_stream,5]))
>
>
>
> java.lang.ClassNotFoundException:
> kafka.common.NotLeaderForPartitionException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> java.util.concurrent.RejectedExecutionException: Task
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
> 12112]
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
> 52.0 (TID 255, 172.16.97.97): UnknownReason
>
> Exception in thread "streaming-job-executor-0" java.lang.Error:
> java.lang.InterruptedException
>
> Caused by: java.lang.InterruptedException
>
> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 7
> in
> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage 33.0
> (TID 283, 172.16.97.103): UnknownReason
>
> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>