Sounds like you'd be better off just failing if the external server is down, and scripting monitoring / restarting of your job.
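A minimal sketch of that monitoring/restart approach, outside Spark entirely. All names here are illustrative assumptions, not Spark APIs; in practice the `IntSupplier` would launch `spark-submit` via `ProcessBuilder` and return its exit code:

```java
import java.util.function.IntSupplier;

// Hypothetical supervisor: run the job, and relaunch it after a delay
// whenever it exits non-zero. Returns how many restarts were needed.
public class JobSupervisor {
    public static int runUntilSuccess(IntSupplier job, long delayMs)
            throws InterruptedException {
        int restarts = 0;
        while (job.getAsInt() != 0) {   // non-zero exit code: job failed
            restarts++;
            Thread.sleep(delayMs);      // back off before restarting
        }
        return restarts;
    }
}
```

Since the direct stream computes offsets from Kafka itself, a restarted job can resume from wherever your own offset bookkeeping says it left off, rather than silently skipping data.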
On Tue, Sep 1, 2015 at 11:19 AM, Shushant Arora <shushantaror...@gmail.com> wrote:

> Since in my app, after processing the events I am posting them to an
> external server - if the external server is down, I want to back off
> consuming from Kafka. But I can't stop and restart the consumer, since
> that needs manual effort.
>
> Backing off a few batches is also not possible, since the decision to
> back off is based on the last batch's processing status, but the driver
> has already computed offsets for the next batches - so if I ignore the
> next few batches until the external server is back to normal, it's data
> loss if I cannot reset the offsets.
>
> So the only option seems to be delaying the last batch by calling
> sleep() in the foreachRDD method after returning from the
> foreachPartition transformations.
>
> The concern here is that further batches will keep enqueueing until the
> current slept batch completes. So what's the max size of the scheduling
> queue? Say the server does not come up for hours and my batch interval
> is 5 sec - it will enqueue 720 batches per hour.
> Will that be an issue?
> And is there any setting in directkafkastream to enforce not calling
> further compute() methods once the scheduling queue reaches a threshold
> size, say 50 batches? Once the queue size comes back below the
> threshold, call compute and enqueue the next job.
>
> On Tue, Sep 1, 2015 at 8:57 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Honestly I'd concentrate more on getting your batches to finish in a
>> timely fashion, so you won't even have the issue to begin with...
>>
>> On Tue, Sep 1, 2015 at 10:16 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> What if I use custom checkpointing, so that I can take care of
>>> offsets being checkpointed at the end of each batch?
>>>
>>> Will it be possible then to reset the offset?
>>> On Tue, Sep 1, 2015 at 8:42 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> No, if you start arbitrarily messing around with offset ranges after
>>>> compute is called, things are going to get out of whack.
>>>>
>>>> e.g. checkpoints are no longer going to correspond to what you're
>>>> actually processing.
>>>>
>>>> On Tue, Sep 1, 2015 at 10:04 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>
>>>>> Can I reset the range based on some condition, before calling
>>>>> transformations on the stream?
>>>>>
>>>>> Say, before calling:
>>>>>
>>>>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>>>>   @Override
>>>>>   public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>>>>     v1.foreachPartition(new VoidFunction<Iterator<byte[][]>>() {
>>>>>       @Override
>>>>>       public void call(Iterator<byte[][]> t) throws Exception {
>>>>>       }
>>>>>     });
>>>>>     return null;
>>>>>   }
>>>>> });
>>>>>
>>>>> change directKafkaStream's RDD's offset range (fromOffset).
>>>>>
>>>>> I can't do this in the compute method, since compute would have been
>>>>> called at the current batch's queue time - but the condition is set
>>>>> at the previous batch's run time.
>>>>>
>>>>> On Tue, Sep 1, 2015 at 7:09 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> It's at the time compute() gets called, which should be near the
>>>>>> time the batch should have been queued.
>>>>>>
>>>>>> On Tue, Sep 1, 2015 at 8:02 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> In Spark Streaming 1.3 with Kafka, when does the driver fetch the
>>>>>>> latest offsets for a run: at the start of each batch, or at the
>>>>>>> time the batch gets queued?
>>>>>>>
>>>>>>> Say a few of my batches take longer to complete than their batch
>>>>>>> interval, so some batches will go into the queue. Will the driver
>>>>>>> wait for queued batches to get started, or just fetch the latest
>>>>>>> offsets before they have even actually started? And when they
>>>>>>> start running, will they work on the old offsets fetched at the
>>>>>>> time they were queued?
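An alternative to the sleep()-in-foreachRDD idea discussed in this thread is to block inside the output action itself: retry the post to the external server with capped exponential backoff, so the batch simply does not finish until the server recovers, and no offsets are skipped. A hedged sketch, independent of Spark; the helper name, attempt cap, and delays are illustrative assumptions, not Spark or Kafka APIs:

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper: call `action` until it succeeds, sleeping
// with capped exponential backoff between failures. Inside
// foreachPartition this would wrap the post to the external server.
public class BackoffRetry {
    public static <T> T retry(Callable<T> action, int maxAttempts,
                              long baseDelayMs, long maxDelayMs) throws Exception {
        long delay = baseDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();          // success: return the result
            } catch (Exception e) {
                last = e;                      // remember the failure
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);       // back off before retrying
                    delay = Math.min(delay * 2, maxDelayMs);
                }
            }
        }
        throw last;                            // exhausted all attempts
    }
}
```

Note this trades data loss for latency: batches still queue up behind the blocked one, so it only helps if the queue growth during an outage is acceptable, as discussed above.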