[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-13 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209628#comment-14209628
 ] 

Vishal commented on KAFKA-1745:
---

Any solutions?

> Each new thread creates a PIPE and KQUEUE as open files during 
> producer.send() and does not get cleared when the thread that creates them is 
> cleared.
> -
>
> Key: KAFKA-1745
> URL: https://issues.apache.org/jira/browse/KAFKA-1745
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1.1
> Environment: Mac OS Mavericks
>Reporter: Vishal
>Priority: Critical
>
> Hi,
> I'm using the Java client API for Kafka. I wanted to send data to Kafka 
> through a producer pool, as I'm using a sync producer. The thread that sends 
> the data comes from a thread pool that grows and shrinks depending on usage. 
> When I send data from one thread, one KQUEUE and two PIPEs are created 
> (observed using lsof). If I keep using the same thread it's fine, but when a 
> new thread sends data to Kafka (using producer.send()) a new KQUEUE and two 
> PIPEs are created.
> This is okay, but when a thread is cleared from the thread pool and a new 
> thread is created, new KQUEUEs and PIPEs are created as well. The problem is 
> that the old ones are not destroyed and keep showing up as open files. This 
> is causing a major problem, as the number of open files keeps increasing and 
> never decreases.
> Please suggest any solutions.
> FYI, the number of TCP connections established from the producer system to 
> the Kafka broker remains constant throughout.
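
For reference, the descriptor growth described above can also be sampled from 
inside the JVM instead of polling lsof. A minimal sketch; the FdWatcher class 
name is illustrative, and the UnixOperatingSystemMXBean cast is specific to 
HotSpot JVMs on Unix-like systems (including Mac OS):

{code}
import java.lang.management.ManagementFactory;

import com.sun.management.UnixOperatingSystemMXBean;

public class FdWatcher {
    public static void main(String[] args) throws InterruptedException {
        // On HotSpot on Unix-like systems, the OS MXBean implements
        // UnixOperatingSystemMXBean and exposes the open-descriptor count.
        UnixOperatingSystemMXBean os = (UnixOperatingSystemMXBean)
                ManagementFactory.getOperatingSystemMXBean();
        while (true) {
            System.out.println("open file descriptors: "
                    + os.getOpenFileDescriptorCount());
            Thread.sleep(10000); // sample every 10 seconds, like the lsof checks
        }
    }
}
{code}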



[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-06 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817
 ] 

Vishal edited comment on KAFKA-1745 at 11/6/14 12:04 PM:
-

[~junrao] Sorry, I should have been more clear.
[~ewencp] I checked and I don't think I'm leaking producers. Anyway, here is a 
sample test case which reproduces the problem (check lsof every 10 seconds 
to see the increase in KQUEUEs and PIPEs even though the producer objects are 
being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static ProducerConfig config;

    static
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        // Worker threads time out after 5 seconds of idleness and are
        // replaced by fresh threads on the next burst of submissions.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);

        Runnable run = new Runnable() {

            @Override
            public void run() {
                // Reuse a pooled producer if one is available.
                Producer<String, String> producer = producerPool.poll();
                if (producer == null)
                {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        };

        while (true) // To make sure that the main program does not terminate
        {
            for (int i = 0; i < 100; i++)
            {
                tpe.submit(run);
            }
            Thread.sleep(10000); // 10 seconds, so the keep-alive time set on
                                 // the thread pool is exceeded and the idle
                                 // threads are cleared after each iteration.
        }
    }
}
{code}


Though the KQUEUEs and PIPEs do get cleared after some time (in this test 
case, after 1-2 minutes), why does Kafka's API create new ones when a new 
thread accesses the producer object?
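
If the extra KQUEUEs and PIPEs do track thread lifetime, one workaround is to 
keep the sending threads long-lived instead of letting the pool expire them, 
e.g. a fixed-size executor whose threads never time out. A minimal sketch 
under that assumption (the FixedSenders class name is illustrative, and this 
is a possible mitigation, not a confirmed fix):

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedSenders {
    // A fixed pool whose worker threads are created once and never expire,
    // so any per-thread kqueue/pipe descriptors are also created only once.
    private static final ExecutorService SENDERS = Executors.newFixedThreadPool(10);

    public static void submitSend(Runnable send) {
        SENDERS.submit(send); // e.g. the send Runnable from the test case above
    }
}
{code}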




[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal updated KAFKA-1745:
--
Description: 
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka through 
a producer pool, as I'm using a sync producer. The thread that sends the data 
comes from a thread pool that grows and shrinks depending on usage. When I 
send data from one thread, one KQUEUE and two PIPEs are created (observed 
using lsof). If I keep using the same thread it's fine, but when a new thread 
sends data to Kafka (using producer.send()) a new KQUEUE and two PIPEs are 
created.

This is okay, but when a thread is cleared from the thread pool and a new 
thread is created, new KQUEUEs and PIPEs are created as well. The problem is 
that the old ones are not destroyed and keep showing up as open files. This is 
causing a major problem, as the number of open files keeps increasing and 
never decreases.

Please suggest any solutions.

FYI, the number of TCP connections established from the producer system to the 
Kafka broker remains constant throughout.

  was:
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka through 
a producer pool, as I'm using a sync producer. The thread that sends the data 
comes from a thread pool that grows and shrinks depending on usage. When I 
send data from one thread, one KQUEUE and two PIPEs are created (observed 
using lsof). If I keep using the same thread it's fine, but when a new thread 
sends data to Kafka (using producer.send()) a new KQUEUE and two PIPEs are 
created.

This is okay, but when a thread is cleared from the thread pool and a new 
thread is created, new KQUEUEs and PIPEs are created as well. The problem is 
that the old ones are not destroyed and keep showing up as open files. This is 
causing a major problem, as the number of open files keeps increasing and 
never decreases.

Please suggest any solutions.

FYI, the number of TCP connections established remains constant throughout.




[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal updated KAFKA-1745:
--
Description: 
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka through 
a producer pool, as I'm using a sync producer. The thread that sends the data 
comes from a thread pool that grows and shrinks depending on usage. When I 
send data from one thread, one KQUEUE and two PIPEs are created (observed 
using lsof). If I keep using the same thread it's fine, but when a new thread 
sends data to Kafka (using producer.send()) a new KQUEUE and two PIPEs are 
created.

This is okay, but when a thread is cleared from the thread pool and a new 
thread is created, new KQUEUEs and PIPEs are created as well. The problem is 
that the old ones are not destroyed and keep showing up as open files. This is 
causing a major problem, as the number of open files keeps increasing and 
never decreases.

Please suggest any solutions.

FYI, the number of TCP connections established remains constant throughout.

  was:
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka through 
a producer pool, as I'm using a sync producer. The thread that sends the data 
comes from a thread pool that grows and shrinks depending on usage. When I 
send data from one thread, one KQUEUE and two PIPEs are created (observed 
using lsof). If I keep using the same thread it's fine, but when a new thread 
sends data to Kafka (using producer.send()) a new KQUEUE and two PIPEs are 
created.

This is okay, but when a thread is cleared from the thread pool and a new 
thread is created, new KQUEUEs and PIPEs are created as well. The problem is 
that the old ones are not destroyed and keep showing up as open files. This is 
causing a major problem, as the number of open files keeps increasing and 
never decreases.

Please suggest any solutions.






[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908
 ] 

Vishal commented on KAFKA-1745:
---

No, since I figured that calling producer.close() and then returning the 
producer object to the pool would make it unusable afterwards.
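
One way to keep pooled producers usable while still releasing their sockets 
and descriptors eventually is to close them only once, at application 
shutdown, rather than after each send. A minimal sketch, assuming the same 
kafka.javaapi.producer.Producer API as the test case above (the ProducerPool 
class and its take/give methods are illustrative, not part of Kafka):

{code}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

public class ProducerPool {
    private final Queue<Producer<String, String>> pool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    public ProducerPool() {
        // Close every pooled producer exactly once, when the JVM exits,
        // so their connections and descriptors are released.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                Producer<String, String> p;
                while ((p = pool.poll()) != null) {
                    p.close();
                }
            }
        }));
    }

    public Producer<String, String> take(ProducerConfig config) {
        Producer<String, String> p = pool.poll();
        return (p != null) ? p : new Producer<String, String>(config);
    }

    public void give(Producer<String, String> p) {
        pool.add(p);
    }
}
{code}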



[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-03 Thread Vishal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal updated KAFKA-1745:
--
Summary: Each new thread creates a PIPE and KQUEUE as open files during 
producer.send() and does not get cleared when the thread that creates them is 
cleared.  (was: Each new thread creates a PIPE and KQUEUE as open files during 
producer.send() and does no0t get cleared when the thread that creates them is 
cleared.)



[jira] [Created] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does no0t get cleared when the thread that creates them is cleared.

2014-11-03 Thread Vishal (JIRA)
Vishal created KAFKA-1745:
-

 Summary: Each new thread creates a PIPE and KQUEUE as open files 
during producer.send() and does no0t get cleared when the thread that creates 
them is cleared.
 Key: KAFKA-1745
 URL: https://issues.apache.org/jira/browse/KAFKA-1745
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
 Environment: Mac OS Mavericks
Reporter: Vishal
Priority: Critical


Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka through 
a producer pool, as I'm using a sync producer. The thread that sends the data 
comes from a thread pool that grows and shrinks depending on usage. When I 
send data from one thread, one KQUEUE and two PIPEs are created (observed 
using lsof). If I keep using the same thread it's fine, but when a new thread 
sends data to Kafka (using producer.send()) a new KQUEUE and two PIPEs are 
created.

This is okay, but when a thread is cleared from the thread pool and a new 
thread is created, new KQUEUEs and PIPEs are created as well. The problem is 
that the old ones are not destroyed and keep showing up as open files. This is 
causing a major problem, as the number of open files keeps increasing and 
never decreases.

Please suggest any solutions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)