Jason, maybe it would be useful to update KAFKA-2986 with this information if we ever decide to do this?
Ismael

On 16 Dec 2015 04:42, "Jason Gustafson" <ja...@confluent.io> wrote:

> I was talking with Jay this afternoon about this use case. The tricky thing
> about adding a ping() or heartbeat() API is that you have to deal with the
> potential for rebalancing. This means either allowing it to block while a
> rebalance completes or having it raise an exception indicating that a
> rebalance is needed. In code, the latter might look like this:
>
>     while (running) {
>         ConsumerRecords<K, V> records = consumer.poll(1000);
>         try {
>             for (ConsumerRecord record : records) {
>                 process(record);
>                 consumer.heartbeat();
>             }
>         } catch (RebalanceException e) {
>             continue;
>         }
>     }
>
> Unfortunately, this wouldn't work with auto-commit since it would tend to
> break message processing early, which would let the committed position get
> ahead of the last offset processed. The alternative blocking approach
> wouldn't be any better in this regard. Overall, it seems like this might
> introduce a bigger problem than it solves.
>
> Perhaps the simpler solution is to provide a way to set the maximum number
> of messages returned. This could either be a new configuration option or a
> second argument in poll(), but it would let you handle messages one by one
> if you needed to. You'd then be able to set the session timeout according
> to the expected time to handle a single message. It'd be a bit more work to
> implement this, but if the use case is common enough, it might be
> worthwhile.
>
> -Jason
>
> On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hey Jens,
> >
> > The purpose of pause() is to stop fetches for a set of partitions. This
> > lets you continue calling poll() to send heartbeats. Also note that
> > poll() generally only blocks for rebalances.
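For context: the "maximum number of messages returned" idea Jason floats here later landed as the `max.poll.records` consumer config (KIP-41, Kafka 0.10); it did not exist in the 0.9 client this thread discusses. A minimal sketch of a one-record-at-a-time setup, using plain string property keys (the broker address and group id are placeholders):

```java
import java.util.Properties;

public class OneAtATimeConfig {
    // Consumer properties that cap each poll() at a single record, so the
    // session timeout only has to cover one message's processing time
    // rather than a whole batch.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "slow-processors");         // hypothetical group id
        props.put("max.poll.records", "1");               // one record per poll()
        props.put("session.timeout.ms", "30000");         // covers one record
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("max.poll.records"));
    }
}
```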
> > In code, something like this is what I was thinking:
> >
> >     while (running) {
> >         ConsumerRecords<K, V> records = consumer.poll(1000);
> >         if (queue.offer(records))
> >             continue;
> >
> >         TopicPartition[] assignment = toArray(consumer.assignment());
> >         consumer.pause(assignment);
> >         while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> >             consumer.poll(0);
> >         consumer.resume(assignment);
> >     }
> >
> > The tricky thing is handling rebalances since they might occur in either
> > call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> > current offsets, and 3) maybe break from the inner poll loop. If the
> > processing thread is busy when the rebalance is triggered, then you may
> > have to discard the results when it's finished. It's also a little
> > difficult communicating completion to the poll loop, which is where the
> > offset commit needs to take place. I suppose another queue would work,
> > sigh.
> >
> > Well, I think you can make that work, but I tend to agree that it's
> > pretty complicated. Perhaps instead of a queue, you could just submit the
> > processor to an executor service for each record set returned and await
> > its completion directly. For example:
> >
> >     while (running) {
> >         ConsumerRecords<K, V> records = consumer.poll(1000);
> >         Future future = executor.submit(new Processor(records));
> >
> >         TopicPartition[] assignment = toArray(consumer.assignment());
> >         consumer.pause(assignment);
> >         while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> >             consumer.poll(0);
> >         consumer.resume(assignment);
> >         consumer.commitSync();
> >     }
> >
> > This seems closer to the spirit of the poll loop, and it makes handling
> > commits a lot easier. You still have to deal with the rebalance problem,
> > but at least you don't have to deal with the queue. It's still a little
> > complex, though.
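Jason's executor example calls a `complete(future, timeout, unit)` helper that is never defined in the thread. One plausible reading, sketched with only java.util.concurrent (the method name and exception handling here are assumptions, not part of any Kafka API), is a non-throwing timed wait:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Completion {
    // Waits up to the given timeout for the future to finish. Returns true
    // once it is done, false if the wait timed out (so the caller can call
    // poll(0)/ping() to heartbeat and try again). A failure inside the
    // processor surfaces as a RuntimeException.
    static boolean complete(Future<?> future, long timeout, TimeUnit unit)
            throws InterruptedException {
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;               // not done yet; heartbeat and retry
        } catch (ExecutionException e) {
            throw new RuntimeException("processing failed", e.getCause());
        }
    }
}
```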
> > Maybe the consumer needs a ping() API which does the same thing as
> > poll() but doesn't send or return any fetches. That would simplify
> > things a little more:
> >
> >     while (running) {
> >         ConsumerRecords<K, V> records = consumer.poll(1000);
> >         Future future = executor.submit(new Processor(records));
> >         while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> >             consumer.ping();
> >         consumer.commitSync();
> >     }
> >
> > Anyway, I'll think about it a little more and see if any other
> > approaches come to mind. I do agree that we should have a way to handle
> > this case without too much extra work.
> >
> > -Jason
> >
> > On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> >
> >> Hi Jason,
> >>
> >> Thanks for your response. See replies inline:
> >>
> >> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io> wrote:
> >>
> >> > Hey Jens,
> >> >
> >> > I'm not sure I understand why increasing the session timeout is not
> >> > an option. Is the issue that there's too much uncertainty about
> >> > processing time to set an upper bound for each round of the poll loop?
> >>
> >> Yes, that's the issue.
> >>
> >> > One general workaround would be to move the processing into another
> >> > thread. For example, you could add messages to a blocking queue and
> >> > have the processor poll them. We have a pause() API which can be used
> >> > to prevent additional fetches when the queue is full. This would allow
> >> > you to continue the poll loop instead of blocking in offer(). The
> >> > tricky thing would be handling rebalances since you'd need to clear
> >> > the queue and get the last offset from the processors. Enough people
> >> > will probably end up doing something like this that we should work
> >> > through an example of how to do it properly.
> >> Hm, as far as I've understood, the consumer will only send heartbeats
> >> to the broker when poll() is being called. If I were to call pause() on
> >> a consumer (from a separate thread), I understand poll() will block
> >> indefinitely. Will the polling consumer still send heartbeats when
> >> blocked? Or would a pause for too long (while my records are being
> >> processed) eventually lead to a session timeout? If the latter, that
> >> would sort of defeat the purpose, since I am trying to avoid unnecessary
> >> rebalancing of consumers when there is high pressure on the consumers.
> >>
> >> Regarding the handling of rebalancing for the queue solution you
> >> describe: it really sounds very complicated. It's probably doable, but
> >> doesn't this sort of defeat the purpose of the high-level consumer API?
> >> I mean, it sounds like it should gracefully handle slow consumption
> >> with varying processing times. I might be wrong.
> >>
> >> Thanks,
> >> Jens
> >>
> >> --
> >> Jens Rantil
> >> Backend engineer
> >> Tink AB
> >>
> >> Email: jens.ran...@tink.se
> >> Phone: +46 708 84 18 32
> >> Web: www.tink.se
> >>
> >> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> >> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> >> Twitter <https://twitter.com/tink>
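As a footnote: the bounded-queue hand-off the thread keeps returning to can be exercised without a broker. Below is a minimal sketch using only java.util.concurrent, with the consumer.pause()/poll(0)/resume() calls replaced by comments and a heartbeat counter; all names here are illustrative, not Kafka APIs.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PausedHandoff {
    // Stand-in for a batch of ConsumerRecords.
    record Batch(List<String> records) {}

    int heartbeats = 0; // counts the poll(0) calls we'd make while paused

    // Mimics the inner loop from the thread: retry offer() with the
    // heartbeat interval as the timeout, "heartbeating" between attempts
    // so the group coordinator doesn't kick us out while the queue is full.
    void handOff(BlockingQueue<Batch> queue, Batch batch, long heartbeatIntervalMs)
            throws InterruptedException {
        if (queue.offer(batch))
            return;                       // fast path: queue had room
        // consumer.pause(assignment) would go here
        while (!queue.offer(batch, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
            heartbeats++;                 // consumer.poll(0) would go here
        // consumer.resume(assignment) would go here
    }
}
```

The fast path matches Jason's `if (queue.offer(records)) continue;` shortcut: pausing only when the processor actually falls behind keeps the common case to a single non-blocking call.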