We have recently had great success committing records in smaller
batches between calls to poll(). Something along these lines:

records = consumer.poll()
foreach record in records:
  process record
  if a full batch has been processed:
    consumer.commitSync(offsets so far)
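
In real code it looks roughly like this (a sketch only, not our exact
code; the bootstrap servers, topic name, group id, and batch size are
placeholders you'd have to fill in yourself):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class BatchCommitLoop {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("enable.auto.commit", "false");         // we commit manually
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // placeholder

        int batchSize = 100; // commit after this many records; tune to taste
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                int sinceCommit = 0;
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                    // The committed offset is the *next* offset to read,
                    // hence offset + 1.
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    if (++sinceCommit >= batchSize) {
                        consumer.commitSync(offsets); // commit progress so far
                        sinceCommit = 0;
                    }
                }
                if (sinceCommit > 0) {
                    consumer.commitSync(offsets); // commit the tail of this poll
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // application-specific work goes here
    }
}

This doesn't shrink the fetches themselves, but because progress is
committed in small increments, a rebalance costs you at most one batch
of reprocessing instead of a whole poll()'s worth of records.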

On 31 March 2016 at 12:13, Daniel Fanjul <daniel.fanjul.alcu...@gmail.com>
wrote:

> Hi all,
>
> My problem: if the consumer fetches too much data and processing the
> records is not fast enough, commit() fails because a rebalance has
> occurred in the meantime.
>
> I cannot reduce 'max.partition.fetch.bytes' because there might be large
> messages.
>
> I don't want to increase 'session.timeout.ms', because then it would
> take too long to detect failures.
>
> I understand that the new consumer API only sends heartbeats and
> manages rebalances during the call to poll(). But if I call poll(0),
> there is still a chance it will return even more data. So I keep the
> heartbeats going, but I may accumulate too much data, eventually
> leading to an OOM.
>
> I would like something like:
> foreach record in consumer.poll() {
>   process(record)
>   consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive()
> }
>
> Is this possible?
>
