attaching my producer whole code.
Creating kafkaProducer Bean in xml
* <bean id="kafkaESBProducer"
class="com.snapdeal.coms.kafka.KafkaProducer">*
* <constructor-arg name="topic" *
*
value="${KAFKA_ESB_TOPIC_NAME:kafka_topic_coms_esb_${COMS_PROFILE:dev}_${USER}}"
/>*
* <constructor-arg name="producerProperties">*
* <props>*
* <prop
key="metadata.broker.list">${KAFKA_PRODUCER_BROKER_LIST}</prop>*
* </props>*
* </constructor-arg>*
* </bean>*
public class KafkaProducer
{
private static final Logger LOG =
LoggerFactory.getLogger(KafkaProducer.class);
private static final String SYSTEM_USER_NAME_PROPERTY = "user.name";
private static final String CONFIG_PARAM_CLIENT_ID = "client.id";
private static final String CLIENT_ID_FORMAT_STR =
"kafka.coms.producer.%s.%s.%s";
private enum ConfigParam
{
SERIALIZER_CLASS("serializer.class",
CommonPropertyParam.KAFKA_PRODUCER_SERIALIZER_CLASS),
KEY_SERIALIZER_CLASS("key.serializer.class",
CommonPropertyParam.KAFKA_PRODUCER_PARTITION_KEY_SERIALIZER_CLASS),
//commenting this to use kafka default paritioner
// PARTITIONER_CLASS("partitioner.class",
// CommonPropertyParam.KAFKA_PRODUCER_PARTITIONER_CLASS),
REQUEST_REQUIRED_ACKS("request.required.acks",
CommonPropertyParam.KAFKA_PRODUCER_REQUEST_REQUIRED_ACKS);
private final String myName;
private final PropertyParam myParam;
ConfigParam(String name, PropertyParam param)
{
myName = name;
myParam = param;
}
public String getName()
{
return myName;
}
public PropertyParam getParam()
{
return myParam;
}
}
private final String myTopic;
private final Properties myProducerProperties;
private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;
@Autowired
private COMSConfiguration myAppConfig;
public KafkaProducer(String topic, Properties producerProperties)
{
LOG.info("Creating Kafka Producer instance: {}", this);
myTopic = topic;
myProducerProperties = producerProperties;
}
* @PostConstruct*
* private void initializeProducer()*
* {*
* LOG.info("Initializing Kafka Producer for topic: {}", getTopic());*
* // Set producer unique client id*
* String currentUser =
System.getProperty(SYSTEM_USER_NAME_PROPERTY);*
* String currentJVMName =
ManagementFactory.getRuntimeMXBean().getName();*
* currentJVMName = currentJVMName.replace('@', '_');*
* String uniqueClientId = String.format(CLIENT_ID_FORMAT_STR,*
* getTopic(), currentUser, currentJVMName);*
* if (myProducerProperties.contains(CONFIG_PARAM_CLIENT_ID)) {*
* uniqueClientId += ":"*
* +
myProducerProperties.getProperty(CONFIG_PARAM_CLIENT_ID);*
* }*
* myProducerProperties.setProperty(CONFIG_PARAM_CLIENT_ID,*
* uniqueClientId);*
* // Set reasonable defaults for required params*
* for (ConfigParam cp : ConfigParam.values()) {*
* if (!myProducerProperties.containsKey(cp.getName())) {*
* String cpValue =
myAppConfig.getPropertyValue(cp.getParam());*
* myProducerProperties.setProperty(cp.getName(), cpValue);*
* }*
* }*
* myProducer =*
* new Producer<>(new ProducerConfig(myProducerProperties));*
* LOG.info("Initialized Kafka Producer for topic: {} and properties
{}", getTopic(),myProducerProperties);*
* }*
public String getTopic()
{
return myTopic;
}
public Producer<KafkaPartitionKey, KafkaEventWrapper> getProducer()
{
return myProducer;
}
public void send(KeyedMessage<KafkaPartitionKey, KafkaEventWrapper> msg)
{
myProducer.send(msg);
}
public void send(
List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
{
myProducer.send(msgs);
}
* @PreDestroy*
* public void stop()*
* {*
* LOG.info("Stopping Kafka Producer for topic: {}", myTopic);*
* if (myProducer != null) {*
* myProducer.close();*
* }*
* }*
}
On Fri, Jan 30, 2015 at 1:08 PM, ankit tyagi <[email protected]>
wrote:
> I have shared object histogram after and before gc on gist
> https://gist.github.com/ankit1987/f4a04a1350fdd609096d
>
> On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai <[email protected]>
> wrote:
>
>> What kind of a (managed) component is that which has the @PreDestroy?
>> Looking at the previous snippet you added, it looks like you are creating
>> the Producer in some method? If you are going to close the producer in a
>> @PreDestroy of the component, then you should be creating the producer in
>> the @PostConstruct of the same component, so that you have proper lifecycle
>> management of those resources.
>>
>>
>> -Jaikiran
>>
>> On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:
>>
>>> Hi,
>>>
>>> I am closing my producer at the time of shutting down my application.
>>>
>>> @PreDestroy
>>> public void stop()
>>> {
>>> LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
>>> if (myProducer != null) {
>>> myProducer.close();
>>> }
>>> }
>>>
>>>
>>>
>>> On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy <[email protected]>
>>> wrote:
>>>
>>> Hope you are closing the producers. can you share the attachment through
>>>> gist/patebin
>>>>
>>>> On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <
>>>> [email protected]>
>>>> wrote:
>>>>
>>>> Hi Jaikiran,
>>>>>
>>>>> I am using ubuntu and was able to reproduce on redhat too. Please find
>>>>>
>>>> the
>>>>
>>>>> more information below.
>>>>>
>>>>>
>>>>> *DISTRIB_ID=Ubuntu*
>>>>> *DISTRIB_RELEASE=12.04*
>>>>> *DISTRIB_CODENAME=precise*
>>>>> *DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"*
>>>>>
>>>>> *java version "1.7.0_72"*
>>>>>
>>>>> This is happening on client side. Output of lsof was showing that
>>>>> maximum
>>>>> fd were FIFO and anon. But after GC FD count was reduced significantly.
>>>>>
>>>>> Below is my Client Code which i am using for publishing message.
>>>>>
>>>>>
>>>>> * private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;*
>>>>>
>>>>> * myProducer = new Producer<>(new
>>>>> ProducerConfig(myProducerProperties));*
>>>>>
>>>>> * public void send(*
>>>>> * List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>>
>>>>> msgs)*
>>>>> * {*
>>>>> * myProducer.send(msgs);*
>>>>> * }*
>>>>>
>>>>>
>>>>> we are using sync producer. I am attaching object histo before
>>>>>
>>>> GC(histo_1)
>>>>
>>>>> and after GC(histo_2) in my application.
>>>>>
>>>>> On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai <
>>>>> [email protected]>
>>>>> wrote:
>>>>>
>>>>> Which operating system are you on and what Java version? Depending on
>>>>>>
>>>>> the
>>>>
>>>>> OS, you could get tools (like lsof) to show which file descriptors are
>>>>>> being held on to. Is it the client JVM which ends up with these leaks?
>>>>>>
>>>>>> Also, would it be possible to post a snippet of your application code
>>>>>> which shows how you are using the Kafka APIs?
>>>>>>
>>>>>> -Jaikiran
>>>>>> On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
>>>>>>
>>>>>> Hi,
>>>>>>>
>>>>>>> Currently we are using sync producer client of 0.8.1 version in our
>>>>>>> production box . we are getting the following exception while
>>>>>>>
>>>>>> publishing
>>>>
>>>>> kafka message
>>>>>>>
>>>>>>> *[2015-01-29
>>>>>>> 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
>>>>>>>
>>>>>> Fetching
>>>>>
>>>>>> topic metadata with correlation id 10808 for topics [Set(*
>>>>>>> *kafka_topic_coms_FD_test1)] from broker
>>>>>>>
>>>>>> [id:0,host:localhost,port:9092]
>>>>
>>>>> failed*
>>>>>>> *java.net.ConnectException: Connection refused*
>>>>>>> * at sun.nio.ch.Net.connect0(Native Method)*
>>>>>>> * at sun.nio.ch.Net.connect(Net.java:465)*
>>>>>>> * at sun.nio.ch.Net.connect(Net.java:457)*
>>>>>>> * at
>>>>>>> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
>>>>>>> at
>>>>>>>
>>>>>> kafka.network.BlockingChannel.connect(BlockingChannel.scala:
>>>>
>>>>> 57)
>>>>>>> at
>>>>>>>
>>>>>> kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>>>>
>>>>> at
>>>>>>>
>>>>>>> kafka.producer.SyncProducer.getOrMakeConnection(
>>>> SyncProducer.scala:156)
>>>>
>>>>> at
>>>>>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
>>>>>>> doSend(SyncProducer.scala:68)
>>>>>>> at kafka.producer.SyncProducer.
>>>>>>> send(SyncProducer.scala:112)
>>>>>>> at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>>>>>>> at
>>>>>>> kafka.producer.BrokerPartitionInfo.updateInfo(
>>>>>>> BrokerPartitionInfo.scala:82)
>>>>>>>
>>>>>>>
>>>>>>> we are using dynamic thread pool to publish message to kafka. My
>>>>>>> observation is when after keep alive time when threads in my executor
>>>>>>>
>>>>>> gets
>>>>>
>>>>>> destroyed, somehow file descriptor is not getting cleared but when i
>>>>>>>
>>>>>> did
>>>>
>>>>> explicitly ran the full gc, fd count got reduced by a signification
>>>>>>>
>>>>>> amout.
>>>>>
>>>>>>
>>>>>>>
>>
>