I was talking with Jay this afternoon about this use case. The tricky thing
about adding a ping() or heartbeat() API is that you have to deal with the
potential for rebalancing. This means either allowing it to block while a
rebalance completes or having it raise an exception indicating that a
rebalance is needed. In code, the latter might look like this:

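while (running) {
  ConsumerRecords<K, V> records = consumer.poll(1000);
  try {
    for (ConsumerRecord<K, V> record : records) {
      process(record);
      // Proposed API: would throw if a rebalance is needed.
      consumer.heartbeat();
    }
  } catch (RebalanceException e) {
    // Stop processing this batch; the next poll() handles the rebalance.
    continue;
  }
}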

Unfortunately, this wouldn't work with auto-commit: the exception would tend
to cut message processing short, and the next auto-commit could then move the
committed position ahead of the last offset actually processed. The
alternative blocking approach wouldn't be any better in this regard. Overall,
it seems like this might introduce a bigger problem than it solves.
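
To make the auto-commit concern concrete, here is a toy model of it. Nothing
below is the real consumer: ToyConsumer, processUntilRebalance and the offsets
are made up for illustration, and the only assumption carried over is that
auto-commit commits the consumer's position rather than what was processed.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoCommitGap {
    /** Toy stand-in for a consumer: tracks only the fetch position and the committed offset. */
    static class ToyConsumer {
        long position = 0;   // next offset poll() would return
        long committed = 0;  // last committed position

        // Returns the next batchSize offsets and advances the position past them.
        List<Long> poll(int batchSize) {
            List<Long> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                batch.add(position++);
            }
            return batch;
        }

        // Assumption of this model: auto-commit commits the position, not what was processed.
        void autoCommit() {
            committed = position;
        }
    }

    /** Processes a batch, stopping after the offset at which the hypothetical heartbeat() throws. */
    static long processUntilRebalance(List<Long> batch, long rebalanceAtOffset) {
        long lastProcessed = -1;
        for (long offset : batch) {
            lastProcessed = offset;              // stands in for process(record)
            if (offset == rebalanceAtOffset) {
                break;                           // stands in for RebalanceException
            }
        }
        return lastProcessed;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        List<Long> batch = consumer.poll(10);    // offsets 0..9
        long lastProcessed = processUntilRebalance(batch, 3);
        consumer.autoCommit();                   // would happen on a later poll()
        System.out.println("last processed offset = " + lastProcessed);
        System.out.println("committed position    = " + consumer.committed);
        System.out.println("records skipped       = " + (consumer.committed - (lastProcessed + 1)));
    }
}
```

In this model the loop stops after offset 3 while the committed position is 10,
so offsets 4 through 9 would never be processed after a restart.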

Perhaps a simpler solution is to provide a way to set the maximum number of
messages returned. This could be either a new configuration option or a
second argument to poll(), and it would let you handle messages one by one if
you needed to. You'd then be able to set the session timeout according to
the expected time to handle a single message. It'd be a bit more work to
implement this, but if the use case is common enough, it might be
worthwhile.
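
As a rough sketch of the semantics I have in mind (BoundedPoll, addFetched and
the maxRecords argument are hypothetical names, and this is plain Java rather
than consumer code): records that were already fetched stay buffered, and each
call hands back at most maxRecords of them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class BoundedPoll<T> {
    // Records already fetched but not yet handed to the application.
    private final Deque<T> buffered = new ArrayDeque<>();

    // Stand-in for a fetch response arriving from the broker.
    public void addFetched(List<T> records) {
        buffered.addAll(records);
    }

    // Returns at most maxRecords buffered records, leaving the rest for the next call.
    public List<T> poll(int maxRecords) {
        List<T> out = new ArrayList<>();
        while (out.size() < maxRecords && !buffered.isEmpty()) {
            out.add(buffered.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        BoundedPoll<String> consumer = new BoundedPoll<>();
        consumer.addFetched(Arrays.asList("a", "b", "c"));
        List<String> batch;
        // With maxRecords = 1, each loop iteration handles exactly one message.
        while (!(batch = consumer.poll(1)).isEmpty()) {
            System.out.println("processing " + batch);
        }
    }
}
```

With a bound of one, the time between calls is the time to handle a single
message, which is what the session timeout would then need to cover.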

-Jason

On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson <ja...@confluent.io>
wrote:

> Hey Jens,
>
> The purpose of pause() is to stop fetches for a set of partitions. This
> lets you continue calling poll() to send heartbeats. Also note that poll()
> generally only blocks for rebalances. In code, something like this is what
> I was thinking:
>
> while (running) {
>   ConsumerRecords<K, V> records = consumer.poll(1000);
>   if (queue.offer(records))
>     continue;
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
>     consumer.poll(0);
>   consumer.resume(assignment);
> }
>
> The tricky thing is handling rebalances since they might occur in either
> call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> current offsets, and 3) maybe break from the inner poll loop. If the
> processing thread is busy when the rebalance is triggered, then you may
> have to discard the results when it's finished. It's also a little
> difficult communicating completion to the poll loop, which is where the
> offset commit needs to take place. I suppose another queue would work,
> sigh.
>
> Well, I think you can make that work, but I tend to agree that it's pretty
> complicated. Perhaps instead of a queue, you should just submit the
> processor to an executor service for each record set returned and await its
> completion directly. For example:
>
> while (running) {
>   ConsumerRecords<K, V> records = consumer.poll(1000);
>   Future future = executor.submit(new Processor(records));
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
>     consumer.poll(0);
>   consumer.resume(assignment);
>   consumer.commitSync();
> }
>
> This seems closer to the spirit of the poll loop, and it makes handling
> commits a lot easier. You still have to deal with the rebalance problem,
> but at least you don't have to deal with the queue. It's still a little
> complex though. Maybe the consumer needs a ping() API which does the same
> thing as poll() but doesn't send or return any fetches. That would simplify
> things a little more:
>
> while (running) {
>   ConsumerRecords<K, V> records = consumer.poll(1000);
>   Future future = executor.submit(new Processor(records));
>   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
>     consumer.ping();
>   consumer.commitSync();
> }
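
The complete() helper in the loops above isn't defined anywhere in this
thread. One way it might be written, assuming it only needs to wait on the
Future with a timeout and report whether the task finished (the class name
and the sleep-based task are just for illustration):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AwaitCompletion {
    // Waits up to the timeout; returns true once the task is done, false on timeout.
    // A failure inside the task surfaces as ExecutionException.
    static boolean complete(Future<?> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false; // still running: caller sends a heartbeat and retries
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Stand-in for new Processor(records): a task that takes a while.
        Future<?> future = executor.submit(() -> {
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        int rounds = 0;
        while (!complete(future, 50, TimeUnit.MILLISECONDS)) {
            rounds++; // stand-in for consumer.poll(0) or the proposed ping()
        }
        executor.shutdown();
        System.out.println("done after " + rounds + " heartbeat rounds");
    }
}
```

Letting ExecutionException propagate is a choice here: if the processor
fails, the loop stops before commitSync() rather than committing offsets for
records that weren't handled.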
>
> Anyway, I'll think about it a little more and see if any other approaches
> come to mind. I do agree that we should have a way to handle this case
> without too much extra work.
>
>
> -Jason
>
>
> On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
>
>> Hi Jason,
>>
>> Thanks for your response. See replies inline:
>>
>> On Tuesday, December 15, 2015, Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>> > Hey Jens,
>> >
>> > I'm not sure I understand why increasing the session timeout is not an
>> > option. Is the issue that there's too much uncertainty about processing
>> > time to set an upper bound for each round of the poll loop?
>> >
>>
>> Yes, that's the issue.
>>
>>
>> > One general workaround would be to move the processing into another
>> > thread. For example, you could add messages to a blocking queue and have
>> > the processor poll them. We have a pause() API which can be used to
>> > prevent additional fetches when the queue is full. This would allow you
>> > to continue the poll loop instead of blocking in offer(). The tricky
>> > thing would be handling rebalances since you'd need to clear the queue
>> > and get the last offset from the processors. Enough people probably will
>> > end up doing something like this that we should probably work through an
>> > example on how to do it properly.
>> >
>>
>> Hm, as far as I've understood, the consumer will only send heartbeats to
>> the broker when poll() is being called. If I call pause() on a consumer
>> (from a separate thread), I understand poll() will block indefinitely. Will
>> the polling consumer still send heartbeats when blocked? Or would pausing
>> for too long (while my records are being processed) eventually lead to a
>> session timeout? If the latter, that would sort of defeat the purpose,
>> since I am trying to avoid unnecessary rebalancing of consumers when there
>> is high pressure on the consumers.
>>
>> Regarding the handling of rebalancing for the queue solution you describe:
>> it really sounds very complicated. It's probably doable, but doesn't this
>> sort of defeat the purpose of the high level consumer API? I mean, it
>> sounds like it should gracefully handle slow consumption of varying size.
>> I might be wrong.
>>
>> Thanks,
>> Jens
>>
>>
>> --
>> Jens Rantil
>> Backend engineer
>> Tink AB
>>
>> Email: jens.ran...@tink.se
>> Phone: +46 708 84 18 32
>> Web: www.tink.se
>>
>> Facebook <https://www.facebook.com/#!/tink.se>
>> Linkedin <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>> Twitter <https://twitter.com/tink>
>>
>
>
