Hi All,
I was using the new Kafka Consumer to fetch messages in this way:
    while (true) {
        ConsumerRecords<Object, T> records = kafkaConsumer.poll(Long.MAX_VALUE);
        // do nothing if records are empty
        ....
    }
Then I realized that blocking indefinitely until new messages are fetched
might add some overhead, so I looked into the KafkaConsumer code to figure
out a reasonable timeout. This is the loop inside poll():
    do {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
                pollOnce(remaining);
        if (!records.isEmpty()) {
            // if data is available, then return it, but first send off the
            // next round of fetches to enable pipelining while the user is
            // handling the fetched records.
            fetcher.initFetches(metadata.fetch());
            client.poll(0);
            return new ConsumerRecords<>(records);
        }
        long elapsed = time.milliseconds() - start;
        remaining = timeout - elapsed;
    } while (remaining > 0);
It seems that even if I set a much lower timeout, like 1000 ms, my code will
still keep fetching messages, because I call poll() inside while (true) and
do nothing when the record set is empty. So the only difference between a
high timeout and a low one is whether the looping happens in the while loop
I wrote or in the one inside poll(). In terms of connections to Kafka, a low
or high timeout shouldn't make much difference in my case.
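To make this reasoning concrete, here is a minimal sketch that mimics the
shape of the loop above without touching Kafka at all. Everything in it is
hypothetical: the class name, the manual clock, and the assumption that each
simulated pollOnce() call waits at most 100 ms before returning. It only
counts how many fetch attempts happen before data arrives, once with a
Long.MAX_VALUE timeout and once with a 1000 ms timeout.

```java
import java.util.Collections;
import java.util.List;

public class PollTimeoutSketch {
    // Hypothetical manual clock, so the sketch is deterministic.
    static long nowMs = 0;
    // Number of simulated fetch attempts made so far.
    static int fetchAttempts = 0;

    // Stand-in for pollOnce(remaining). Assumption: each attempt waits at
    // most 100 ms, and a record becomes available once the clock reaches
    // arrivalMs. This is not how the real consumer is implemented.
    static List<String> pollOnce(long remaining, long arrivalMs) {
        fetchAttempts++;
        nowMs += Math.min(remaining, 100);
        if (nowMs >= arrivalMs) {
            return Collections.singletonList("record");
        }
        return Collections.<String>emptyList();
    }

    // Same shape as the do/while loop quoted from poll().
    static List<String> poll(long timeout, long arrivalMs) {
        long start = nowMs;
        long remaining = timeout;
        do {
            List<String> records = pollOnce(remaining, arrivalMs);
            if (!records.isEmpty()) {
                return records;
            }
            long elapsed = nowMs - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);
        return Collections.<String>emptyList();
    }

    // Caller-side while loop: keep polling until something arrives, then
    // report how many fetch attempts that took in total.
    static int attemptsUntilData(long timeout, long arrivalMs) {
        nowMs = 0;
        fetchAttempts = 0;
        while (poll(timeout, arrivalMs).isEmpty()) {
            // do nothing if records are empty
        }
        return fetchAttempts;
    }

    public static void main(String[] args) {
        // Data "arrives" 5000 ms after start in both runs.
        System.out.println(attemptsUntilData(Long.MAX_VALUE, 5000)); // prints 50
        System.out.println(attemptsUntilData(1000, 5000));           // prints 50
    }
}
```

In this simplified model both runs make the same number of fetch attempts,
which matches my reading that the timeout only decides which loop does the
spinning. A timeout that is not a multiple of the simulated 100 ms wait
would add a few extra short attempts, so the two cases are only roughly
equal in general.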
I may have misunderstood the code completely. Is anyone able to shed some
light on this topic?
Thanks.
--
Yifan