[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817 ]

Vishal edited comment on KAFKA-1745 at 11/6/14 8:07 AM:
--------------------------------------------------------

[~junrao] Sorry, I should have been clearer.
[~ewencp] I checked and I don't think I'm leaking producers. In any case, here is a sample test case that reproduces the problem (run lsof against the process every 10 seconds to watch the number of KQUEUEs and PIPEs grow even though the producer objects are being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    // Producers are pooled and reused across tasks, so no producer is leaked.
    private static final Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static final ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        // Pool of 10 threads whose core threads may time out after 5 seconds
        // of idleness, so workers are regularly torn down and replaced with
        // fresh threads.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);

        Runnable run = new Runnable() {
            @Override
            public void run() {
                // Take a pooled producer, or create one if the pool is empty.
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                producer.send(new KeyedMessage<String, String>("SaSMQ", "0", "test"));
                producerPool.add(producer);
            }
        };

        while (true) { // keep the main program from terminating
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            // Sleep past the pool's keep-alive so the idle worker threads
            // are cleared before the next batch of sends.
            Thread.sleep(10000);
        }
    }
}
{code}
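
As an aside, the growth is also visible from inside the JVM without polling lsof. A minimal sketch (FdWatcher is just an illustrative name, and the UnixOperatingSystemMXBean cast is HotSpot/Unix-specific):

{code}
import java.lang.management.ManagementFactory;

import com.sun.management.UnixOperatingSystemMXBean;

public class FdWatcher {

    public static void main(String[] args) throws InterruptedException {
        // On HotSpot on Unix-like systems, the OS MXBean exposes the
        // process's open file descriptor count (KQUEUEs and PIPEs included).
        UnixOperatingSystemMXBean os = (UnixOperatingSystemMXBean)
                ManagementFactory.getOperatingSystemMXBean();
        while (true) {
            System.out.println("open fds: " + os.getOpenFileDescriptorCount()
                    + " / max " + os.getMaxFileDescriptorCount());
            Thread.sleep(10000); // matches the 10-second lsof interval above
        }
    }
}
{code}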


The KQUEUEs and PIPEs do get cleared eventually (in this test case, after 1-2 minutes), but why does a new set have to be created every time a new thread accesses the same producer object?
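
For comparison, here is a variant of the same test with a fixed pool whose workers never time out (a sketch; FixedPoolTest is just an illustrative name, and it assumes the leaked descriptors are tied to the lifetime of the worker threads rather than to the producer):

{code}
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class FixedPoolTest {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // One long-lived producer shared by all tasks; the old producer API
        // is safe to use from multiple threads.
        final Producer<String, String> producer = new Producer<String, String>(config);

        // Fixed pool: core threads never time out, so the same 10 workers
        // handle every batch of sends.
        ExecutorService pool = Executors.newFixedThreadPool(10);

        while (true) {
            for (int i = 0; i < 100; i++) {
                pool.submit(new Runnable() {
                    @Override
                    public void run() {
                        producer.send(new KeyedMessage<String, String>("SaSMQ", "0", "test"));
                    }
                });
            }
            Thread.sleep(10000);
        }
    }
}
{code}

If lsof stays flat with this variant but grows with the original one, that would suggest the KQUEUEs and PIPEs are per-thread resources (for example, selectors cached per thread somewhere in the client's networking code) rather than something the producer object itself keeps reopening.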



> Each new thread creates a PIPE and KQUEUE as open files during 
> producer.send(), and they do not get cleared when the thread that created 
> them is cleared.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1745
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1745
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>         Environment: Mac OS Mavericks
>            Reporter: Vishal
>            Priority: Critical
>
> Hi,
>     I'm using the Java client API for Kafka. I want to send data to Kafka 
> through a producer pool, since I'm using a sync producer. The thread that 
> sends the data comes from a thread pool that grows and shrinks depending on 
> usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created 
> (observed via lsof). If I keep using the same thread it's fine, but when a 
> new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 
> new PIPEs are created.
> This is okay, but when that thread is cleared from the thread pool and a new 
> one is created, yet more KQUEUEs and PIPEs are created. The problem is that 
> the old ones are not destroyed and still show up as open files, so the 
> number of open files keeps increasing and never decreases.
> Please suggest any solutions.
> FYI, the number of TCP connections established from the producer system to 
> the Kafka broker remains constant throughout.


