Just so I'm clear on what's happening...

- you're running a job that auto-commits offsets to kafka.
- you stop that job for longer than your retention
- you start that job back up, and it errors because the last committed
offset is no longer available
- you think that instead of erroring, the job should silently restart
based on the value of auto.offset.reset

Is that accurate?
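
For reference, a minimal sketch (not your actual configuration; broker address,
group id, and deserializers are placeholders) of the kafkaParams this summary
is talking about:

    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",               // placeholder
      "key.deserializer" -> classOf[ByteArrayDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> "my-streaming-job",                       // placeholder
      // consulted on startup only when no committed offset exists:
      "auto.offset.reset" -> "latest",
      // auto-committing to kafka, as described in the summary above:
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )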


On Wed, Sep 7, 2016 at 10:44 AM, Srikanth <srikanth...@gmail.com> wrote:
> My retention is 1d, which isn't terribly low. The problem is that every time I
> restart after retention expiry, I get this exception instead of it honoring
> auto.offset.reset.
> It isn't a corner case where retention expired after the driver created a batch.
> It's easily reproducible and consistent.
>
> On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> You don't want auto.offset.reset on executors; you want executors to
>> do what the driver told them to do.  Otherwise you're going to get
>> really horrible data inconsistency issues if the executors silently
>> reset.
>>
>> If your retention is so low that messages expire between the time the
>> driver creates a batch with a given starting offset and the time an
>> executor starts to process that batch, you're going to have
>> problems.
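
For what it's worth, a sketch of what "the driver told them to do" looks like
with the 0-10 direct stream. `stream` is assumed to be the DStream returned by
KafkaUtils.createDirectStream; stream creation is elided here:

    import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

    stream.foreachRDD { rdd =>
      // the exact range each task will read is fixed on the driver
      // when the batch is created...
      val offsetRanges: Array[OffsetRange] =
        rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { or =>
        println(s"${or.topic} ${or.partition}: ${or.fromOffset} -> ${or.untilOffset}")
      }
      // ...and each executor task reads exactly its assigned range, which is
      // why a silent auto.offset.reset on an executor would be inconsistent.
    }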
>>
>> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote:
>> > This isn't a production setup. We kept retention low intentionally.
>> > My original question was why I got the exception instead of it honoring
>> > auto.offset.reset on restart.
>> >
>> >
>> >
>> >
>> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> If you leave enable.auto.commit set to true, it will commit offsets to
>> >> kafka, but you will get undefined delivery semantics.
>> >>
>> >> If you just want to restart from a fresh state, the easiest thing to
>> >> do is use a new consumer group name.
>> >>
>> >> But if that keeps happening, you should look into why your retention
>> >> is not sufficient.
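
To make those two suggestions concrete, a sketch (reusing the placeholder names
from the earlier snippets, so adjust to your setup): start clean under a new
group id, and commit to kafka yourself only after a batch's output has
succeeded, instead of relying on enable.auto.commit:

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    // build the stream with freshParams in place of the original kafkaParams
    val freshParams = kafkaParams +
      ("group.id" -> "my-streaming-job-v2") +               // new group => fresh state
      ("enable.auto.commit" -> (false: java.lang.Boolean))  // commit manually instead

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd and write results here ...
      // only then record progress in kafka:
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }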
>> >>
>> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:
>> >> > You are right. I got confused as it's all part of the same log when
>> >> > running from the IDE.
>> >> > I was looking for a good guide to read to understand this integration.
>> >> >
>> >> > I'm not managing offsets on my own. I haven't enabled checkpointing for
>> >> > my tests.
>> >> > I assumed offsets would be stored in kafka by default.
>> >> >
>> >> >     KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
>> >> >         ssc, PreferConsistent,
>> >> >         SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams) )
>> >> >
>> >> >    * @param offsets: offsets to begin at on initial startup.  If no offset
>> >> >    * is given for a TopicPartition, the committed offset (if applicable)
>> >> >    * or kafka param auto.offset.reset will be used.
>> >> >
>> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
>> >> > enable.auto.commit = true
>> >> > auto.offset.reset = latest
>> >> >
>> >> > Srikanth
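
Regarding that @param offsets scaladoc: if you ever want to pin the start
position yourself, you can pass an explicit offsets map to the consumer
strategy. A sketch only; the topic, partition, and offset values below are made
up, and ssc / pattern / kafkaParams are the ones from your snippet:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    // partitions listed here skip both the committed offset and auto.offset.reset
    val startingOffsets = Map(
      new TopicPartition("some-topic", 0) -> 12345L   // illustrative values only
    )

    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[Array[Byte], Array[Byte]](
        pattern, kafkaParams, startingOffsets))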
>> >> >
>> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org>
>> >> > wrote:
>> >> >>
>> >> >> Seems like you're confused about the purpose of that line of code;
>> >> >> it applies to executors, not the driver. The driver is responsible
>> >> >> for determining offsets.
>> >> >>
>> >> >> Where are you storing offsets: in Kafka, checkpoints, or your own
>> >> >> store? auto.offset.reset won't be used if there are stored offsets.
>> >> >>
>> >> >>
>> >> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
>> >> >>>
>> >> >>> Hi,
>> >> >>>
>> >> >>> Upon restarting, my Spark Streaming app fails with this error:
>> >> >>>
>> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> >> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
>> >> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
>> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>> >> >>> Offsets out of range with no configured reset policy for partitions:
>> >> >>> {mt-event-2=1710706}
>> >> >>>
>> >> >>> It is true that the last read offset was deleted by kafka due to
>> >> >>> retention period expiry.
>> >> >>> I've set auto.offset.reset in my app, but it is being overridden here:
>> >> >>>
>> >> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
>> >> >>>
>> >> >>> How do I force it to restart in this case (fully aware of the potential
>> >> >>> data loss)?
>> >> >>>
>> >> >>> Srikanth
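
If you do want the job to pick itself up from whatever kafka still has, one
option (a sketch only; "some-topic" is a placeholder, and kafkaParams is
assumed to be the same map you pass to the stream) is to ask kafka on the
driver for the earliest offsets that are still retained and seed the stream
with them via the offsets map shown further up the thread:

    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    // use a non-committing probe so this lookup doesn't touch the group's offsets
    val probeParams = kafkaParams + ("enable.auto.commit" -> (false: java.lang.Boolean))
    val probe = new KafkaConsumer[Array[Byte], Array[Byte]](probeParams.asJava)
    val partitions = probe.partitionsFor("some-topic").asScala
      .map(pi => new TopicPartition(pi.topic, pi.partition))
    probe.assign(partitions.asJava)
    probe.seekToBeginning(partitions.asJava)
    val earliest = partitions.map(tp => tp -> probe.position(tp)).toMap
    probe.close()
    // then pass `earliest` as the offsets argument when building the ConsumerStrategy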
>> >> >
>> >> >
>> >
>> >
>
>
