[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209628#comment-14209628 ]

Vishal commented on KAFKA-1745:
-------------------------------

Any solutions?

> Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
> ------------------------------------------------------------
>
>                 Key: KAFKA-1745
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1745
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>         Environment: Mac OS Mavericks
>            Reporter: Vishal
>            Priority: Critical
>
> Hi,
> I'm using the Java client API for Kafka. I wanted to send data to Kafka by using a producer pool, as I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (I got this information from lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 PIPEs are created.
> This is okay, but when the thread is cleared from the thread pool and a new thread is created, new KQUEUEs and PIPEs are created as well. The problem is that the old ones are not destroyed and still show up as open files. This is a major problem because the number of open files keeps increasing and never decreases.
> Please suggest any solutions.
> FYI, the number of TCP connections established from the producer system to the Kafka broker remains constant throughout.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
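The report above tracks open files with lsof. As a complement, the total descriptor count can also be read from inside the JVM. The following is a minimal sketch, not part of the original report: the class name `OpenFdCounter` is hypothetical, and it assumes a HotSpot/OpenJDK runtime on a Unix-like system where `com.sun.management.UnixOperatingSystemMXBean` is available. It reports only a total, so lsof is still needed to tell KQUEUE and PIPE entries apart.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class OpenFdCounter {

    /**
     * Returns the number of file descriptors currently open in this JVM,
     * or -1 when the platform MXBean does not expose that value
     * (for example on Windows).
     */
    public static long openFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                    .getOpenFileDescriptorCount();
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Log this value periodically (for example around producer.send() calls)
        // to see whether the count keeps growing.
        System.out.println("open file descriptors: " + openFileDescriptors());
    }
}
```

Logging this value before and after the thread pool shrinks would show whether descriptors are released along with the threads.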
[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817 ]

Vishal edited comment on KAFKA-1745 at 11/6/14 12:03 PM:
---------------------------------------------------------

[~junrao] Sorry, I should have been clearer.

[~ewencp] I checked and I don't think I'm leaking producers. Anyway, here is a sample test case that reproduces the problem (check lsof every 10 seconds to see the number of KQUEUEs and PIPEs increase even though the producer objects are being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {
    // Type parameters <String, String> are assumed from the StringEncoder serializer.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();
    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        Runnable run = new Runnable() {
            @Override
            public void run() {
                // Reuse a pooled producer if one is idle, otherwise create one.
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        };
        while (true) { // To make sure that the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            // 10 seconds, so that the keep alive time set on the thread pool
            // will be exceeded and the threads in the thread pool will be cleared.
            Thread.sleep(10000);
        }
    }
}
{code}

Though the KQUEUEs and PIPEs do get cleared after some time (in this test case, after 1-2 minutes), why does Kafka's API create new ones when a new thread accesses the producer object?
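The question above is not answered anywhere in this thread. One possible explanation, offered only as an assumption, is that the blocking socket I/O beneath the producer obtains a JDK NIO selector that is cached per thread. On macOS a selector is typically backed by one kqueue plus a wakeup pipe, which would match the "1 KQUEUE and 2 PIPEs" pattern. The sketch below does not touch Kafka at all: the class name `SelectorFdDemo` is hypothetical, and it only measures how many descriptors a single `java.nio.channels.Selector` holds while it is open.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.channels.Selector;

public class SelectorFdDemo {

    /** Open descriptor count of this JVM, or -1 when the platform does not expose it. */
    static long openFds() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                    .getOpenFileDescriptorCount();
        }
        return -1L;
    }

    /** Returns {before, whileOpen, afterClose} descriptor counts around one Selector. */
    public static long[] measure() {
        try {
            long before = openFds();
            Selector selector = Selector.open(); // allocates OS-level descriptors
            long whileOpen = openFds();
            selector.close();                    // releases them again
            long afterClose = openFds();
            return new long[] {before, whileOpen, afterClose};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] counts = measure();
        System.out.println("before=" + counts[0]
                + " whileOpen=" + counts[1]
                + " afterClose=" + counts[2]);
    }
}
```

If that assumption holds, a selector owned by a thread that has exited would only be released once the runtime cleans it up, which would be consistent with the 1-2 minute delay reported above.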
[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817 ]

Vishal commented on KAFKA-1745:
-------------------------------

[~junrao] Sorry, I should have been clearer.

[~ewencp] I checked and I don't think I'm leaking producers. Anyway, here is a sample test case that reproduces the problem (check lsof every 10 seconds to see the number of KQUEUEs and PIPEs increase even though the producer objects are being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {
    // Type parameters <String, String> are assumed from the StringEncoder serializer.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();
    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        Runnable run = new Runnable() {
            @Override
            public void run() {
                // Reuse a pooled producer if one is idle, otherwise create one.
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        };
        while (true) { // To make sure that the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            // 10 seconds, so that the keepalive time is exceeded by the
            // thread pool and the threads are cleared.
            Thread.sleep(10000);
        }
    }
}
{code}
[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vishal updated KAFKA-1745:
--------------------------
    Description:
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka by using a producer pool, as I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (I got this information from lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 PIPEs are created.
This is okay, but when the thread is cleared from the thread pool and a new thread is created, new KQUEUEs and PIPEs are created as well. The problem is that the old ones are not destroyed and still show up as open files. This is a major problem because the number of open files keeps increasing and never decreases.
Please suggest any solutions.
FYI, the number of TCP connections established from the producer system to the Kafka broker remains constant throughout.

  was:
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka by using a producer pool, as I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (I got this information from lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 PIPEs are created.
This is okay, but when the thread is cleared from the thread pool and a new thread is created, new KQUEUEs and PIPEs are created as well. The problem is that the old ones are not destroyed and still show up as open files. This is a major problem because the number of open files keeps increasing and never decreases.
Please suggest any solutions.
FYI, the number of TCP connections established remains constant throughout.
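The description notes that everything is fine as long as the same thread keeps sending. One workaround that follows from that observation, suggested here only as an untested assumption and not proposed by anyone in the thread, is to use a pool whose worker threads are never timed out, so that any per-thread resources are created once per worker. The sketch below uses `Executors.newFixedThreadPool`; the class name `StableWorkerPool` and the no-op task body are hypothetical placeholders for the real `producer.send()` work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StableWorkerPool {

    /**
     * Runs {@code tasks} placeholder jobs on a fixed-size pool and returns
     * the names of the worker threads that executed them.
     */
    public static Set<String> threadsUsed(int poolSize, int tasks) {
        final Set<String> names = ConcurrentHashMap.newKeySet();
        // Workers of a fixed pool do not time out while the pool is running.
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    // A real task would borrow a producer and call send() here.
                    names.add(Thread.currentThread().getName());
                });
            }
        } finally {
            pool.shutdown(); // already submitted tasks still run to completion
        }
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("distinct worker threads: " + threadsUsed(4, 100).size());
    }
}
```

The trade-off is that idle workers are kept around instead of being reclaimed, which gives up the grow-and-shrink behaviour the reporter wanted from the thread pool.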
[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vishal updated KAFKA-1745:
--------------------------
    Description:
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka by using a producer pool, as I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (I got this information from lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 PIPEs are created.
This is okay, but when the thread is cleared from the thread pool and a new thread is created, new KQUEUEs and PIPEs are created as well. The problem is that the old ones are not destroyed and still show up as open files. This is a major problem because the number of open files keeps increasing and never decreases.
Please suggest any solutions.
FYI, the number of TCP connections established remains constant throughout.

  was:
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka by using a producer pool, as I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (I got this information from lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 PIPEs are created.
This is okay, but when the thread is cleared from the thread pool and a new thread is created, new KQUEUEs and PIPEs are created as well. The problem is that the old ones are not destroyed and still show up as open files. This is a major problem because the number of open files keeps increasing and never decreases.
Please suggest any solutions.
[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908 ] Vishal edited comment on KAFKA-1745 at 11/5/14 8:56 AM: No, since I figured that calling producer.close() and returning that producer object to the pool would make that producer object unusable afterwards. was (Author: vishal m): No, since I figured that calling producer.close() and returning that producer object to the pool would make that producer object unusable afterwards.
[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908 ] Vishal commented on KAFKA-1745: --- No, since I figured that calling producer.close() and returning that producer object to the pool would make that producer object unusable afterwards.
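The comment above points out that a producer closed before being returned to the pool can no longer be used. One way to reconcile that with releasing resources is to close pooled objects only when the pool itself is shut down. The following is a rough sketch under that assumption, not a confirmed fix for this issue. `ProducerPool`, `FakeProducer`, `borrow` and `giveBack` are hypothetical names, and `FakeProducer` is a stand-in that does not talk to Kafka.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical fixed-size pool: items are reused on return and closed only at shutdown.
class ProducerPool<T extends Closeable> implements Closeable {
    private final BlockingQueue<T> idle;
    private final List<T> all = new ArrayList<>();

    ProducerPool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            T item = factory.get();
            all.add(item);
            idle.add(item);
        }
    }

    // Blocks until an idle item is available.
    T borrow() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a pooled item", e);
        }
    }

    // Returns the item for reuse; it is deliberately NOT closed here.
    void giveBack(T item) {
        idle.add(item);
    }

    // Closes every item the pool ever created; call once, at application shutdown.
    @Override
    public void close() {
        for (T item : all) {
            try {
                item.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}

// Hypothetical stand-in for a producer; a real one would be wrapped the same way.
// Not thread-safe on its own; the pool hands it to one borrower at a time.
class FakeProducer implements Closeable {
    private boolean open = true;

    void send(String message) {
        if (!open) {
            throw new IllegalStateException("producer already closed");
        }
        // A real producer would transmit the message here.
    }

    boolean isOpen() {
        return open;
    }

    @Override
    public void close() {
        open = false;
    }
}
```

A caller would wrap each send in `borrow()` and `giveBack()` (ideally in a try/finally) and call `pool.close()` once during shutdown. Whether closing the producers also releases the KQUEUE and PIPE entries described in this issue is not established by the thread.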
[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vishal updated KAFKA-1745: -- Summary: Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared. (was: Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does no0t get cleared when the thread that creates them is cleared.)
[jira] [Created] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does no0t get cleared when the thread that creates them is cleared.
Vishal created KAFKA-1745:
-
Summary: Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does no0t get cleared when the thread that creates them is cleared.
Key: KAFKA-1745
URL: https://issues.apache.org/jira/browse/KAFKA-1745
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Environment: Mac OS Mavericks
Reporter: Vishal
Priority: Critical

Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka by using a producer pool, as I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (I got this info by using lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 PIPEs are created.
This is okay, but when a thread is cleared from the thread pool and a new thread is created, new KQUEUEs and PIPEs are created. The problem is that the old ones are not destroyed and still show up as open files. This is causing a major problem, as the number of open files keeps increasing and never decreases.
Please suggest any solutions.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)