Kafka doesn't yet have a good way to distinguish between "it's ok
to reset at the beginning of a job" and "it's ok to reset any time
offsets are out of range":

https://issues.apache.org/jira/browse/KAFKA-3370

Because of that, it's better IMHO to err on the side of obvious
failures, as opposed to silent ones.

Like I said, the quickest way to deal with this from a user
perspective is just to use a new consumer group.
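
For example, a fresh group id has no committed offsets, so
auto.offset.reset actually takes effect on the first run. A minimal
sketch (broker address, group name, and serializer choices here are
placeholders, not from this thread):

    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",  // placeholder
      "key.deserializer" -> classOf[ByteArrayDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> "my-app-v2",          // new group => no stored offsets
      "auto.offset.reset" -> "earliest",  // honored, since nothing is committed
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )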

On Wed, Sep 7, 2016 at 11:44 AM, Srikanth <srikanth...@gmail.com> wrote:
> Yes, that's right.
>
> I understand this is data loss. The restart doesn't have to be all that
> silent; it requires us to set a flag, and I thought auto.offset.reset was
> that flag.
> But there isn't much I can do at this point, given that retention has
> already cleaned things up.
> The app has to start. Let admins address the data loss on the side.
>
> On Wed, Sep 7, 2016 at 12:15 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Just so I'm clear on what's happening...
>>
>> - you're running a job that auto-commits offsets to Kafka.
>> - you stop that job for longer than your retention
>> - you start that job back up, and it errors because the last committed
>> offset is no longer available
>> - you think that instead of erroring, the job should silently restart
>> based on the value of auto.offset.reset
>>
>> Is that accurate?
>>
>>
>> On Wed, Sep 7, 2016 at 10:44 AM, Srikanth <srikanth...@gmail.com> wrote:
>> > My retention is 1d, which isn't terribly low. The problem is that every
>> > time I restart after retention expiry, I get this exception instead of it
>> > honoring auto.offset.reset.
>> > It isn't a corner case where retention expired after the driver created a
>> > batch.
>> > It's easily reproducible and consistent.
>> >
>> > On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> You don't want auto.offset.reset on executors; you want executors to
>> >> do what the driver told them to do.  Otherwise you're going to get
>> >> really horrible data inconsistency issues if the executors silently
>> >> reset.
>> >>
>> >> If your retention is so low that it expires between when the driver
>> >> creates a batch with a given starting offset and when an executor
>> >> starts to process that batch, you're going to have problems.
>> >>
>> >> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote:
>> >> > This isn't a production setup; we kept retention low intentionally.
>> >> > My original question was: why did I get the exception instead of it
>> >> > using auto.offset.reset on restart?
>> >> >
>> >> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org>
>> >> > wrote:
>> >> >>
>> >> >> If you leave enable.auto.commit set to true, it will commit offsets
>> >> >> to Kafka, but you will get undefined delivery semantics.
>> >> >>
>> >> >> If you just want to restart from a fresh state, the easiest thing to
>> >> >> do is to use a new consumer group name.
>> >> >>
>> >> >> But if that keeps happening, you should look into why your retention
>> >> >> is not sufficient.
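>> >> >>
>> >> >> If you do need committed offsets, a safer pattern (per the Spark
>> >> >> streaming-kafka-0-10 docs) is to disable auto-commit and commit only
>> >> >> after each batch is processed. A minimal sketch, assuming the stream
>> >> >> created elsewhere in this thread:
>> >> >>
>> >> >>     import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
>> >> >>
>> >> >>     stream.foreachRDD { rdd =>
>> >> >>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >> >>       // ... process rdd here ...
>> >> >>       // commit afterwards, so offsets only advance once work is done
>> >> >>       stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> >> >>     }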
>> >> >>
>> >> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com>
>> >> >> wrote:
>> >> >> > You are right. I got confused as it's all part of the same log when
>> >> >> > running from the IDE.
>> >> >> > I was looking for a good guide to read to understand this
>> >> >> > integration.
>> >> >> >
>> >> >> > I'm not managing offsets on my own, and I've not enabled
>> >> >> > checkpointing for my tests.
>> >> >> > I assumed offsets would be stored in Kafka by default.
>> >> >> >
>> >> >> >     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
>> >> >> >       ssc, PreferConsistent,
>> >> >> >       SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))
>> >> >> >
>> >> >> >    * @param offsets: offsets to begin at on initial startup.  If no
>> >> >> >    * offset is given for a TopicPartition, the committed offset (if
>> >> >> >    * applicable) or kafka param auto.offset.reset will be used.
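>> >> >> >
>> >> >> > (A minimal sketch of seeding that offsets parameter explicitly, so
>> >> >> > startup never depends on a committed offset that retention may have
>> >> >> > deleted. The topic and partition come from the exception quoted
>> >> >> > below; the probe-consumer calls differ slightly across Kafka client
>> >> >> > versions:)
>> >> >> >
>> >> >> >     import scala.collection.JavaConverters._
>> >> >> >     import org.apache.kafka.clients.consumer.KafkaConsumer
>> >> >> >     import org.apache.kafka.common.TopicPartition
>> >> >> >
>> >> >> >     val tp = new TopicPartition("mt-event", 2)
>> >> >> >     // throwaway consumer, used only to find the earliest offset
>> >> >> >     // still available on the broker
>> >> >> >     val probe = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams.asJava)
>> >> >> >     probe.assign(java.util.Collections.singletonList(tp))
>> >> >> >     probe.seekToBeginning(tp)  // varargs in 0.10.0 clients; takes a Collection in 0.10.1+
>> >> >> >     val fromOffsets = Map(tp -> probe.position(tp))
>> >> >> >     probe.close()
>> >> >> >
>> >> >> >     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
>> >> >> >       ssc, PreferConsistent,
>> >> >> >       SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams, fromOffsets))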
>> >> >> >
>> >> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
>> >> >> > enable.auto.commit = true
>> >> >> > auto.offset.reset = latest
>> >> >> >
>> >> >> > Srikanth
>> >> >> >
>> >> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger
>> >> >> > <c...@koeninger.org>
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Seems like you're confused about the purpose of that line of code;
>> >> >> >> it applies to executors, not the driver. The driver is responsible
>> >> >> >> for determining offsets.
>> >> >> >>
>> >> >> >> Where are you storing offsets: in Kafka, in checkpoints, or in your
>> >> >> >> own store?
>> >> >> >> auto.offset.reset won't be used if there are stored offsets.
>> >> >> >>
>> >> >> >>
>> >> >> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
>> >> >> >>>
>> >> >> >>> Hi,
>> >> >> >>>
>> >> >> >>> Upon restarting my Spark Streaming app, it is failing with this
>> >> >> >>> error:
>> >> >> >>>
>> >> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job
>> >> >> >>> aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
>> >> >> >>> most recent failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
>> >> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>> >> >> >>> Offsets out of range with no configured reset policy for
>> >> >> >>> partitions: {mt-event-2=1710706}
>> >> >> >>>
>> >> >> >>> It is correct that the last read offset was deleted by Kafka due
>> >> >> >>> to retention expiry.
>> >> >> >>> I've set auto.offset.reset in my app, but it is getting overridden
>> >> >> >>> here:
>> >> >> >>>
>> >> >> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
>> >> >> >>>
>> >> >> >>> How do I force it to restart in this case (fully aware of the
>> >> >> >>> potential data loss)?
>> >> >> >>>
>> >> >> >>> Srikanth
