Do you believe that all exceptions (including catastrophic ones like out of heap space) should be caught and silently discarded?
Do you believe that a database system that runs out of disk space should
silently continue to accept writes?

What I am trying to say is, when something is broken in a way that can't be
fixed without external intervention, the system shouldn't hide it from you.
Systems fail, that's a fact of life. Pretending that a system hasn't failed
when it in fact is broken... not a good plan.

On Wed, Dec 2, 2015 at 11:38 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:

> There are other ways to deal with the problem than shutting down the
> streaming job. You can monitor the lag in your consumer to see if the
> consumer is falling behind. If the lag is so high that offsetOutOfRange
> can happen, you either increase the retention period or increase the
> consumer rate, or do both.
>
> What I am trying to say is, a streaming job should not fail in any case.
>
> Dibyendu
>
> On Thu, Dec 3, 2015 at 9:40 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I believe that what differentiates reliable systems is individual
>> components should fail fast when their preconditions aren't met, and
>> other components should be responsible for monitoring them.
>>
>> If a user of the direct stream thinks that your approach of restarting
>> and ignoring data loss is the right thing to do, they can monitor the job
>> (which they should be doing in any case) and restart.
>>
>> If a user of your library thinks that my approach of failing (so they
>> KNOW there was data loss and can adjust their system) is the right thing
>> to do, how do they do that?
>>
>> On Wed, Dec 2, 2015 at 9:49 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Well, even if you correct the retention and increase the speed,
>>> OffsetOutOfRange can still occur, depending on how your downstream
>>> processing is. And if that happens, there is no other way to recover old
>>> messages. So the best bet here, from the streaming job's point of view,
>>> is to start from the earliest offset rather than bring down the
>>> streaming job.
>>> In many cases the goal for a streaming job is not to shut down and exit
>>> in case of any failure. I believe that is what differentiates an always
>>> running streaming job.
>>>
>>> Dibyendu
>>>
>>> On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> No, silently restarting from the earliest offset in the case of offset
>>>> out of range exceptions during a streaming job is not the "correct way
>>>> of recovery".
>>>>
>>>> If you do that, your users will be losing data without knowing why.
>>>> It's more like a "way of ignoring the problem without actually
>>>> addressing it".
>>>>
>>>> The only really correct way to deal with that situation is to recognize
>>>> why it's happening, and either increase your Kafka retention or
>>>> increase the speed at which you are consuming.
>>>>
>>>> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>>
>>>>> This consumer which I mentioned does not silently throw away data. If
>>>>> the offset is out of range, it starts from the earliest offset, and
>>>>> that is the correct way of recovery from this error.
>>>>>
>>>>> Dibyendu
>>>>> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>>>>
>>>>>> Again, just to be clear, silently throwing away data because your
>>>>>> system isn't working right is not the same as "recover from any Kafka
>>>>>> leader changes and offset out of ranges issue".
>>>>>>
>>>>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, if you use the receiver-based consumer which is available in
>>>>>>> spark-packages
>>>>>>> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer),
>>>>>>> this has all built in failure recovery and it can recover from any
>>>>>>> Kafka leader changes and offset out of ranges issue.
>>>>>>>
>>>>>>> Here is the package from GitHub:
>>>>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>>>>
>>>>>>> Dibyendu
>>>>>>>
>>>>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <swethakasire...@gmail.com> wrote:
>>>>>>>
>>>>>>>> How can we avoid those errors with the receiver-based approach?
>>>>>>>> Suppose we are OK with at-least-once processing and use the
>>>>>>>> receiver-based approach, which uses ZooKeeper and does not query
>>>>>>>> Kafka directly: would these errors (Couldn't find leader offsets
>>>>>>>> for Set([test_stream,5])) be avoided?
>>>>>>>>
>>>>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> KafkaRDD.scala, handleFetchErr
>>>>>>>>>
>>>>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Cody,
>>>>>>>>>>
>>>>>>>>>> How should we look at option 2 (see the following)? Which portion
>>>>>>>>>> of the code in Spark Kafka Direct should we look at to handle
>>>>>>>>>> this issue according to our requirements?
>>>>>>>>>>
>>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for
>>>>>>>>>> that partition. And how would it handle the offsets already
>>>>>>>>>> calculated in the backlog (if there is one)?
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> If you're consistently getting offset out of range exceptions,
>>>>>>>>>>> it's probably because messages are getting deleted before you've
>>>>>>>>>>> processed them.
>>>>>>>>>>>
>>>>>>>>>>> The only real way to deal with this is to give Kafka more
>>>>>>>>>>> retention, consume faster, or both.
>>>>>>>>>>>
>>>>>>>>>>> If you're just looking for a quick "fix" for an infrequent
>>>>>>>>>>> issue, option 4 is probably easiest.
>>>>>>>>>>> I wouldn't do that automatically / silently, because you're
>>>>>>>>>>> losing data.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> So, our Streaming Job fails with the following errors. If you
>>>>>>>>>>>> see the errors below, they are all related to Kafka losing
>>>>>>>>>>>> offsets and OffsetOutOfRangeException.
>>>>>>>>>>>>
>>>>>>>>>>>> What are the options we have other than fixing Kafka? We would
>>>>>>>>>>>> like to do something like the following. How can we achieve 1
>>>>>>>>>>>> and 2 with Spark Kafka Direct?
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Need to see a way to skip some offsets if they are not
>>>>>>>>>>>> available after the max retries are reached. In that case there
>>>>>>>>>>>> might be data loss.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for
>>>>>>>>>>>> that partition. And how would it handle the offsets already
>>>>>>>>>>>> calculated in the backlog (if there is one)?
>>>>>>>>>>>>
>>>>>>>>>>>> 3. Track the offsets separately, restart the job by providing
>>>>>>>>>>>> the offsets.
>>>>>>>>>>>>
>>>>>>>>>>>> 4. Or a straightforward approach would be to monitor the log
>>>>>>>>>>>> for this error, and if it occurs more than X times, kill the
>>>>>>>>>>>> job, remove the checkpoint directory, and restart.
>>>>>>>>>>>>
>>>>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
>>>>>>>>>>>>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>>>>> [Terminated, pool size = 0, active threads = 0, queued tasks = 0,
>>>>>>>>>>>> completed tasks = 12112]
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>>>>> Task 10 in stage 52.0 failed 4 times, most recent failure: Lost
>>>>>>>>>>>> task 10.3 in stage 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>>>>
>>>>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>>>>> Task 7 in stage 33.0 failed 4 times, most recent failure: Lost
>>>>>>>>>>>> task 7.3 in stage 33.0 (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>> Nabble.com.
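
The two positions argued in this thread (watch how close the consumer is to the retention boundary, and fail loudly once the requested offset has already been deleted) can be put side by side in a small sketch. This is only an illustration under stated assumptions: it calls no real Kafka or Spark API, and `PartitionOffsets`, `check_partition`, `DataLossError`, and the `min_headroom` threshold are hypothetical names made up here. The offset values are assumed to come from whatever monitoring you already run.

```python
from dataclasses import dataclass


class DataLossError(RuntimeError):
    """Hypothetical error: the next offset to read has already been deleted."""


@dataclass(frozen=True)
class PartitionOffsets:
    # Hypothetical snapshot of one topic partition. The values are assumed to
    # be supplied by your own monitoring; nothing is fetched from Kafka here.
    earliest: int   # oldest offset the broker still retains
    latest: int     # offset the next produced message will get
    next_read: int  # offset the streaming job wants to read next


def lag(p: PartitionOffsets) -> int:
    """Messages produced but not yet consumed."""
    return p.latest - p.next_read


def headroom(p: PartitionOffsets) -> int:
    """Messages between the retention boundary and the consumer position."""
    return p.next_read - p.earliest


def check_partition(p: PartitionOffsets, min_headroom: int) -> str:
    """Return "ok" or "warn"; raise instead of silently resetting offsets."""
    if p.next_read < p.earliest:
        # The data is already gone. Surface it so an operator can decide
        # whether to restart from the earliest offset.
        lost = p.earliest - p.next_read
        raise DataLossError(
            f"offset {p.next_read} is out of range; {lost} messages were "
            f"deleted before they were processed"
        )
    if headroom(p) < min_headroom:
        # Still recoverable: raise retention or consume faster.
        return "warn"
    return "ok"


if __name__ == "__main__":
    healthy = PartitionOffsets(earliest=1_000, latest=9_000, next_read=8_500)
    behind = PartitionOffsets(earliest=1_000, latest=9_000, next_read=1_050)
    print(check_partition(healthy, min_headroom=500))  # ok
    print(check_partition(behind, min_headroom=500))   # warn
```

The "warn" branch is the window in which increasing retention or consumer throughput still helps. The exception branch leaves the restart decision to whoever monitors the job, rather than making it silently inside the consumer.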