Well, even if you do correct the retention and increase the consumption speed, OffsetOutOfRange can still occur, depending on how your downstream processing behaves. And if that happens, there is no other way to recover the old messages. So the best bet here, from the streaming job's point of view, is to start from the earliest offset rather than bring down the streaming job. In many cases the goal for a streaming job is not to shut down and exit on any failure. I believe that is what differentiates an always-running streaming job.
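
To make that concrete, below is a rough sketch of that recovery path
against the Kafka 0.8 SimpleConsumer API. It is illustrative only, not
the actual code of the consumer package (the method name and fetch size
are made up): on an offset-out-of-range fetch error, ask the broker for
its earliest available offset and resume from there instead of failing
the job. The messages between the old and new offsets are gone either
way.

    import java.util.{HashMap => JHashMap}
    import kafka.api.{FetchRequestBuilder, PartitionOffsetRequestInfo}
    import kafka.common.{ErrorMapping, TopicAndPartition}
    import kafka.javaapi.consumer.SimpleConsumer

    // Sketch: fetch from `offset`; if the broker reports it as out of range
    // (the segment was deleted by retention), reset to the earliest offset
    // the broker still has and carry on, rather than killing the job.
    def fetchOrResetToEarliest(consumer: SimpleConsumer, topic: String,
                               partition: Int, offset: Long,
                               clientId: String): Long = {
      val fetch = new FetchRequestBuilder()
        .clientId(clientId)
        .addFetch(topic, partition, offset, 64 * 1024) // 64 KB, illustrative
        .build()
      val resp = consumer.fetch(fetch)
      if (resp.hasError &&
          resp.errorCode(topic, partition) == ErrorMapping.OffsetOutOfRangeCode) {
        val tp = TopicAndPartition(topic, partition)
        val info = new JHashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
        info.put(tp, PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))
        val offsetResp = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
          info, kafka.api.OffsetRequest.CurrentVersion, clientId))
        offsetResp.offsets(topic, partition).head // earliest offset still available
      } else {
        offset // fetch succeeded; process resp.messageSet(topic, partition) as usual
      }
    }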
Dibyendu

On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger <c...@koeninger.org> wrote:

> No, silently restarting from the earliest offset in the case of offset
> out of range exceptions during a streaming job is not the "correct way
> of recovery".
>
> If you do that, your users will be losing data without knowing why.
> It's more like a "way of ignoring the problem without actually
> addressing it".
>
> The only really correct way to deal with that situation is to recognize
> why it's happening, and either increase your Kafka retention or
> increase the speed at which you are consuming.
>
> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> The consumer I mentioned does not silently throw away data. If the
>> offset is out of range, it starts from the earliest offset, and that
>> is the correct way to recover from this error.
>>
>> Dibyendu
>>
>> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>>> Again, just to be clear, silently throwing away data because your
>>> system isn't working right is not the same as "recover from any
>>> Kafka leader changes and offset out of range issues".
>>>
>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Hi, if you use the receiver-based consumer available in
>>>> spark-packages (
>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer),
>>>> it has failure recovery built in and can recover from any Kafka
>>>> leader change or offset out of range issue.
>>>>
>>>> Here is the package on GitHub:
>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>
>>>> Dibyendu
>>>>
>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>>> swethakasire...@gmail.com> wrote:
>>>>
>>>>> How do we avoid those errors with the receiver-based approach?
>>>>> Suppose we are OK with at-least-once processing and use the
>>>>> receiver-based approach, which uses ZooKeeper rather than querying
>>>>> Kafka directly: would these errors (Couldn't find leader offsets
>>>>> for Set([test_stream,5])) be avoided?
>>>>>
>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> KafkaRDD.scala, handleFetchErr
>>>>>>
>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> How should we approach option 2 (see below)? Which portion of
>>>>>>> the code in Spark Kafka Direct should we look at to handle this
>>>>>>> issue for our specific requirements?
>>>>>>>
>>>>>>> 2. Catch that exception and somehow force things to "reset" for
>>>>>>> that partition. And how would it handle the offsets already
>>>>>>> calculated in the backlog (if there is one)?
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger
>>>>>>> <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> If you're consistently getting offset out of range exceptions,
>>>>>>>> it's probably because messages are getting deleted before
>>>>>>>> you've processed them.
>>>>>>>>
>>>>>>>> The only real way to deal with this is to give Kafka more
>>>>>>>> retention, consume faster, or both.
>>>>>>>>
>>>>>>>> If you're just looking for a quick "fix" for an infrequent
>>>>>>>> issue, option 4 is probably easiest. I wouldn't do that
>>>>>>>> automatically / silently, because you're losing data.
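
To make the two remedies above concrete on the consuming side, here is a
minimal sketch of the throttling knobs for the Spark 1.3+ direct stream
(the rate value is illustrative, not a recommendation):

    import org.apache.spark.SparkConf

    // Cap how many messages each Kafka partition contributes per second to
    // a batch, and (on Spark 1.5+) enable backpressure so the rate adapts
    // to processing speed; the goal is to keep consumption ahead of
    // retention-based deletion.
    val conf = new SparkConf()
      .setAppName("kafka-direct-job")
      .set("spark.streaming.kafka.maxRatePerPartition", "10000") // illustrative
      .set("spark.streaming.backpressure.enabled", "true")

On the broker side, retention can be raised per topic, e.g. with
kafka-topics.sh --zookeeper <zk> --alter --topic test_stream
--config retention.ms=<millis> on Kafka 0.8.x.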
>>>>>>>>
>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Our Streaming Job fails with the errors below. As you can see,
>>>>>>>>> they are all related to Kafka losing offsets and to
>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>
>>>>>>>>> What options do we have other than fixing Kafka? We would like
>>>>>>>>> to do something like the following. How can we achieve 1 and 2
>>>>>>>>> with Spark Kafka Direct?
>>>>>>>>>
>>>>>>>>> 1. We need a way to skip some offsets if they are not available
>>>>>>>>> after the max retries are reached... in that case there might
>>>>>>>>> be data loss.
>>>>>>>>>
>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for
>>>>>>>>> that partition. And how would it handle the offsets already
>>>>>>>>> calculated in the backlog (if there is one)?
>>>>>>>>>
>>>>>>>>> 3. Track the offsets separately and restart the job by
>>>>>>>>> providing the offsets.
>>>>>>>>>
>>>>>>>>> 4. Or, a straightforward approach would be to monitor the log
>>>>>>>>> for this error, and if it occurs more than X times, kill the
>>>>>>>>> job, remove the checkpoint directory, and restart.
>>>>>>>>>
>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets
>>>>>>>>> for Set([test_stream,5]))
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>> [Terminated, pool size = 0, active threads = 0, queued tasks =
>>>>>>>>> 0, completed tasks = 12112]
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>> failure: Task 10 in stage 52.0 failed 4 times, most recent
>>>>>>>>> failure: Lost task 10.3 in stage 52.0 (TID 255, 172.16.97.97):
>>>>>>>>> UnknownReason
>>>>>>>>>
>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>> java.lang.InterruptedException
>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>> failure: Task 7 in stage 33.0 failed 4 times, most recent
>>>>>>>>> failure: Lost task 7.3 in stage 33.0 (TID 283, 172.16.97.103):
>>>>>>>>> UnknownReason
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
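
On option 3 above: a minimal sketch of what tracking offsets outside the
checkpoint could look like with the direct stream's fromOffsets variant.
loadOffsets and saveOffsets are hypothetical stubs standing in for
whatever external store (ZooKeeper, a database) is used:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    // Hypothetical stubs: persist per-partition offsets externally so a
    // restarted job resumes from known offsets (or deliberately skips a
    // bad range) instead of relying on the checkpoint directory.
    def loadOffsets(): Map[TopicAndPartition, Long] =
      Map(TopicAndPartition("test_stream", 5) -> 0L) // stub: read from your store
    def saveOffsets(ranges: Array[OffsetRange]): Unit = () // stub: write untilOffset

    def streamFromSavedOffsets(ssc: StreamingContext,
                               kafkaParams: Map[String, String]) = {
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
          StringDecoder, (String, String)](
        ssc, kafkaParams, loadOffsets(),
        (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
      stream.foreachRDD { rdd =>
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // ... process rdd here ...
        saveOffsets(ranges) // persist only after processing succeeds
      }
      stream
    }

Because the job restarts from explicitly provided offsets rather than a
checkpoint, this also answers the backlog question in option 2: anything
not yet saved is simply recomputed from the last saved offsets.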