Sönke,
  Thanks so much for the reply.  I switched to the newer poll(Duration)
method, but I still see the memory issue.
  Is there a way we can get on a one-on-one call and discuss this, please?
Let me know your availability.  I can share a Zoom meeting link.

Thanks,



On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau
<soenke.lie...@opencore.com.invalid> wrote:

> Hi Syed,
>
> from your screenshot I assume that you are using SnapLogic to run your
> code (full disclosure: I do not have the faintest idea of this
> product!). I've just had a look at the docs and am a bit confused by
> their explanation of the metric that you point out in your image
> "Memory Allocated". The docs say: "The Memory Allocated reflects the
> number of bytes that were allocated by the Snap.  Note that this
> number does not reflect the amount of memory that was freed and it is
> not the peak memory usage of the Snap.  So, it is not necessarily a
> metric that can be used to estimate the required size of a Snaplex
> node.  Rather, the number provides an insight into how much memory had
> to be allocated to process all of the documents.  For example, if the
> total allocated was 5MB and the Snap processed 32 documents, then the
> Snap allocated roughly 164KB per document.  When combined with the
> other statistics, this number can help to identify the potential
> causes of performance issues."
> The part about not reflecting memory that was freed makes me somewhat
> doubtful whether this actually reflects how much memory the process
> currently holds.  Can you give some more insight there?
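To make the distinction above concrete, here is a minimal sketch (not taken from the original code) that contrasts a cumulative "bytes requested" counter with the heap the JVM reports as in use. The class and method names (HeapCheck, usedHeapBytes, churn) are made up for illustration, and the assumption that SnapLogic's "Memory Allocated" behaves like the cumulative counter rests only on the docs quoted above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical helper, for illustration only.
final class HeapCheck {

    /** Heap bytes the JVM currently reports as used (live objects plus not-yet-collected garbage). */
    static long usedHeapBytes() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        return heap.getUsed();
    }

    /** Creates short-lived buffers and returns the cumulative number of bytes requested. */
    static long churn(int iterations, int bufferSize) {
        long requested = 0;
        for (int i = 0; i < iterations; i++) {
            byte[] scratch = new byte[bufferSize];
            scratch[0] = 1;              // touch the buffer; it becomes garbage right after
            requested += scratch.length; // this counter only ever grows
        }
        return requested;
    }

    public static void main(String[] args) {
        long before = usedHeapBytes();
        long requested = churn(10_000, 64 * 1024); // roughly 640 MB requested in total
        System.gc();                               // only a hint; the JVM may ignore it
        long after = usedHeapBytes();
        System.out.printf("bytes requested in total: %d%n", requested);
        System.out.printf("used heap before: %d, after: %d%n", before, after);
    }
}
```

If the used-heap figure stays roughly flat while the cumulative figure keeps climbing, the growth is allocation churn rather than retained memory. A heap dump or GC log would still be needed to confirm an actual leak.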
>
> Apart from that, I just ran your code, somewhat modified to make it
> work without dependencies, for 2 hours and saw no unusual memory
> consumption, just a regular garbage collection sawtooth pattern. That
> being said, I had to replace your actual processing with a simple
> println, so if there is a memory leak in there I would of course not
> have noticed.
> I've uploaded the code I ran [1] for reference. For further analysis,
> maybe you could run something similar with just a println or noop and
> see if the symptoms persist, to localize the leak (if it exists).
>
> Also, two random observations on your code:
>
> KafkaConsumer.poll(long timeout) is deprecated; you should consider
> using the overloaded version with a Duration parameter instead.
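As a rough sketch of that migration, the existing millisecond timeout can be wrapped in a java.time.Duration before it is passed to poll. The helper name toPollTimeout and the choice to reject negative values are assumptions for illustration, not part of the Kafka API. The commented-out call assumes kafka-clients 2.0 or later on the classpath.

```java
import java.time.Duration;

// Hypothetical helper, for illustration only.
final class PollTimeout {

    /** Converts a millisecond timeout, as used with the deprecated poll(long), into a Duration. */
    static Duration toPollTimeout(long timeoutMs) {
        if (timeoutMs < 0) {
            // Design choice of this sketch: fail early on a negative timeout.
            throw new IllegalArgumentException("timeout must not be negative: " + timeoutMs);
        }
        return Duration.ofMillis(timeoutMs);
    }

    public static void main(String[] args) {
        Duration timeout = toPollTimeout(500);
        // With a real consumer, the call site would then read:
        // ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
        System.out.println("poll timeout in ms: " + timeout.toMillis());
    }
}
```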
>
> The comment at [2] seems to contradict the code that follows it, as the
> offsets are only changed when in suggest mode. But as I have no idea
> what suggest mode even is or what all this means, this observation may
> be miles off the point :)
>
> I hope that helps a little.
>
> Best regards,
> Sönke
>
> [1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
> [2]
> https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
>
>
> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
> <syed.mudas...@gaianconsultants.com> wrote:
> >
> >
> > Thanks,
> >
> >
> >
> > ---------- Forwarded message ---------
> > From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
> > Date: Tue, Feb 26, 2019 at 12:40 PM
> > Subject: Apache Kafka Memory Leakage???
> > To: <us...@kafka.apache.org>
> > Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
> >
> >
> > Hi Team,
> >   I have a Java application based on the latest Apache Kafka version,
> > 2.1.1.
> >   I have a consumer application that runs indefinitely to consume
> > messages whenever they are produced.
> >   Sometimes no messages are produced for hours.  Still, I see that the
> > memory allocated to the consumer program is increasing drastically.
> >   My code is as follows:
> >
> > AtomicBoolean isRunning = new AtomicBoolean(true);
> >
> > Properties kafkaProperties = new Properties();
> >
> > kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> >
> > kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
> >
> > kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
> > kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
> > kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
> > consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
> > if (topics != null) {
> >     subscribeTopics(topics);
> > }
> >
> >
> >     boolean infiniteLoop = false;
> >     boolean oneTimeMode = false;
> >     int timeout = consumeTimeout;
> >     if (isSuggest) {
> >         //Configuration for suggest mode
> >         oneTimeMode = true;
> >         msgCount = 0;
> >         timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
> >     } else if (msgCount < 0) {
> >         infiniteLoop = true;
> >     } else if (msgCount == 0) {
> >         oneTimeMode = true;
> >     }
> >     Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
> >     do {
> >             ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
> >             for (final ConsumerRecord<byte[], byte[]> record : records) {
> >                 if (!infiniteLoop && !oneTimeMode) {
> >                     --msgCount;
> >                     if (msgCount < 0) {
> >                         break;
> >                     }
> >                 }
> >                 outputViews.write(new BinaryOutput() {
> >                     @Override
> >                     public Document getHeader() {
> >                         return generateHeader(record, oldHeader);
> >                     }
> >
> >                     @Override
> >                     public void write(WritableByteChannel writeChannel) throws IOException {
> >                         try (OutputStream os = Channels.newOutputStream(writeChannel)) {
> >                             os.write(record.value());
> >                         }
> >                     }
> >                 });
> >                 //The offset to commit should be the next offset of the current one,
> >                 // according to the API
> >                 offsets.put(new TopicPartition(record.topic(), record.partition()),
> >                         new OffsetAndMetadata(record.offset() + 1));
> >                 //In suggest mode, we should not change the current offset
> >                 if (isSyncCommit && isSuggest) {
> >                     commitOffset(offsets);
> >                     offsets.clear();
> >                 }
> >             }
> >      } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
> >
> >
> > See the screenshot below.  In about nineteen hours, it consumed just 5
> > messages, but the memory allocated is 1.6GB.
> >
> >
> > Any clues on how to get rid of the memory issue?  Is there anything I
> > need to do in the program, or is it a bug in the Kafka library?
> >
> > Please revert ASAP.
> >
> >
> > Thanks,
> >
>
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>
