Hey Guven,

A heartbeat API actually came up in the discussion of KIP-41. Ultimately we rejected it because it led to confusing API semantics. The problem is that heartbeat responses are used by the coordinator to tell consumers when a rebalance is needed. But what should the user do if they call heartbeat() and find that the group is rebalancing? If they don't stop message processing and rejoin, then they may be kicked out of the group just as if they had failed to heartbeat before expiration of the session timeout. Alternatively, if we made heartbeat() blocking and let the rebalance complete in the call itself, then the consumer may no longer be assigned the same partitions. So either way, unless you can preempt message processing, you may fall out of the group and pending messages will need to be reprocessed after the rebalance completes. And if you can preempt message processing, then you can ensure that heartbeats get sent by always preempting the processor before the session timeout expires.

In the end, we felt that max.poll.records was a simpler option since it gives you fine control over the poll loop and doesn't require any confusing API changes. As long as you can put some upper bound on the processing time, you can set max.poll.records=1 and the session timeout to whatever the upper bound is. However, if you have a use case where there is a very high variance in message processing times, it may not be so helpful. In that case, the best options I can think of at the moment are the following:

1. Move the processing to another thread. Basically the workflow would be something like this: 1) receive records for a partition in poll(), 2) submit them to an executor for processing, 3) pause the partition, and 4) continue the poll loop. When the processor finishes with the records, you can use resume() to reenable fetching. You'll have to manage offset commits yourself since you wouldn't want to commit before the thread has actually finished processing. You'll also have to account for the possibility of a rebalance completing while the thread is still processing a batch (an easy way to do this would probably be to just ignore CommitFailedException thrown from commit).

2. This is a tad hacky, but you could take advantage of the fact that the coordinator treats commits as heartbeats and call commitSync() periodically while handling a batch of records. Note in this case that you should not use the no-arg commitSync() variant which will commit the offsets for the full batch returned from the last poll(). Instead you should pass the offsets of the records already processed explicitly in commitSync(Map<TopicPartition, OffsetAndMetadata>).

3. Use the consumer in "simple" mode. If you don't actually need group coordination, then you can assign the partitions you want to read from manually and consume them at your own rate. There is no heartbeating or rebalancing to worry about.
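
For the max.poll.records suggestion above, here is a rough sketch of what the consumer settings might look like. It only builds a java.util.Properties object, so it runs without the Kafka client on the classpath. The class name SlowConsumerConfig, the build() helper, and the example broker address, group id, and 30 second bound are all made up for illustration; only the property keys are real consumer settings.

```java
import java.util.Properties;

// Hypothetical helper (not part of the Kafka client): collects the consumer
// settings for the "one record per poll" approach.
final class SlowConsumerConfig {

    // maxProcessingMs is an assumed upper bound on the time needed to handle
    // a single record; it is used directly as the session timeout.
    static Properties build(String bootstrapServers, String groupId, int maxProcessingMs) {
        if (maxProcessingMs <= 0) {
            throw new IllegalArgumentException("maxProcessingMs must be positive");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Hand back at most one record per poll() so the poll loop comes
        // around again after every processed message.
        props.setProperty("max.poll.records", "1");
        // Session timeout set to the processing-time upper bound.
        props.setProperty("session.timeout.ms", Integer.toString(maxProcessingMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values only: a local broker and a 30 second bound.
        Properties props = build("localhost:9092", "image-downloader", 30_000);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A real consumer would also need key.deserializer and value.deserializer, which are left out here. The session timeout you pick still has to fit under the broker's group.max.session.timeout.ms.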

-Jason

On Fri, Feb 26, 2016 at 1:20 AM, Guven Demir <guven.de...@sahibinden.com> wrote:

> thanks for the response Jason,
>
> i've already experimented with a similar solution myself, lowering
> max.partition.fetch.bytes to barely fit the largest message (2k at the
> moment)
>
> still, i've observed similar problems, which is caused by really long
> processing times, e.g. downloading a large video via a link received in the
> message
>
> it's not very feasible to increase the heartbeat timeout too much, as
> session timeout is recommended to be at least 3 times that of heartbeat
> timeout. and that is bounded by broker's group.max.session.timeout.ms,
> which i would not want to increase as it would affect all other
> topics/consumers
>
> could there be an api for triggering the heartbeat manually maybe? it can
> be argued that that would beat the purpose of a heartbeat though, it might
> be used improperly, i.e. in my case rather than sending heartbeats inside
> the download/save loop but in an empty loop waiting for the download to
> complete, which might never happen. again, sending heartbeats in
> application code might be considered tight coupling as well
>
> other than that, i will experiment with the pause() api, separate thread
> for the actual message processing and poll()'ing with all partitions paused
>
> guven
>
> On 25 Feb 2016, at 20:19, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > Hey Guven,
> >
> > This problem is what KIP-41 was created for:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
> >
> > The patch for this was committed yesterday and will be included in 0.10. If
> > you need something in the shorter term, you could probably use the client
> > from trunk (no changes to the server are needed).
> >
> > If this is still not sufficient, I recommend looking into the pause() API,
> > which can facilitate asynchronous message processing in another thread.
> >
> > -Jason
> >
> > On Thu, Feb 25, 2016 at 8:53 AM, Guven Demir <guven.de...@sahibinden.com> wrote:
> >
> >> hi all,
> >>
> >> i'm having trouble processing a topic which includes paths to images which
> >> need to be downloaded and saved to disk (each takes ~3-5 seconds) and
> >> several are received on each poll
> >>
> >> within this scenario, i'm receiving the following error:
> >>
> >>    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> >>    be completed due to group rebalance
> >>
> >> which i assume is due to heartbeat failure and broker re-assigning the
> >> consumer's partition to another consumer
> >>
> >> are there any recommendations for processing long to process messages?
> >>
> >> thanks in advance,
> >> guven