Looking at that heap dump, this is more likely a database connection/resource leak (298 connections?) than anything to do with Kafka. Have you investigated whether there is a DB resource leak in the application and ruled that out?

-Jaikiran

On Friday 30 January 2015 01:08 PM, ankit tyagi wrote:
I have shared the object histograms from before and after GC as a gist:
https://gist.github.com/ankit1987/f4a04a1350fdd609096d

On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

What kind of (managed) component is it that has the @PreDestroy?
Looking at the previous snippet you added, it looks like you are creating
the Producer in some method? If you are going to close the producer in a
@PreDestroy of the component, then you should create the producer in
the @PostConstruct of the same component, so that you have proper lifecycle
management of those resources.


-Jaikiran
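
A minimal sketch of the lifecycle pairing suggested above, assuming a container-managed component. StubProducer is a hypothetical stand-in for the Kafka Producer used in this thread, so that the example compiles without Kafka on the classpath; ManagedPublisher, start() and stop() are illustrative names as well. In a real managed component, start() would carry @PostConstruct and stop() would carry @PreDestroy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Kafka Producer from the thread, so the
// sketch compiles without Kafka on the classpath.
class StubProducer {
    final List<String> sent = new ArrayList<>();
    boolean closed = false;

    void send(List<String> msgs) {
        if (closed) {
            throw new IllegalStateException("producer already closed");
        }
        sent.addAll(msgs);
    }

    void close() {
        closed = true;
    }
}

// One producer per component instance: created in start(), closed in stop().
class ManagedPublisher {
    private StubProducer producer;

    // In a managed component this method would be annotated @PostConstruct.
    public void start() {
        producer = new StubProducer();
    }

    public void send(List<String> msgs) {
        if (producer == null) {
            throw new IllegalStateException("start() has not been called");
        }
        producer.send(msgs);
    }

    // In a managed component this method would be annotated @PreDestroy.
    public void stop() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }

    // Exposed only so the demo can inspect the stub.
    StubProducer producer() {
        return producer;
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        ManagedPublisher publisher = new ManagedPublisher();
        publisher.start();
        StubProducer stub = publisher.producer();
        publisher.send(Arrays.asList("event-1", "event-2"));
        publisher.stop();
        System.out.println("sent=" + stub.sent.size() + " closed=" + stub.closed);
    }
}
```

Creating and closing the producer in the same component means the container's lifecycle callbacks own the resource, rather than whichever method happened to create it.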

On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:

Hi,

I am closing my producer at the time of shutting down my application.

      @PreDestroy
      public void stop()
      {
          LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
          if (myProducer != null) {
              myProducer.close();
          }
      }



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy <ku...@nmsworks.co.in>
wrote:

  Hope you are closing the producers. Can you share the attachment through
gist/pastebin?

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <
ankittyagi.mn...@gmail.com>
wrote:

  Hi Jaikiran,
I am using Ubuntu and was able to reproduce it on Red Hat too. Please find
more information below.


*DISTRIB_ID=Ubuntu*
*DISTRIB_RELEASE=12.04*
*DISTRIB_CODENAME=precise*
*DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"*

*java version "1.7.0_72"*

This is happening on the client side. The output of lsof showed that most
of the fds were FIFO and anon. After GC, however, the fd count dropped
significantly.

Below is the client code which I am using for publishing messages.


    private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;

    myProducer = new Producer<>(new ProducerConfig(myProducerProperties));

    public void send(List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
    {
        myProducer.send(msgs);
    }


We are using the sync producer. I am attaching object histograms from
before GC (histo_1) and after GC (histo_2) in my application.

On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

  Which operating system are you on and what Java version? Depending on the
OS, you could get tools (like lsof) to show which file descriptors are
being held on to. Is it the client JVM which ends up with these leaks?

Also, would it be possible to post a snippet of your application code
which shows how you are using the Kafka APIs?

-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:

  Hi,
Currently we are using the sync producer client of version 0.8.1 on our
production box. We are getting the following exception while publishing
a Kafka message:
[2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89] Fetching topic metadata with correlation id 10808 for topics [Set(kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092] failed
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)


We are using a dynamic thread pool to publish messages to Kafka. My
observation is that when the threads in my executor are destroyed after the
keep-alive time, their file descriptors are somehow not released; but when
I explicitly ran a full GC, the fd count dropped by a significant amount.
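
The before/after-GC comparison described above can also be taken from inside the JVM, as a complement to lsof. The following is a rough sketch, assuming a Unix-like OS and a HotSpot/OpenJDK-style JVM that exposes com.sun.management.UnixOperatingSystemMXBean; FdProbe, openFds() and aroundGc() are illustrative names, not part of Kafka or of the application in this thread. Note that System.gc() is only a request, and that descriptors held by unreachable objects may be released some time after the collection finishes.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

class FdProbe {
    // Returns the number of file descriptors currently open in this JVM,
    // or -1 if the platform MXBean does not expose that figure.
    static long openFds() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                    .getOpenFileDescriptorCount();
        }
        return -1L;
    }

    // Returns {count before GC, count after GC}.
    static long[] aroundGc() {
        long before = openFds();
        System.gc(); // a request only; the JVM is free to ignore it
        long after = openFds();
        return new long[] { before, after };
    }

    public static void main(String[] args) {
        long[] counts = aroundGc();
        System.out.println("open fds before GC: " + counts[0]);
        System.out.println("open fds after GC:  " + counts[1]);
    }
}
```

A count that drops sharply after GC, as reported here, suggests that descriptors are being held by objects that became unreachable without being closed explicitly.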
