On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote:

Hi,

Upon restarting my Spark Streaming app, it is failing with this error:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted
    due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
    failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
    org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of
    range with no configured reset policy for partitions: {mt-event-2=1710706}

It is correct that the last read offset was deleted by Kafka due to
retention period expiry. I've set auto.offset.reset in my app, but it is
getting reset here:

https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160

How do I force it to restart in this case (fully aware of the potential
data loss)?

Srikanth

On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org> wrote:

It seems you're confused about the purpose of that line of code: it applies
to executors, not the driver. The driver is responsible for determining
offsets.

Where are you storing offsets — in Kafka, checkpoints, or your own store?
auto.offset.reset won't be used if there are stored offsets.

On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote:

You are right. I got confused because it's all part of the same log when
running from the IDE. I was looking for a good guide to read to understand
this integration.

I'm not managing offsets on my own, and I've not enabled checkpointing for
my tests. I assumed offsets would be stored in Kafka by default.

    KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc, PreferConsistent,
      SubscribePattern[Array[Byte], Array[Byte]](pattern, kafkaParams))

    * @param offsets: offsets to begin at on initial startup. If no offset
    * is given for a TopicPartition, the committed offset (if applicable)
    * or kafka param auto.offset.reset will be used.

    16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
        enable.auto.commit = true
        auto.offset.reset = latest

Srikanth

On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org> wrote:

If you leave enable.auto.commit set to true, it will commit offsets to
Kafka, but you will get undefined delivery semantics.

If you just want to restart from a fresh state, the easiest thing to do is
use a new consumer group name.

But if that keeps happening, you should look into why your retention is not
sufficient.

On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote:

This isn't a production setup; we kept retention low intentionally. My
original question was: why did I get the exception instead of it using
auto.offset.reset on restart?

On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

You don't want auto.offset.reset on executors; you want executors to do
what the driver told them to do. Otherwise you're going to get really
horrible data inconsistency issues if the executors silently reset.

If your retention is so low that it expires between when the driver creates
a batch with a given starting offset and when an executor starts to process
that batch, you're going to have problems.

On Wed, Sep 7, 2016 at 10:44 AM, Srikanth <srikanth...@gmail.com> wrote:

My retention is 1d, which isn't terribly low. The problem is that every
time I restart after retention expiry, I get this exception instead of it
honoring auto.offset.reset. It isn't a corner case where retention expired
after the driver created a batch. It's easily reproducible and consistent.

On Wed, Sep 7, 2016 at 12:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

Just so I'm clear on what's happening...

- You're running a job that auto-commits offsets to Kafka.
- You stop that job for longer than your retention.
- You start that job back up, and it errors because the last committed
  offset is no longer available.
- You think that instead of erroring, the job should silently restart based
  on the value of auto.offset.reset.

Is that accurate?

On Wed, Sep 7, 2016 at 11:44 AM, Srikanth <srikanth...@gmail.com> wrote:

Yes, that's right.

I understand this is data loss. The restart doesn't have to be all that
silent; it requires us to set a flag, and I thought auto.offset.reset was
that flag. But there isn't much I can do at this point given that retention
has cleaned things up. The app has to start; let admins address the data
loss on the side.

Cody Koeninger <c...@koeninger.org> replied:

Kafka as yet doesn't have a good way to distinguish between "it's ok to
reset at the beginning of a job" and "it's ok to reset any time offsets are
out of range":

https://issues.apache.org/jira/browse/KAFKA-3370

Because of that, it's better IMHO to err on the side of obvious failures,
as opposed to silent ones. Like I said, the quickest way to deal with this
from a user perspective is just to use a new consumer group.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org