Hi, if you use Receiver based consumer which is available in spark-packages
( http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) , this
has all built in failure recovery and it can recover from any Kafka leader
changes and offset out of ranges issue.

Here is the package form github :
https://github.com/dibbhatt/kafka-spark-consumer


Dibyendu

On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> How to avoid those Errors with receiver based approach? Suppose we are OK
> with at least once processing and use receiver based approach which uses
> ZooKeeper but not query Kafka directly, would these errors(Couldn't find
> leader offsets for
> Set([test_stream,5])))    be avoided?
>
> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> KafkaRDD.scala , handleFetchErr
>>
>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> How to look at Option 2(see the following)? Which portion of the code in
>>> Spark Kafka Direct to look at to handle this issue specific to our
>>> requirements.
>>>
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> If you're consistently getting offset out of range exceptions, it's
>>>> probably because messages are getting deleted before you've processed them.
>>>>
>>>> The only real way to deal with this is give kafka more retention,
>>>> consume faster, or both.
>>>>
>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>> option 4 is probably easiest.  I wouldn't do that automatically / silently,
>>>> because you're losing data.
>>>>
>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> So, our Streaming Job fails with the following errors. If you see the
>>>>> errors
>>>>> below, they are all related to Kafka losing offsets and
>>>>> OffsetOutOfRangeException.
>>>>>
>>>>> What are the options we have other than fixing Kafka? We would like to
>>>>> do
>>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>>> Kafka
>>>>> Direct?
>>>>>
>>>>> 1.Need to see a way to skip some offsets if they are not available
>>>>> after the
>>>>> max retries are reached..in that case there might be data loss.
>>>>>
>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>> partition And how would it handle the offsets already calculated in the
>>>>> backlog (if there is one)?
>>>>>
>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>> offsets.
>>>>>
>>>>> 4.Or a straightforward approach would be to monitor the log for this
>>>>> error,
>>>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>>>> directory, and restart.
>>>>>
>>>>> ERROR DirectKafkaInputDStream:
>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([test_stream,5]))
>>>>>
>>>>>
>>>>>
>>>>> java.lang.ClassNotFoundException:
>>>>> kafka.common.NotLeaderForPartitionException
>>>>>
>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>>
>>>>>
>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>
>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>> [Terminated,
>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>>>> 12112]
>>>>>
>>>>>
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 10
>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>>>> stage
>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>
>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>> java.lang.InterruptedException
>>>>>
>>>>> Caused by: java.lang.InterruptedException
>>>>>
>>>>> java.lang.ClassNotFoundException:
>>>>> kafka.common.OffsetOutOfRangeException
>>>>>
>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>>
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 7 in
>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>>>> 33.0
>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>
>>>>> java.lang.ClassNotFoundException:
>>>>> kafka.common.OffsetOutOfRangeException
>>>>>
>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>> java.lang.ClassNotFoundException:
>>>>> kafka.common.OffsetOutOfRangeException
>>>>>
>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to