Hi, if you use the receiver-based consumer available on spark-packages (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer), it has built-in failure recovery and can recover from Kafka leader changes and offset-out-of-range issues. Here is the package on GitHub: https://github.com/dibbhatt/kafka-spark-consumer
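Basic usage looks roughly like the following. This is a sketch based on the package's README, so treat the property keys and the ReceiverLauncher API as assumptions to verify against the project's documentation:

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

import consumer.kafka.ReceiverLauncher

// Sketch per the package README; the property keys and the ReceiverLauncher
// signature are assumptions, check them against the current docs.
val conf = new SparkConf().setAppName("kafka-receiver-example")
val ssc = new StreamingContext(conf, Seconds(10))

val props = new Properties()
props.put("zookeeper.hosts", "zkhost1,zkhost2") // placeholder ZK hosts
props.put("zookeeper.port", "2181")
props.put("kafka.topic", "test_stream")
props.put("kafka.consumer.id", "my-consumer")   // hypothetical consumer id

// Number of receivers to spread across the topic's partitions; 3 is a placeholder.
val stream = ReceiverLauncher.launch(ssc, props, 3, StorageLevel.MEMORY_ONLY)

stream.foreachRDD(rdd => rdd.foreach(println))
ssc.start()
ssc.awaitTermination()

The package manages consumed offsets in ZooKeeper, which is what the recovery behavior described above relies on.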
Dibyendu

On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <swethakasire...@gmail.com> wrote:

> How can we avoid those errors with the receiver-based approach? Suppose we
> are OK with at-least-once processing and use the receiver-based approach,
> which uses ZooKeeper rather than querying Kafka directly: would these
> errors (Couldn't find leader offsets for Set([test_stream,5])) be avoided?
>
> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> KafkaRDD.scala, handleFetchErr
>>
>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> How should we approach option 2 (see below)? Which portion of the code
>>> in Spark Kafka Direct should we look at to handle this issue for our
>>> specific requirements?
>>>
>>> 2. Catch that exception and somehow force things to "reset" for that
>>> partition. And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> If you're consistently getting offset-out-of-range exceptions, it's
>>>> probably because messages are getting deleted before you've processed
>>>> them.
>>>>
>>>> The only real way to deal with this is to give Kafka more retention,
>>>> consume faster, or both.
>>>>
>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>> option 4 is probably easiest. I wouldn't do that automatically or
>>>> silently, because you're losing data.
>>>>
>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Our streaming job fails with the errors below. They are all related
>>>>> to Kafka losing offsets and OffsetOutOfRangeException.
>>>>>
>>>>> What options do we have other than fixing Kafka? We would like to do
>>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>>> Kafka Direct?
>>>>>
>>>>> 1. Need a way to skip some offsets if they are not available after
>>>>> the max retries are reached; in that case there might be data loss.
>>>>>
>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>> partition. And how would it handle the offsets already calculated in
>>>>> the backlog (if there is one)?
>>>>>
>>>>> 3. Track the offsets separately and restart the job by providing the
>>>>> offsets.
>>>>>
>>>>> 4. Or, a straightforward approach: monitor the log for this error,
>>>>> and if it occurs more than X times, kill the job, remove the
>>>>> checkpoint directory, and restart.
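For option 3 above, the direct stream already exposes what you need: read the exact offset ranges of each batch, persist them yourself, and seed a restarted stream from those offsets. A minimal sketch against the spark-streaming-kafka 0.8 API; the broker address, topic, partition, and stored offset are placeholders:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

def startFromTrackedOffsets(ssc: StreamingContext): Unit = {
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker

  // Offsets restored from your own durable store (placeholder values).
  val fromOffsets = Map(TopicAndPartition("test_stream", 5) -> 12345L)

  val messageHandler =
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

  val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets, messageHandler)

  stream.foreachRDD { rdd =>
    // The offsets actually consumed in this batch; persist them so a
    // restart can resume, or deliberately skip ahead, from a known position.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { o =>
      println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
    }
    // ... process rdd, then write offsetRanges to ZooKeeper / a database ...
  }
}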
>>>>> ERROR DirectKafkaInputDStream:
>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([test_stream,5]))
>>>>>
>>>>> java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0 [Terminated,
>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12112]
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>
>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>> java.lang.InterruptedException
>>>>> Caused by: java.lang.InterruptedException
>>>>>
>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 7
>>>>> in stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>>>> 33.0 (TID 283, 172.16.97.103): UnknownReason
>>>>>
>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
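For option 2, the usual way to "reset" a partition whose offset is out of range is to ask Kafka for the earliest offset it still retains and seed the direct stream from that, logging the skipped range so the data loss is visible rather than silent. A sketch using the Kafka 0.8 SimpleConsumer API; host, port, topic, and partition are placeholders:

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Look up the earliest offset Kafka still retains for a partition, so a
// restarted direct stream can be seeded past an OffsetOutOfRangeException.
def earliestAvailableOffset(host: String, port: Int,
                            topic: String, partition: Int): Long = {
  val consumer =
    new SimpleConsumer(host, port, 10000, 64 * 1024, "offset-reset-client")
  try {
    val tp = TopicAndPartition(topic, partition)
    val request = OffsetRequest(
      Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
    consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
  } finally {
    consumer.close()
  }
}

// e.g. seed fromOffsets with this instead of the stored (expired) offset:
// val safeStart = earliestAvailableOffset("broker1", 9092, "test_stream", 5)

Combined with the offset-tracking sketch above, this gives the skip-and-continue behavior from options 1 and 2 without hiding the loss.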