Sönke, thanks so much for the reply. I switched to the new poll(Duration) method, but I still see the memory issue. Is there a way we can get on a one-on-one call to discuss this, please? Let me know your availability; I can share a Zoom meeting link.
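For reference, the switch is just wrapping the same millisecond timeout in a java.time.Duration (a minimal sketch; the standalone class and the helper name toPollTimeout are mine for illustration, and the surrounding consumer setup is unchanged):

```java
import java.time.Duration;

public class PollTimeout {
    // poll(long) took a timeout in milliseconds; the non-deprecated
    // poll(Duration) overload takes the same value wrapped in a Duration.
    static Duration toPollTimeout(long timeoutMs) {
        return Duration.ofMillis(timeoutMs);
    }

    public static void main(String[] args) {
        // With the real KafkaConsumer, the call becomes
        //   ConsumerRecords<byte[], byte[]> records = consumer.poll(toPollTimeout(timeout));
        // instead of the deprecated consumer.poll(timeout).
        System.out.println(toPollTimeout(1000).toMillis());
    }
}
```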
Thanks,

On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau
<soenke.lie...@opencore.com.invalid> wrote:
> Hi Syed,
>
> from your screenshot I assume that you are using SnapLogic to run your
> code (full disclosure: I do not have the faintest idea of this
> product!). I've just had a look at the docs and am a bit confused by
> their explanation of the metric that you point out in your image,
> "Memory Allocated". The docs say: "The Memory Allocated reflects the
> number of bytes that were allocated by the Snap. Note that this
> number does not reflect the amount of memory that was freed and it is
> not the peak memory usage of the Snap. So, it is not necessarily a
> metric that can be used to estimate the required size of a Snaplex
> node. Rather, the number provides an insight into how much memory had
> to be allocated to process all of the documents. For example, if the
> total allocated was 5MB and the Snap processed 32 documents, then the
> Snap allocated roughly 164KB per document. When combined with the
> other statistics, this number can help to identify the potential
> causes of performance issues."
> The part about not reflecting memory that was freed makes me somewhat
> doubtful whether this actually reflects how much memory the process
> currently holds. Can you give some more insight there?
>
> Apart from that, I ran your code (somewhat modified to make it work
> without dependencies) for 2 hours and saw no unusual memory
> consumption, just a regular garbage collection sawtooth pattern. That
> being said, I had to replace your actual processing with a simple
> println, so if there is a memory leak in there I would of course not
> have noticed.
> I've uploaded the code I ran [1] for reference. For further analysis,
> maybe you could run something similar with just a println or noop and
> see if the symptoms persist, to localize the leak (if it exists).
>
> Also, two random observations on your code:
>
> KafkaConsumer.poll(Long timeout) is deprecated; you should consider
> using the overloaded version with a Duration parameter instead.
>
> The comment at [2] seems to contradict the following code, as the
> offsets are only changed when in suggest mode. But as I have no idea
> what suggest mode even is, or what all this means, this observation
> may be miles off the point :)
>
> I hope that helps a little.
>
> Best regards,
> Sönke
>
> [1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
> [2] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
>
>
> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
> <syed.mudas...@gaianconsultants.com> wrote:
> >
> > Thanks,
> >
> > ---------- Forwarded message ---------
> > From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
> > Date: Tue, Feb 26, 2019 at 12:40 PM
> > Subject: Apache Kafka Memory Leakage???
> > To: <us...@kafka.apache.org>
> > Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
> >
> > Hi Team,
> > I have a Java application based on the latest Apache Kafka version 2.1.1.
> > I have a consumer application that runs indefinitely to consume messages
> > whenever they are produced.
> > Sometimes no messages are produced for hours. Still, I see that the
> > memory allocated to the consumer program is drastically increasing.
> > My code is as follows:
> >
> > AtomicBoolean isRunning = new AtomicBoolean(true);
> > Properties kafkaProperties = new Properties();
> > kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> > kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
> > kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
> > kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
> > kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
> > consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
> > if (topics != null) {
> >     subscribeTopics(topics);
> > }
> >
> > boolean infiniteLoop = false;
> > boolean oneTimeMode = false;
> > int timeout = consumeTimeout;
> > if (isSuggest) {
> >     // Configuration for suggest mode
> >     oneTimeMode = true;
> >     msgCount = 0;
> >     timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
> > } else if (msgCount < 0) {
> >     infiniteLoop = true;
> > } else if (msgCount == 0) {
> >     oneTimeMode = true;
> > }
> > Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
> > do {
> >     ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
> >     for (final ConsumerRecord<byte[], byte[]> record : records) {
> >         if (!infiniteLoop && !oneTimeMode) {
> >             --msgCount;
> >             if (msgCount < 0) {
> >                 break;
> >             }
> >         }
> >         outputViews.write(new BinaryOutput() {
> >             @Override
> >             public Document getHeader() {
> >                 return generateHeader(record, oldHeader);
> >             }
> >
> >             @Override
> >             public void write(WritableByteChannel writeChannel) throws IOException {
> >                 try (OutputStream os = Channels.newOutputStream(writeChannel)) {
> >                     os.write(record.value());
> >                 }
> >             }
> >         });
> >         // The offset to commit should be the next offset of the current one,
> >         // according to the API
> >         offsets.put(new TopicPartition(record.topic(), record.partition()),
> >                 new OffsetAndMetadata(record.offset() + 1));
> >         // In suggest mode, we should not change the current offset
> >         if (isSyncCommit && isSuggest) {
> >             commitOffset(offsets);
> >             offsets.clear();
> >         }
> >     }
> > } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
> >
> > See the screenshot below. In about nineteen hours, it consumed just 5
> > messages, but the memory allocated is 1.6GB.
> >
> > Any clues on how to get rid of the memory issue? Is there anything I
> > need to do in the program, or is it a bug in the Kafka library?
> >
> > Please revert ASAP.
> >
> > Thanks,
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
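Sönke's caveat about the metric (bytes allocated are never reduced by what the GC frees) explains how a number like 1.6GB can accumulate even when almost no messages arrive: a polling client allocates small scratch objects on every poll, and a cumulative counter only ever grows. A toy sketch of that arithmetic, with entirely made-up per-poll figures (this models neither SnapLogic nor Kafka internals, only the counter's behavior):

```java
public class AllocationDemo {
    // Models a counter like the one the SnapLogic docs describe: every
    // allocation is added, and freed memory is never subtracted.
    static long cumulativeAllocation(long polls, int bytesPerPoll) {
        long totalAllocated = 0;
        for (long i = 0; i < polls; i++) {
            byte[] scratch = new byte[bytesPerPoll]; // allocated on every poll
            totalAllocated += scratch.length;
            // 'scratch' becomes garbage right here, so the live heap stays
            // flat (the GC sawtooth), but the counter only ever grows.
        }
        return totalAllocated;
    }

    public static void main(String[] args) {
        // e.g. one poll per second for 19 hours at a made-up 24 KiB per poll
        // lands in the same ballpark as the 1.6GB from the screenshot:
        long polls = 19L * 3600;
        System.out.println(cumulativeAllocation(polls, 24 * 1024));
    }
}
```

If a noop run (as Sönke suggests) shows the same growth in this metric while a heap profiler shows a flat sawtooth, the number reflects allocation throughput rather than a leak.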