Hi,

I had the same issue and managed to work around it by simulating a
heartbeat to kafka. It works really well, i.e., we have had zero issues
since it was implemented

I have somthing like this:


void process() {
   records = consumer.poll(timeout)
   dispatcher.dispatch(records)
   while(!dispatcher.isDone(heartbeatInterval, TimeUnit.MILLISECONDS) {
     heartbeat()
   }
}

void heartbeat() {
  consumer.pause(getCurrentAssignment())
  consumer.poll(10)
  consumer.resume(getCurrentAssignment())
}

TopicPartition[] getCurrentAssignment() {
   return consumer.assignment().toArray(new TopicPartition[0])
}


On 16 February 2016 at 16:20, Ben Stopford <b...@confluent.io> wrote:

> I think you’ll find some useful context in this KIP Jason wrote. It’s
> pretty good.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41:+KafkaConsumer+Max+Records
> >
>
>
> > On 16 Feb 2016, at 07:15, Насыров Ренат <renat-nasy...@yandex.ru> wrote:
> >
> > Hello!
> >
> > I'm trying to use kafka for long-running tasks processing. The tasks can
> be very short (less than a second) or very long (about 10 minutes). I've
> got one consumer group for the single queue, and one or more consumers.
> Sometimes consumers manage to commit their offsets before rebalancing,
> sometimes not (and fail). Accordning to this document (
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> ), in worst case (when all the consumers are on very long tasks) it goes as
> follows:
> >
> > 1) Consumers get long tasks from the queue.
> > 2) Consumers performing their long-running tasks.
> > 3) Session timeout happens.
> > 4) Group coordinator performs a rebalance; the current generation number
> is increased.
> > 5) Consumers complete their long-running tasks and commit.
> > 6) GroupCoordinator returns IllegalGeneration errors to consumers and
> does not allow to commit the offsets.
> > 7) Consumers reconnect and get the very messages from the previous
> generation, thus stucking in forever loop.
> >
> > Suggestions:
> >
> > 1) Commit first, then process. Inacceptable in my case because it leads
> to at-most-once semantics.
> > 2) Increase session timeout limit. Not desired because task duration can
> negatively affect the effectiveness of rebalance.
> >
> > Is there any proper way to complete long-running tasks?
>
>

Reply via email to