One more thing we can try: before committing an offset, verify that the latest offset stored for that partition (in zookeeper) matches the fromOffset of the OffsetRange we are about to commit, and skip the commit if it doesn't. Just a thought...
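Roughly something like the sketch below, just to illustrate the idea. readOffset and writeOffset are stand-ins for whatever zookeeper helper code you already use to load and save offsets; OffsetRange is the one you get from the direct stream's HasOffsetRanges.

import org.apache.spark.streaming.kafka.OffsetRange

// readOffset(topic, partition) returns the last committed offset,
// writeOffset(topic, partition, offset) stores the new one.
def commitIfContiguous(range: OffsetRange,
                       readOffset: (String, Int) => Long,
                       writeOffset: (String, Int, Long) => Unit): Unit = {
  val stored = readOffset(range.topic, range.partition)
  if (stored == range.fromOffset) {
    // No gap: this batch starts exactly where the last committed batch ended.
    writeOffset(range.topic, range.partition, range.untilOffset)
  } else {
    // Gap (or replay) detected: an earlier batch failed or was skipped,
    // so refuse to commit and let the application stop / alert instead.
    throw new IllegalStateException(
      s"Stored offset $stored != fromOffset ${range.fromOffset} for " +
      s"${range.topic}-${range.partition}; refusing to commit ${range.untilOffset}")
  }
}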
Let me know if it works.

On Tue, Oct 27, 2015 at 9:00 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If you want to make sure that your offsets are increasing without gaps, one
> way to do that is to enforce that invariant when you're saving to your
> database. That would probably mean using a real database instead of
> zookeeper, though.
>
> On Tue, Oct 27, 2015 at 4:13 AM, Krot Viacheslav <krot.vyaches...@gmail.com> wrote:
>
>> Any ideas? This is important because we use Kafka direct streaming and
>> save the processed offsets manually as the last step in the job, so that we
>> achieve at-least-once semantics.
>> But see what happens when a new batch is scheduled after a job fails:
>> - suppose we start from offset 10, loaded from zookeeper
>> - the job starts with offsets 10-20
>> - the job fails N times; awaitTermination notices that and stops the
>>   context (or even the JVM, with System.exit), but the scheduler has already
>>   started the next job, for offsets 20-30, and sent it to an executor
>> - the executor runs all the steps (if there is only one stage) and saves
>>   offset 30 to zookeeper.
>>
>> This way I lose the data in offsets 10-20.
>>
>> How should this be handled correctly?
>>
>> On Mon, 26 Oct 2015 at 18:37, varun sharma <varunsharman...@gmail.com> wrote:
>>
>>> +1, wanted to do the same.
>>>
>>> On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <krot.vyaches...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I wonder what is the correct way to stop a streaming application if some
>>>> job failed?
>>>> What I have now:
>>>>
>>>> val ssc = new StreamingContext
>>>> ....
>>>> ssc.start()
>>>> try {
>>>>   ssc.awaitTermination()
>>>> } catch {
>>>>   case e: Throwable => ssc.stop(stopSparkContext = true, stopGracefully = false)
>>>> }
>>>>
>>>> It works, but one problem still exists: after a job fails and before the
>>>> streaming context is stopped, it manages to start the job for the next
>>>> batch. That is not desirable for me.
>>>> It works like this because JobScheduler is an actor, and after it reports
>>>> the error it moves on to the next message, which starts the next batch
>>>> job, while ssc.awaitTermination() runs in another thread and only reacts
>>>> after the next batch has started.
>>>>
>>>> Is there a way to stop before the next job is submitted?
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*

--
*VARUN SHARMA*
*Flipkart*
*Bangalore*
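For reference, a rough sketch of the conditional commit Cody describes above, assuming the offsets live in a transactional database in a table like offsets(topic, kafka_partition, until_offset); the table and column names here are made up. The UPDATE only succeeds when the stored offset equals this batch's fromOffset, so a batch that skipped ahead over a failed one cannot commit its range.

import java.sql.Connection
import org.apache.spark.streaming.kafka.OffsetRange

// Returns true only if this batch was contiguous with the previously stored offset.
def commitTransactionally(conn: Connection, range: OffsetRange): Boolean = {
  val stmt = conn.prepareStatement(
    "UPDATE offsets SET until_offset = ? " +
    "WHERE topic = ? AND kafka_partition = ? AND until_offset = ?")
  try {
    stmt.setLong(1, range.untilOffset)
    stmt.setString(2, range.topic)
    stmt.setInt(3, range.partition)
    stmt.setLong(4, range.fromOffset)   // stored value must equal this batch's fromOffset
    stmt.executeUpdate() == 1           // 0 rows updated => gap detected, do not proceed
  } finally {
    stmt.close()
  }
}

If the processed results are written to the same database, doing the result writes and this offset update in one transaction closes the race between a failed batch and the next scheduled one.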