Hi Team,
I have a Java application built on the latest Apache Kafka version, 2.1.1.
It includes a consumer that runs indefinitely and consumes messages
whenever they are produced.
Sometimes no messages are produced for hours, yet the memory allocated
to the consumer process keeps increasing drastically.
My code is as follows:
AtomicBoolean isRunning = new AtomicBoolean(true);
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
if (topics != null) {
    subscribeTopics(topics);
}
boolean infiniteLoop = false;
boolean oneTimeMode = false;
int timeout = consumeTimeout;
if (isSuggest) {
    // Configuration for suggest mode
    oneTimeMode = true;
    msgCount = 0;
    timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
} else if (msgCount < 0) {
    infiniteLoop = true;
} else if (msgCount == 0) {
    oneTimeMode = true;
}
Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
do {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
    for (final ConsumerRecord<byte[], byte[]> record : records) {
        if (!infiniteLoop && !oneTimeMode) {
            --msgCount;
            if (msgCount < 0) {
                break;
            }
        }
        outputViews.write(new BinaryOutput() {
            @Override
            public Document getHeader() {
                return generateHeader(record, oldHeader);
            }
            @Override
            public void write(WritableByteChannel writeChannel) throws IOException {
                try (OutputStream os = Channels.newOutputStream(writeChannel)) {
                    os.write(record.value());
                }
            }
        });
        // The offset to commit should be the next offset of the current one,
        // according to the API
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        // In suggest mode, we should not change the current offset
        if (isSyncCommit && isSuggest) {
            commitOffset(offsets);
            offsets.clear();
        }
    }
} while ((msgCount > 0 || infiniteLoop) && isRunning.get());
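The offsets map in the loop above is keyed by topic and partition, so each new record overwrites the previous entry for its partition and the map should stay bounded by the number of partitions. To make that bookkeeping easy to check in isolation, here is a minimal sketch without the Kafka client. The class name OffsetTracker and the string keys are made up for illustration; my real code uses TopicPartition and OffsetAndMetadata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Map<TopicPartition, OffsetAndMetadata> in my loop.
final class OffsetTracker {
    // Key is "topic-partition"; value is the next offset to commit.
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Record that the message at `offset` was processed.
    // The offset to commit is the one after it, hence offset + 1.
    void markProcessed(String topic, int partition, long offset) {
        nextOffsets.put(topic + "-" + partition, offset + 1);
    }

    // Number of partitions that currently have an uncommitted offset.
    int pending() {
        return nextOffsets.size();
    }

    // Return a copy of the pending commits and clear the internal state,
    // mirroring the commitOffset(offsets) + offsets.clear() pair above.
    Map<String, Long> drain() {
        Map<String, Long> copy = new HashMap<>(nextOffsets);
        nextOffsets.clear();
        return copy;
    }
}
```

Processing offsets 41 and 42 on the same partition leaves a single entry with value 43, so this structure alone should not explain growth of the size I am seeing.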
See the screenshot below. In about nineteen hours, it consumed just 5
messages, but the memory allocated is 1.6 GB.
[image: image.png]
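To tell whether that figure is live data or just heap the JVM has reserved, I could log used versus committed heap next to the poll loop. This is a minimal sketch that assumes only the standard java.lang.management API. HeapSnapshot is a hypothetical helper of mine, not part of the Kafka client.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper: captures heap figures reported by the JVM itself.
final class HeapSnapshot {
    final long usedBytes;      // heap currently occupied by objects (live or not yet collected)
    final long committedBytes; // heap the JVM has reserved from the OS
    final long maxBytes;       // upper limit, or -1 if undefined

    private HeapSnapshot(long usedBytes, long committedBytes, long maxBytes) {
        this.usedBytes = usedBytes;
        this.committedBytes = committedBytes;
        this.maxBytes = maxBytes;
    }

    // Read the current heap usage from the platform MemoryMXBean.
    static HeapSnapshot take() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return new HeapSnapshot(heap.getUsed(), heap.getCommitted(), heap.getMax());
    }

    // Format a byte count as whole mebibytes (rounded down).
    static String formatMiB(long bytes) {
        return (bytes / (1024L * 1024L)) + " MiB";
    }

    @Override
    public String toString() {
        String max = maxBytes < 0 ? "undefined" : formatMiB(maxBytes);
        return "used=" + formatMiB(usedBytes)
                + ", committed=" + formatMiB(committedBytes)
                + ", max=" + max;
    }

    public static void main(String[] args) {
        System.out.println(HeapSnapshot.take());
    }
}
```

If used heap stays low while committed heap is high, the growth may be reserved rather than leaked memory. If used heap itself keeps climbing across garbage collections, a heap dump would be the next step.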
Any clues on how to get rid of this memory issue? Is there anything I need
to change in the program, or is it a bug in the Kafka library?
Please reply as soon as possible.
Thanks,