One more thing we can try is before committing offset we can verify the
latest offset of that partition(in zookeeper) with fromOffset in
OffsetRange.
Just a thought...

Let me know if it works..

On Tue, Oct 27, 2015 at 9:00 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If you want to make sure that your offsets are increasing without gaps...
> one way to do that is to enforce that invariant when you're saving to your
> database.  That would probably mean using a real database instead of
> zookeeper though.
>
> On Tue, Oct 27, 2015 at 4:13 AM, Krot Viacheslav <
> krot.vyaches...@gmail.com> wrote:
>
>> Any ideas? This is so important because we use kafka direct streaming and
>> save processed offsets manually as last step in the job, so we archive
>> at-least-once.
>> But see what happens when new batch is scheduled after a job fails:
>> - suppose we start from offset 10 loaded from zookeeper
>> - job starts with offsets 10-20
>> - job fails N times, awaitTermination notices that and stops context (or
>> even jvm with System.exit), but Scheduler has already started new job, it
>> is job for offsets 20-30, and sent it to executor.
>> - executor does all the steps (if there is only one stage) and saves
>> offset 30 to zookeeper.
>>
>> This way I loose data in offsets 10-20
>>
>> How should this be handled correctly?
>>
>> пн, 26 окт. 2015 г. в 18:37, varun sharma <varunsharman...@gmail.com>:
>>
>>> +1, wanted to do same.
>>>
>>> On Mon, Oct 26, 2015 at 8:58 PM, Krot Viacheslav <
>>> krot.vyaches...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I wonder what is the correct way to stop streaming application if some
>>>> job failed?
>>>> What I have now:
>>>>
>>>> val ssc = new StreamingContext
>>>> ....
>>>> ssc.start()
>>>> try {
>>>>    ssc.awaitTermination()
>>>> } catch {
>>>>    case e => ssc.stop(stopSparkContext = true, stopGracefully = false)
>>>> }
>>>>
>>>> It works but one problem still exists - after job failed and before
>>>> streaming context is stopped it manages to start job for next batch. That
>>>> is not desirable for me.
>>>> It works like this because JobScheduler is an actor and after it
>>>> reports error, it goes on with next message that starts next batch job.
>>>> While ssc.awaitTermination() works in another thread and happens after next
>>>> batch starts.
>>>>
>>>> Is there a way to stop before next job is submitted?
>>>>
>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*

Reply via email to