---------- Forwarded message ---------
From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
Date: Tue, Feb 26, 2019 at 12:40 PM
Subject: Apache Kafka Memory Leakage???
To: <us...@kafka.apache.org>
Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>


Hi Team,
  I have a Java application built on the latest Apache Kafka release, 2.1.1.
  The consumer application runs indefinitely, consuming messages whenever
they are produced.
  Sometimes no messages are produced for hours.  Still, I see that the
memory allocated to the consumer process keeps growing drastically.
  My code is as follows:

AtomicBoolean isRunning = new AtomicBoolean(true);

Properties kafkaProperties = new Properties();
kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);

consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
if (topics != null) {
    subscribeTopics(topics);
}


boolean infiniteLoop = false;
boolean oneTimeMode = false;
int timeout = consumeTimeout;
if (isSuggest) {
    // Configuration for suggest mode
    oneTimeMode = true;
    msgCount = 0;
    timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
} else if (msgCount < 0) {
    infiniteLoop = true;
} else if (msgCount == 0) {
    oneTimeMode = true;
}
Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
do {
    // poll(long) is deprecated since 2.0; the Duration overload bounds the block time
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(timeout));
    for (final ConsumerRecord<byte[], byte[]> record : records) {
        if (!infiniteLoop && !oneTimeMode) {
            --msgCount;
            if (msgCount < 0) {
                break;
            }
        }
        outputViews.write(new BinaryOutput() {
            @Override
            public Document getHeader() {
                return generateHeader(record, oldHeader);
            }

            @Override
            public void write(WritableByteChannel writeChannel) throws IOException {
                try (OutputStream os = Channels.newOutputStream(writeChannel)) {
                    os.write(record.value());
                }
            }
        });
        // The offset to commit should be one past the current record's offset,
        // according to the API
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        // In suggest mode we should not change the current offset,
        // so commit only outside suggest mode
        if (isSyncCommit && !isSuggest) {
            commitOffset(offsets);
            offsets.clear();
        }
    }
} while ((msgCount > 0 || infiniteLoop) && isRunning.get());
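
In case the growth comes from the consumer's internal fetch buffers, I was
thinking of tightening the fetch-related settings.  A minimal sketch is below;
the config keys are the standard Kafka consumer settings, but the byte limits
are illustrative values I picked, not recommendations:

```java
import java.util.Properties;

public class FetchTuning {
    public static Properties tunedProperties() {
        Properties props = new Properties();
        // Upper bound on data returned per partition per fetch (broker default: 1 MB)
        props.put("max.partition.fetch.bytes", "262144");   // 256 KB, illustrative
        // Upper bound on data returned per fetch request (default: 50 MB)
        props.put("fetch.max.bytes", "1048576");            // 1 MB, illustrative
        // Cap on records handed back by a single poll() (default: 500)
        props.put("max.poll.records", "50");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProperties());
    }
}
```

These properties would be merged into the `kafkaProperties` object above
before constructing the `KafkaConsumer`.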


See the screenshot below.  In about nineteen hours it consumed only 5
messages, yet the memory allocated has grown to 1.6 GB.

[image: image.png]

Any clues on how to get rid of this memory issue?  Is there anything I need
to change in the program, or is it a bug in the Kafka client library?
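
To narrow this down, I also plan to take a heap dump and check whether the
1.6 GB is actually live objects or just heap the JVM has grown but not
returned.  A sketch using the HotSpot diagnostic MXBean (HotSpot JVMs only;
the class name and file path here are my own choices):

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.File;
import java.lang.management.ManagementFactory;

public class HeapDumper {
    // Writes an .hprof heap dump; live=true triggers a GC first so the
    // dump contains only reachable objects.
    public static File dump(String path) throws Exception {
        HotSpotDiagnosticMXBean bean = ManagementFactory.newPlatformMXBeanProxy(
                ManagementFactory.getPlatformMBeanServer(),
                "com.sun.management:type=HotSpotDiagnostic",
                HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(path, true);   // fails if the file already exists
        return new File(path);
    }

    public static void main(String[] args) throws Exception {
        File f = dump("consumer-heap.hprof");
        System.out.println("dumped " + f.length() + " bytes");
    }
}
```

The resulting .hprof file can be opened in Eclipse MAT or VisualVM to see
which classes retain the memory.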

Please revert ASAP.


Thanks,
