Jason, maybe it would be useful to update KAFKA-2986 with this information
if we ever decide to do this?

Ismael
On 16 Dec 2015 04:42, "Jason Gustafson" <ja...@confluent.io> wrote:

> I was talking with Jay this afternoon about this use case. The tricky thing
> about adding a ping() or heartbeat() API is that you have to deal with the
> potential for rebalancing. This means either allowing it to block while a
> rebalance completes or having it raise an exception indicating that a
> rebalance is needed. In code, the latter might look like this:
>
> while (running) {
>   ConsumerRecords<K, V> records = consumer.poll(1000);
>   try {
>     for (ConsumerRecord<K, V> record : records) {
>       process(record);
>       consumer.heartbeat();
>     }
>   } catch (RebalanceException e) {
>     continue;
>   }
> }
>
> Unfortunately, this wouldn't work with auto-commit, since it would tend to
> break message processing early, which would let the committed position get
> ahead of the last offset processed. The alternative blocking approach
> wouldn't be any better in this regard. Overall, it seems like this might
> introduce a bigger problem than it solves.
>
> Perhaps the simpler solution is to provide a way to set the maximum number
> of messages returned. This could either be a new configuration option or a
> second argument in poll, but it would let you handle messages one-by-one if
> you needed to. You'd then be able to set the session timeout according to
> the expected time to handle a single message. It'd be a bit more work to
> implement this, but if the use case is common enough, it might be
> worthwhile.
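[Editor's note: this proposal is essentially what later shipped as the
max.poll.records consumer config (KIP-41). A minimal sketch of the
configuration side is below; the broker address and group id are
placeholders, not values from this thread.]

```java
import java.util.Properties;

public class MaxRecordsConfig {
    // Build consumer properties that cap each poll() at a single record,
    // so the session timeout only needs to cover one message's processing.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "slow-processor");          // placeholder
        props.put("max.poll.records", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("max.poll.records"));
    }
}
```

With this in place, the plain poll loop handles messages one at a time and
no heartbeat() call is needed between records.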
>
> -Jason
>
> On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Jens,
> >
> > The purpose of pause() is to stop fetches for a set of partitions. This
> > lets you continue calling poll() to send heartbeats. Also note that poll()
> > generally only blocks for rebalances. In code, something like this is what
> > I was thinking:
> >
> > while (running) {
> >   ConsumerRecords<K, V> records = consumer.poll(1000);
> >   if (queue.offer(records))
> >     continue;
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> >     consumer.poll(0);
> >   consumer.resume(assignment);
> > }
> >
> > The tricky thing is handling rebalances since they might occur in either
> > call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> > current offsets, and 3) maybe break from the inner poll loop. If the
> > processing thread is busy when the rebalance is triggered, then you may
> > have to discard the results when it's finished. It's also a little
> > difficult communicating completion to the poll loop, which is where the
> > offset commit needs to take place. I suppose another queue would work,
> > sigh.
> >
> > Well, I think you can make that work, but I tend to agree that it's pretty
> > complicated. Perhaps instead of a queue, you should just submit the
> > processor to an executor service for each record set returned and await its
> > completion directly. For example:
> >
> > while (running) {
> >   ConsumerRecords<K, V> records = consumer.poll(1000);
> >   Future<?> future = executor.submit(new Processor(records));
> >
> >   TopicPartition[] assignment = toArray(consumer.assignment());
> >   consumer.pause(assignment);
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> >     consumer.poll(0);
> >   consumer.resume(assignment);
> >   consumer.commitSync();
> > }
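[Editor's note: the complete(...) helper is not defined in the snippet. One
plausible reading, using only java.util.concurrent, is a timed wait that
reports whether the processing task finished.]

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PollLoopHelper {
    // Wait up to the given timeout for the processing task to finish.
    // Returns true when done (so the poll loop can commit), or false on
    // timeout (so the loop can call poll(0) again to heartbeat).
    static boolean complete(Future<?> future, long timeout, TimeUnit unit)
            throws InterruptedException {
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException("processing failed", e.getCause());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> slow = executor.submit(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) {}
        });
        System.out.println(complete(slow, 10, TimeUnit.MILLISECONDS)); // times out: false
        System.out.println(complete(slow, 5, TimeUnit.SECONDS));       // finishes: true
        executor.shutdown();
    }
}
```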
> >
> > This seems closer to the spirit of the poll loop, and it makes handling
> > commits a lot easier. You still have to deal with the rebalance problem,
> > but at least you don't have to deal with the queue. It's still a little
> > complex though. Maybe the consumer needs a ping() API which does the same
> > thing as poll() but doesn't send or return any fetches. That would
> > simplify things a little more:
> >
> > while (running) {
> >   ConsumerRecords<K, V> records = consumer.poll(1000);
> >   Future<?> future = executor.submit(new Processor(records));
> >   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> >     consumer.ping();
> >   consumer.commitSync();
> > }
> >
> > Anyway, I'll think about it a little more and see if any other approaches
> > come to mind. I do agree that we should have a way to handle this case
> > without too much extra work.
> >
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> >
> >> Hi Jason,
> >>
> >> Thanks for your response. See replies inline:
> >>
> >> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >>
> >> > Hey Jens,
> >> >
> >> > I'm not sure I understand why increasing the session timeout is not an
> >> > option. Is the issue that there's too much uncertainty about processing
> >> > time to set an upper bound for each round of the poll loop?
> >>
> >> Yes, that's the issue.
> >>
> >>
> >> > One general workaround would be to move the processing into another
> >> > thread. For example, you could add messages to a blocking queue and have
> >> > the processor poll them. We have a pause() API which can be used to
> >> > prevent additional fetches when the queue is full. This would allow you
> >> > to continue the poll loop instead of blocking in offer(). The tricky
> >> > thing would be handling rebalances, since you'd need to clear the queue
> >> > and get the last offset from the processors. Enough people will probably
> >> > end up doing something like this that we should work through an example
> >> > of how to do it properly.
> >> >
> >>
> >> Hm, as far as I've understood, the consumer will only send heartbeats to
> >> the broker when poll() is being called. If I were to call pause() on a
> >> consumer (from a separate thread), I understand poll() will block
> >> indefinitely. Will the polling consumer still send heartbeats when
> >> blocked? Or would a pause for too long (while my records are being
> >> processed) eventually lead to a session timeout? If the latter, that
> >> would sort of defeat the purpose, since I am trying to avoid unnecessary
> >> rebalancing of consumers when there is high pressure on the consumers.
> >>
> >> Regarding the handling of rebalancing for the queue solution you
> >> describe: it really sounds very complicated. It's probably doable, but
> >> doesn't this sort of defeat the purpose of the high-level consumer API?
> >> I mean, it sounds like it should gracefully handle slow consumption of
> >> varying size. I might be wrong.
> >>
> >> Thanks,
> >> Jens
> >>
> >>
> >> --
> >> Jens Rantil
> >> Backend engineer
> >> Tink AB
> >>
> >> Email: jens.ran...@tink.se
> >> Phone: +46 708 84 18 32
> >> Web: www.tink.se
> >>
> >> Facebook <https://www.facebook.com/#!/tink.se>
> >> Linkedin <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> >> Twitter <https://twitter.com/tink>
> >
> >
>
