[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817 ] Vishal edited comment on KAFKA-1745 at 11/6/14 12:03 PM:
-
[~junrao] Sorry, I should have been more clear.

[~ewencp] I checked and I don't think I'm leaking producers. Anyway, here is a sample test case that reproduces the problem (run lsof every 10 seconds to watch the number of KQUEUEs and PIPEs grow even though the producer objects are being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();
    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        Runnable run = new Runnable() {
            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        };
        while (true) { // so that the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            // Sleep 10 seconds so that the keep-alive time set on the thread
            // pool is exceeded and the idle worker threads are cleared.
            Thread.sleep(10000);
        }
    }
}
{code}

Though the KQUEUEs and PIPEs do get cleared after some time (in this test case, after 1-2 minutes), why does Kafka's API create new ones when a new thread accesses the producer object?
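The thread-churn hypothesis in the test case can be checked without Kafka at all. The sketch below (JDK only, no Kafka dependency; the class name `KeepAliveDemo` is mine) reproduces just the thread-pool behavior: with `allowCoreThreadTimeOut(true)`, idle workers die after the keep-alive and later submissions are served by freshly created threads — which is exactly when a per-thread resource such as a selector would be re-created.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    public static void main(String[] args) throws InterruptedException {
        // Same shape as the test case above, but with a 1s keep-alive.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);

        final Set<String> workerNames = ConcurrentHashMap.newKeySet();
        Runnable task = new Runnable() {
            @Override
            public void run() {
                workerNames.add(Thread.currentThread().getName());
            }
        };

        for (int i = 0; i < 2; i++) tpe.submit(task);
        Thread.sleep(2000); // longer than the 1s keep-alive: idle workers exit
        for (int i = 0; i < 2; i++) tpe.submit(task);
        Thread.sleep(200);
        tpe.shutdown();

        // More distinct worker names than the pool size means the pool
        // discarded its idle threads and created new ones.
        System.out.println(workerNames.size() > 2); // prints "true"
    }
}
```

So the growing lsof counts track thread creation, not producer creation: reusing producer objects does not help if the threads calling send() keep being replaced.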
[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908 ] Vishal edited comment on KAFKA-1745 at 11/5/14 8:56 AM:
-
No, since I figured that calling producer.close() and then returning that producer object to the pool would make it unusable afterwards.

> Each new thread creates a PIPE and KQUEUE as open files during
> producer.send() and does not get cleared when the thread that creates them is
> cleared.
> -
>
>                 Key: KAFKA-1745
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1745
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>         Environment: Mac OS Mavericks
>            Reporter: Vishal
>            Priority: Critical
>
> Hi,
> I'm using the Java client API for Kafka. I wanted to send data to Kafka
> through a producer pool, since I'm using a sync producer. The threads that
> send the data come from a thread pool that grows and shrinks depending on
> usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created
> (observed via lsof). If I keep using the same thread it's fine, but when a
> new thread sends data to Kafka (using producer.send()) a new KQUEUE and 2
> PIPEs are created.
> That alone would be acceptable, but when a thread is cleared from the thread
> pool and a new one is created, new KQUEUEs and PIPEs are created while the
> old ones are not destroyed; they continue to show up as open files. This is
> a major problem because the number of open files keeps increasing and never
> decreases.
> Please suggest any solutions.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
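Given the description above, one workaround (a sketch, not an official fix; the class name `FixedPoolDemo` is mine) is to send from a fixed pool whose core threads never time out, so the same worker threads — and whatever per-thread resources they hold open — are reused for the life of the application instead of being churned:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // Fixed pool: core threads never time out, so the same workers
        // serve every burst of sends.
        ExecutorService pool = Executors.newFixedThreadPool(2);

        final Set<String> workerNames = ConcurrentHashMap.newKeySet();
        Runnable task = new Runnable() {
            @Override
            public void run() {
                workerNames.add(Thread.currentThread().getName());
            }
        };

        for (int round = 0; round < 3; round++) {
            for (int i = 0; i < 2; i++) pool.submit(task);
            Thread.sleep(500); // idle gap between bursts; workers stay alive
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        // At most two distinct worker names: no thread was ever replaced,
        // so no per-thread KQUEUE/PIPE would be re-created either.
        System.out.println(workerNames.size() <= 2); // prints "true"
    }
}
```

This sidesteps the leak rather than fixing it: the open-file count stays bounded by the (fixed) number of sending threads.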