Well, even if you correct the retention and increase the consumption speed,
OffsetOutOfRange can still occur, depending on how your downstream processing
behaves. And if that happens, there is no other way to recover the old
messages. So the best bet, from the streaming job's point of view, is to start
from the earliest offset rather than bring down the streaming job. In many
cases the goal for a streaming job is not to shut down and exit on any
failure. I believe that is what differentiates an always-running streaming job.
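
To make the trade-off concrete, here is a minimal sketch of that recovery
policy for a single partition. It is not the code of kafka-spark-consumer or of
Spark's KafkaRDD; recover_offset, ResetDecision, and the offset numbers are
hypothetical names chosen for illustration. It resets to the earliest available
offset when the requested one has aged out, and it logs how many messages were
skipped so that the loss is at least not silent.

```python
import logging
from dataclasses import dataclass

log = logging.getLogger("offset_recovery")


@dataclass(frozen=True)
class ResetDecision:
    start_offset: int   # offset the consumer should resume from
    lost_messages: int  # messages deleted by retention before being read


def recover_offset(requested: int, earliest: int, latest: int) -> ResetDecision:
    """Pick a resume offset for one partition.

    earliest and latest are the offsets the broker currently reports for
    the partition (hypothetical inputs; fetching them is out of scope here).
    """
    if earliest <= requested <= latest:
        # Requested offset is still available: nothing to recover from.
        return ResetDecision(requested, 0)
    if requested < earliest:
        # Retention removed the messages in [requested, earliest).
        lost = earliest - requested
        log.warning("offset %d out of range; resuming at %d, %d messages lost",
                    requested, earliest, lost)
        return ResetDecision(earliest, lost)
    # requested > latest: assumption for this sketch, restart from earliest.
    # Nothing is counted as lost, but messages may be reprocessed.
    log.warning("offset %d is beyond latest %d; resuming at %d",
                requested, latest, earliest)
    return ResetDecision(earliest, 0)


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    print(recover_offset(10, 50, 200))
```

The lost_messages count is the part that matters for the argument in this
thread: the job keeps running, but the gap is reported and can be alerted on.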

Dibyendu

On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger <c...@koeninger.org> wrote:

> No, silently restarting from the earliest offset in the case of offset out
> of range exceptions during a streaming job is not the "correct way of
> recovery".
>
> If you do that, your users will be losing data without knowing why. It's
> more like a "way of ignoring the problem without actually addressing it".
>
> The only really correct way to deal with that situation is to recognize
> why it's happening, and either increase your Kafka retention or increase
> the speed at which you are consuming.
>
> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> The consumer I mentioned does not silently throw away data. If the
>> offset is out of range, it starts from the earliest offset, and that is the
>> correct way of recovery from this error.
>>
>> Dibyendu
>> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>>> Again, just to be clear, silently throwing away data because your system
>>> isn't working right is not the same as "recover from any Kafka leader
>>> changes and offset out of ranges issue".
>>>
>>>
>>>
>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Hi, if you use the receiver-based consumer which is available in
>>>> spark-packages (
>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer),
>>>> it has failure recovery built in, and it can recover from any Kafka
>>>> leader changes and offset out of ranges issue.
>>>>
>>>> Here is the package on GitHub:
>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>
>>>>
>>>> Dibyendu
>>>>
>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>>> swethakasire...@gmail.com> wrote:
>>>>
>>>>> How can we avoid those errors with the receiver-based approach? Suppose
>>>>> we are OK with at-least-once processing and use the receiver-based
>>>>> approach, which uses ZooKeeper and does not query Kafka directly. Would
>>>>> these errors (Couldn't find leader offsets for Set([test_stream,5]))
>>>>> be avoided?
>>>>>
>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> KafkaRDD.scala, handleFetchErr
>>>>>>
>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> How should we approach option 2 (quoted below)? Which portion of the
>>>>>>> Spark Kafka Direct code should we look at to handle this issue in a
>>>>>>> way specific to our requirements?
>>>>>>>
>>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>>> partition. How would it handle the offsets already calculated in the
>>>>>>> backlog (if there is one)?
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If you're consistently getting offset out of range exceptions, it's
>>>>>>>> probably because messages are getting deleted before you've processed
>>>>>>>> them.
>>>>>>>>
>>>>>>>> The only real way to deal with this is to give Kafka more retention,
>>>>>>>> consume faster, or both.
>>>>>>>>
>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>>>> option 4 is probably easiest. I wouldn't do that automatically or
>>>>>>>> silently, because you're losing data.
>>>>>>>>
>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Our streaming job fails with the errors below. They are all related
>>>>>>>>> to Kafka losing offsets and OffsetOutOfRangeException.
>>>>>>>>>
>>>>>>>>> What options do we have other than fixing Kafka? We would like to do
>>>>>>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>>>>>>> Kafka Direct?
>>>>>>>>>
>>>>>>>>> 1. Skip some offsets if they are still not available after the max
>>>>>>>>> retries are reached. In that case there might be data loss.
>>>>>>>>>
>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>>>>> partition. How would it handle the offsets already calculated in the
>>>>>>>>> backlog (if there is one)?
>>>>>>>>>
>>>>>>>>> 3. Track the offsets separately, and restart the job by providing the
>>>>>>>>> offsets.
>>>>>>>>>
>>>>>>>>> 4. Or, a straightforward approach: monitor the log for this error, and
>>>>>>>>> if it occurs more than X times, kill the job, remove the checkpoint
>>>>>>>>> directory, and restart.
>>>>>>>>>
>>>>>>>>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for Set([test_stream,5]))
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0 [Terminated,
>>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12112]
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
>>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
>>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>
>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in
>>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage 33.0
>>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
