My retention is 1d, which isn't terribly low. The problem is that every time I restart after retention expiry, I get this exception instead of auto.offset.reset being honored. It isn't a corner case where retention expired after the driver created a batch. It's easily reproducible and consistent.
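(A workaround in line with the "new consumer group name" suggestion further down the thread: restart under a fresh group.id, so no committed offsets exist and the driver falls back to auto.offset.reset. A minimal config sketch; the broker address and group name are placeholders, not from the thread:)

```scala
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical restart params. With a brand-new group.id there are no
// committed offsets for this group, so the driver uses auto.offset.reset
// instead of seeking to an offset that retention has already deleted.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",           // placeholder broker
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "mt-events-restart-1",               // fresh group name
  "auto.offset.reset" -> "earliest",                 // used only when no committed offset exists
  "enable.auto.commit" -> (false: java.lang.Boolean) // avoid the undefined semantics noted below
)
```

This accepts the data loss/reprocessing the original poster says they are aware of; it just makes the fallback explicit instead of failing.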
On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org> wrote:
> You don't want auto.offset.reset on executors, you want executors to
> do what the driver told them to do. Otherwise you're going to get
> really horrible data inconsistency issues if the executors silently
> reset.
>
> If your retention is so low that retention gets expired in between
> when the driver created a batch with a given starting offset, and when
> an executor starts to process that batch, you're going to have
> problems.
>
> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote:
> > This isn't a production setup. We kept retention low intentionally.
> > My original question was why I got the exception instead of it using
> > auto.offset.reset on restart?
> >
> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> If you leave enable.auto.commit set to true, it will commit offsets to
> >> kafka, but you will get undefined delivery semantics.
> >>
> >> If you just want to restart from a fresh state, the easiest thing to
> >> do is use a new consumer group name.
> >>
> >> But if that keeps happening, you should look into why your retention
> >> is not sufficient.
> >>
> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:
> >> > You are right. I got confused as it's all part of the same log when
> >> > running from the IDE.
> >> > I was looking for a good guide to read to understand this integ.
> >> >
> >> > I'm not managing offsets on my own. I've not enabled checkpointing
> >> > for my tests.
> >> > I assumed offsets would be stored in Kafka by default.
> >> >
> >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
> >> >   ssc, PreferConsistent,
> >> >   SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))
> >> >
> >> > * @param offsets: offsets to begin at on initial startup.
> >> > If no offset is given for a TopicPartition, the committed offset
> >> > (if applicable) or kafka param auto.offset.reset will be used.
> >> >
> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> >> >     enable.auto.commit = true
> >> >     auto.offset.reset = latest
> >> >
> >> > Srikanth
> >> >
> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> Seems like you're confused about the purpose of that line of code; it
> >> >> applies to executors, not the driver. The driver is responsible for
> >> >> determining offsets.
> >> >>
> >> >> Where are you storing offsets: in Kafka, checkpoints, or your own
> >> >> store?
> >> >> Auto offset reset won't be used if there are stored offsets.
> >> >>
> >> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:
> >> >>> Hi,
> >> >>>
> >> >>> Upon restarting my Spark Streaming app it is failing with error
> >> >>>
> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job
> >> >>> aborted due to stage failure: Task 0 in stage 1.0 failed 1 times,
> >> >>> most recent failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
> >> >>> out of range with no configured reset policy for partitions:
> >> >>> {mt-event-2=1710706}
> >> >>>
> >> >>> It is correct that the last read offset was deleted by kafka due to
> >> >>> retention period expiry.
> >> >>> I've set auto.offset.reset in my app but it is getting reset here
> >> >>>
> >> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
> >> >>>
> >> >>> How to force it to restart in this case (fully aware of potential
> >> >>> data loss)?
> >> >>>
> >> >>> Srikanth
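(The scaladoc quoted in the thread mentions an `offsets` parameter: the `SubscribePattern` consumer strategy has an overload that takes a starting-offsets map, which the driver uses in preference to both committed offsets and auto.offset.reset. A sketch under the assumption of the Spark 2.0 streaming-kafka-0-10 API; the topic name, partition, and offset value are placeholders, and `ssc`, `pattern`, and `kafkaParams` are taken from the snippet in the thread:)

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern

// Pin explicit starting positions so the driver never tries to resume
// from an offset that retention has already deleted. Partition and offset
// values here are made up for illustration.
val fromOffsets = Map(new TopicPartition("mt-event", 2) -> 0L)

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
  ssc,
  PreferConsistent,
  SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams, fromOffsets))
```

In practice you would fetch valid earliest offsets first (e.g. with a plain KafkaConsumer's `beginningOffsets`) rather than hard-coding them; the point is only that explicit offsets sidestep the executors' no-reset policy, since the executors then always receive ranges the driver has already resolved.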