We have recently had great success with committing records in smaller batches between calls to poll(). Something along these lines:
records = consumer.poll()
foreach record:
    process record

On 31 March 2016 at 12:13, Daniel Fanjul <daniel.fanjul.alcu...@gmail.com> wrote:

> Hi all,
>
> My problem: If the consumer fetches too much data and the processing of the
> records is not fast enough, commit() fails because there was a rebalance.
>
> I cannot reduce 'max.partition.fetch.bytes' because there might be large
> messages.
>
> I don't want to increase 'session.timeout.ms', because it would be too
> large to detect failures.
>
> I understand that the new consumer API only sends heartbeats and
> manages rebalances during the call to poll(). But if I call poll(0), there
> is still a chance it will return even more data. So I keep the heartbeats,
> but I may accumulate too much data, eventually leading to OOM.
>
> I would like something like:
>
> foreach record in consumer.poll() {
>     process(record)
>     consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive()
> }
>
> Is this possible?
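To make the batching idea above concrete, here is a minimal, self-contained sketch. It uses stand-ins for poll() and commitSync() so it runs without a broker; in real code these would be KafkaConsumer.poll() and KafkaConsumer.commitSync(Map<TopicPartition, OffsetAndMetadata>), and the batch size of 3 is an arbitrary example value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchCommit {

    // Stand-in for consumer.poll(): pretend we fetched records at offsets 0..6.
    static List<Long> poll() {
        return Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L);
    }

    // Stand-in for consumer.commitSync(offsets): remember what we committed.
    static final List<Long> committed = new ArrayList<>();

    static void commitSync(long nextOffset) {
        committed.add(nextOffset);
    }

    public static void main(String[] args) {
        final int batchSize = 3;       // commit after every 3 processed records
        int processedSinceCommit = 0;
        long lastOffset = -1;

        for (long offset : poll()) {
            // process(record) would go here
            lastOffset = offset;
            processedSinceCommit++;
            if (processedSinceCommit >= batchSize) {
                // Commit the offset *after* the last processed record,
                // since Kafka interprets a committed offset as "next to read".
                commitSync(lastOffset + 1);
                processedSinceCommit = 0;
            }
        }
        if (processedSinceCommit > 0) {
            commitSync(lastOffset + 1); // commit the final partial batch
        }
        System.out.println(committed);  // offsets committed after each batch
    }
}
```

Committing mid-batch this way means that after a rebalance you only reprocess the records since the last small commit, rather than losing or redoing the whole poll()'s worth of work.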