Do you believe that all exceptions (including catastrophic ones like out of
heap space) should be caught and silently discarded?

Do you believe that a database system that runs out of disk space should
silently continue to accept writes?

What I am trying to say is, when something is broken in a way that can't be
fixed without external intervention, the system shouldn't hide it from
you.  Systems fail, that's a fact of life.  Pretending that a system hasn't
failed when it is in fact broken is not a good plan.
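
To make the distinction concrete, here is a minimal sketch in plain Python. It
is not the Spark or Kafka API; every name in it (OffsetOutOfRangeError,
resolve_start_offset, run_supervised, accept_data_loss) is hypothetical and
exists only for illustration. The component that resolves offsets fails fast
when the requested offset is no longer retained, and a separate supervising
layer makes the decision to accept data loss explicit and visible.

```python
class OffsetOutOfRangeError(Exception):
    """Raised when the requested offset is no longer retained (hypothetical name)."""

    def __init__(self, partition, requested, earliest):
        self.partition = partition
        self.requested = requested
        self.earliest = earliest
        # Number of messages that were deleted before they could be read.
        self.lost = earliest - requested
        super().__init__(
            f"partition {partition}: offset {requested} was requested but the "
            f"earliest retained offset is {earliest}; {self.lost} messages are gone"
        )


def resolve_start_offset(partition, requested, earliest):
    """Fail fast: return the offset only if its precondition still holds."""
    if requested < earliest:
        raise OffsetOutOfRangeError(partition, requested, earliest)
    return requested


def run_supervised(partition, requested, earliest, accept_data_loss):
    """Supervising layer: the recovery policy is chosen here, not hidden below."""
    try:
        return resolve_start_offset(partition, requested, earliest)
    except OffsetOutOfRangeError as err:
        if not accept_data_loss:
            # Surface the failure so someone can fix retention or throughput.
            raise
        print(f"WARNING: restarting from earliest after data loss: {err}")
        return err.earliest
```

A user who wants the "restart from earliest" behaviour passes
accept_data_loss=True and still gets a warning that names the lost range; a
user who wants to know about the loss leaves it False and sees the exception.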



On Wed, Dec 2, 2015 at 11:38 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> There are other ways to deal with the problem than shutting down the streaming
> job. You can monitor the lag in your consumer to see if the consumer is falling
> behind. If the lag is so high that OffsetOutOfRange can happen, you can either
> increase the retention period or increase the consumer rate, or do both.
>
> What I am trying to say is that a streaming job should not fail in any case.
>
> Dibyendu
>
> On Thu, Dec 3, 2015 at 9:40 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I believe that what differentiates reliable systems is individual
>> components should fail fast when their preconditions aren't met, and other
>> components should be responsible for monitoring them.
>>
>> If a user of the direct stream thinks that your approach of restarting
>> and ignoring data loss is the right thing to do, they can monitor the job
>> (which they should be doing in any case) and restart.
>>
>> If a user of your library thinks that my approach of failing (so they
>> KNOW there was data loss and can adjust their system) is the right thing to
>> do, how do they do that?
>>
>> On Wed, Dec 2, 2015 at 9:49 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Well, even if you set the retention correctly and increase speed,
>>> OffsetOutOfRange can still occur, depending on how your downstream processing
>>> behaves. And if that happens, there is no other way to recover the old messages.
>>> So the best bet from the streaming job's point of view is to start from the
>>> earliest offset rather than bring down the streaming job. In many cases the goal
>>> for a streaming job is not to shut down and exit in case of any failure. I
>>> believe that is what differentiates an always-running streaming job.
>>>
>>> Dibyendu
>>>
>>> On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> No, silently restarting from the earliest offset in the case of offset
>>>> out of range exceptions during a streaming job is not the "correct way of
>>>> recovery".
>>>>
>>>> If you do that, your users will be losing data without knowing why.
>>>> It's more like a "way of ignoring the problem without actually addressing
>>>> it".
>>>>
>>>> The only really correct way to deal with that situation is to recognize
>>>> why it's happening, and either increase your Kafka retention or increase
>>>> the speed at which you are consuming.
>>>>
>>>> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>
>>>>> This consumer which I mentioned does not silently throw away data. If
>>>>> the offset is out of range, it starts from the earliest offset, and that
>>>>> is the correct way of recovering from this error.
>>>>>
>>>>> Dibyendu
>>>>> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>>>>
>>>>>> Again, just to be clear, silently throwing away data because your
>>>>>> system isn't working right is not the same as "recover from any Kafka
>>>>>> leader changes and offset out of ranges issue".
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, if you use the receiver-based consumer which is available in
>>>>>>> spark-packages (
>>>>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer),
>>>>>>> it has built-in failure recovery and it can recover from any Kafka
>>>>>>> leader changes and offset out of ranges issue.
>>>>>>>
>>>>>>> Here is the package on GitHub:
>>>>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>>>>
>>>>>>>
>>>>>>> Dibyendu
>>>>>>>
>>>>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>>
>>>>>>>> How can we avoid those errors with the receiver-based approach? Suppose
>>>>>>>> we are OK with at-least-once processing and use the receiver-based
>>>>>>>> approach, which uses ZooKeeper and does not query Kafka directly. Would
>>>>>>>> these errors (Couldn't find leader offsets for Set([test_stream,5]))
>>>>>>>> be avoided?
>>>>>>>>
>>>>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> KafkaRDD.scala, handleFetchErr
>>>>>>>>>
>>>>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Cody,
>>>>>>>>>>
>>>>>>>>>> How should we approach Option 2 (see the following)? Which portion of
>>>>>>>>>> the code in Spark Kafka Direct should we look at to handle this issue
>>>>>>>>>> in a way specific to our requirements?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>>>>>> partition. And how would it handle the offsets already calculated in
>>>>>>>>>> the backlog (if there is one)?
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <
>>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> If you're consistently getting offset out of range exceptions,
>>>>>>>>>>> it's probably because messages are getting deleted before you've
>>>>>>>>>>> processed them.
>>>>>>>>>>>
>>>>>>>>>>> The only real way to deal with this is to give Kafka more
>>>>>>>>>>> retention, consume faster, or both.
>>>>>>>>>>>
>>>>>>>>>>> If you're just looking for a quick "fix" for an infrequent
>>>>>>>>>>> issue, option 4 is probably easiest.  I wouldn't do that
>>>>>>>>>>> automatically / silently, because you're losing data.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> So, our Streaming Job fails with the following errors. If you see the
>>>>>>>>>>>> errors below, they are all related to Kafka losing offsets and
>>>>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>>>>
>>>>>>>>>>>> What are the options we have other than fixing Kafka? We would like to do
>>>>>>>>>>>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>>>>>>>>>>>> Direct?
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Find a way to skip some offsets if they are not available after the
>>>>>>>>>>>> max retries are reached. In that case there might be data loss.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>>>>>>>> partition. And how would it handle the offsets already calculated in the
>>>>>>>>>>>> backlog (if there is one)?
>>>>>>>>>>>>
>>>>>>>>>>>> 3. Track the offsets separately, and restart the job by providing the
>>>>>>>>>>>> offsets.
>>>>>>>>>>>>
>>>>>>>>>>>> 4. Or a straightforward approach would be to monitor the log for this
>>>>>>>>>>>> error, and if it occurs more than X times, kill the job, remove the
>>>>>>>>>>>> checkpoint directory, and restart.
>>>>>>>>>>>>
>>>>>>>>>>>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([test_stream,5]))
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
>>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8 rejected from java.util.concurrent.ThreadPoolExecutor@543258e0[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12112]
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>>>>
>>>>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage 33.0 (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>> Nabble.com.