Yes, that's right.

I understand that this means data loss. The restart doesn't have to be
silent; it could require us to set a flag, and I thought auto.offset.reset
was that flag.
But there isn't much I can do at this point, given that retention has
already cleaned things up.
The app has to start. Admins can address the data loss on the side.
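
A minimal sketch of the restart behaviour asked for here, assuming the committed offsets and the currently available offset range of each partition have already been looked up. Available, ResetPolicy and OffsetResolver.resolveStartOffsets are hypothetical names for illustration only, not part of Spark or Kafka, and partitions are keyed by plain strings to keep the example self-contained.

```scala
// Hypothetical helper, not part of Spark or Kafka: decide where each partition
// should start when a committed offset may have been deleted by retention.

// Offsets currently available in a partition (assumed to be fetched elsewhere).
final case class Available(earliest: Long, latest: Long)

// Mirrors the two usual auto.offset.reset choices.
sealed trait ResetPolicy
case object Earliest extends ResetPolicy
case object Latest extends ResetPolicy

object OffsetResolver {
  def resolveStartOffsets(
      committed: Map[String, Long],
      available: Map[String, Available],
      policy: ResetPolicy): Map[String, Long] =
    available.map { case (partition, range) =>
      val start = committed.get(partition) match {
        // The committed offset is still inside the retained range: keep it.
        case Some(offset) if offset >= range.earliest && offset <= range.latest =>
          offset
        // Missing or out of range: fall back to the explicit policy.
        case _ =>
          policy match {
            case Earliest => range.earliest
            case Latest   => range.latest
          }
      }
      partition -> start
    }
}
```

With the partition from the exception in this thread, a committed offset of 1710706 that retention has already removed would be replaced by the earliest or latest available offset, depending on the policy. Whether the result should then be handed to the stream as explicit starting offsets is a design choice; the point of the sketch is only that the reset is a deliberate, visible step on the driver rather than a silent one on the executors.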

On Wed, Sep 7, 2016 at 12:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Just so I'm clear on what's happening...
>
> - you're running a job that auto-commits offsets to kafka.
> - you stop that job for longer than your retention
> - you start that job back up, and it errors because the last committed
> offset is no longer available
> - you think that instead of erroring, the job should silently restart
> based on the value of auto.offset.reset
>
> Is that accurate?
>
>
> On Wed, Sep 7, 2016 at 10:44 AM, Srikanth <srikanth...@gmail.com> wrote:
> > My retention is 1d, which isn't terribly low. The problem is that every time I
> > restart after retention expiry, I get this exception instead of
> > auto.offset.reset being honored.
> > It isn't a corner case where retention expired after the driver created a batch.
> > It's easily reproducible and consistent.
> >
> > On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> You don't want auto.offset.reset on executors, you want executors to
> >> do what the driver told them to do.  Otherwise you're going to get
> >> really horrible data inconsistency issues if the executors silently
> >> reset.
> >>
> >> If your retention is so low that data expires between the time the
> >> driver creates a batch with a given starting offset and the time an
> >> executor starts to process that batch, you're going to have
> >> problems.
> >>
> >> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> > This isn't a production setup. We kept retention low intentionally.
> >> > My original question was why I got the exception instead of it using
> >> > auto.offset.reset on restart?
> >> >
> >> >
> >> >
> >> >
> >> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> If you leave enable.auto.commit set to true, it will commit offsets to
> >> >> kafka, but you will get undefined delivery semantics.
> >> >>
> >> >> If you just want to restart from a fresh state, the easiest thing to
> >> >> do is use a new consumer group name.
> >> >>
> >> >> But if that keeps happening, you should look into why your retention
> >> >> is not sufficient.
> >> >>
> >> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:
> >> >> > You are right. I got confused as it's all part of the same log when
> >> >> > running from the IDE.
> >> >> > I was looking for a good guide to read to understand this integration.
> >> >> >
> >> >> > I'm not managing offsets on my own. I've not enabled checkpointing for my
> >> >> > tests.
> >> >> > I assumed offsets would be stored in Kafka by default.
> >> >> >
> >> >> >     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
> >> >> >         ssc, PreferConsistent,
> >> >> >         SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))
> >> >> >
> >> >> >    * @param offsets: offsets to begin at on initial startup.  If no offset
> >> >> >    * is given for a TopicPartition, the committed offset (if applicable) or
> >> >> >    * kafka param auto.offset.reset will be used.
> >> >> >
> >> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> >> >> > enable.auto.commit = true
> >> >> > auto.offset.reset = latest
> >> >> >
> >> >> > Srikanth
> >> >> >
> >> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> >>
> >> >> >> Seems like you're confused about the purpose of that line of code; it
> >> >> >> applies to executors, not the driver. The driver is responsible for
> >> >> >> determining offsets.
> >> >> >>
> >> >> >> Where are you storing offsets, in Kafka, checkpoints, or your own
> >> >> >> store?
> >> >> >> Auto offset reset won't be used if there are stored offsets.
> >> >> >>
> >> >> >>
> >> >> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
> >> >> >>>
> >> >> >>> Hi,
> >> >> >>>
> >> >> >>> Upon restarting my Spark Streaming app, it fails with this error:
> >> >> >>>
> >> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> >> >> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
> >> >> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of
> >> >> >>> range with no configured reset policy for partitions: {mt-event-2=1710706}
> >> >> >>>
> >> >> >>> It is correct that the last read offset was deleted by Kafka due to
> >> >> >>> retention period expiry.
> >> >> >>> I've set auto.offset.reset in my app, but it is getting overridden here:
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
> >> >> >>>
> >> >> >>> How can I force it to restart in this case (fully aware of potential
> >> >> >>> data loss)?
> >> >> >>>
> >> >> >>> Srikanth
> >> >> >
> >> >> >
> >> >
> >> >
> >
> >
>
