[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-06 Thread Vishal (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817 ]

Vishal edited comment on KAFKA-1745 at 11/6/14 12:03 PM:
---------------------------------------------------------

[~junrao] Sorry, I should have been clearer.
[~ewencp] I checked and I don't think I'm leaking producers. In any case, here 
is a sample test case that reproduces the problem (run lsof every 10 seconds 
to watch the number of KQUEUEs and PIPEs grow even though the producer 
objects are being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);

        Runnable run = new Runnable() {
            @Override
            public void run() {
                // Reuse a pooled producer if one is available, else create one.
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        };

        while (true) { // make sure the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            Thread.sleep(10000); // 10 seconds, so the pool's keep-alive time is
                                 // exceeded and its idle threads are cleared
        }
    }
}
{code}


Though the KQUEUEs and PIPEs do get cleared after some time (in this test 
case, after 1-2 minutes), why does Kafka's API create new ones whenever a new 
thread accesses the producer object?
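On Mac OS, KQUEUE and PIPE entries in lsof typically belong to `java.nio.channels.Selector` instances. If the client's network layer opens a selector per sending thread (an assumption about the 0.8.1.1 internals, not something verified here), that would explain the counts seen above. A JDK-only sketch of that per-selector cost (the class name `SelectorCost` is ours):

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class SelectorCost {
    public static void main(String[] args) throws IOException {
        // Each open Selector holds native resources; on BSD/Mac OS that is a
        // kqueue descriptor plus a wakeup pipe, which is exactly what shows
        // up in lsof as KQUEUE and PIPE entries.
        Selector selector = Selector.open();
        System.out.println("selector open: " + selector.isOpen()); // prints true

        // The descriptors stay in the process's open-file table until close()
        // runs (or, much later and unpredictably, until the GC finalizes the
        // selector), which would match the delayed cleanup seen in the test case.
        selector.close();
        System.out.println("selector open: " + selector.isOpen()); // prints false
    }
}
```

If selectors are created per thread and only released by finalization, threads churning through the pool would accumulate descriptors faster than the GC reclaims them.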


[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908
 ] 

Vishal edited comment on KAFKA-1745 at 11/5/14 8:56 AM:


No, since I figured that calling producer.close() before returning the 
producer to the pool would make it unusable afterwards.
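One pattern that sidesteps the "unusable after close()" problem is to close a producer only when evicting it from the pool for good, and let the next borrow build a fresh replacement. A generic sketch with `Closeable` objects (the `ClosingPool` class is hypothetical, not part of Kafka's API):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical pool: borrowed objects are returned for reuse; evictOne()
// closes one idle object permanently, releasing its file descriptors, and
// the next borrow() simply constructs a fresh one via the factory.
class ClosingPool<T extends Closeable> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    ClosingPool(Supplier<T> factory) { this.factory = factory; }

    T borrow() {
        T obj = idle.poll();
        return obj != null ? obj : factory.get();
    }

    void giveBack(T obj) { idle.add(obj); }

    // Retire one idle object for good; it is never handed out again.
    void evictOne() {
        T obj = idle.poll();
        if (obj != null) {
            try { obj.close(); } catch (IOException ignored) { }
        }
    }
}
```

A closed producer never re-enters circulation, so callers only ever see usable instances.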



> Each new thread creates a PIPE and KQUEUE as open files during 
> producer.send() and does not get cleared when the thread that creates them is 
> cleared.
> -
>
> Key: KAFKA-1745
> URL: https://issues.apache.org/jira/browse/KAFKA-1745
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1.1
> Environment: Mac OS Mavericks
>Reporter: Vishal
>Priority: Critical
>
> Hi,
> I'm using the Java client API for Kafka. I wanted to send data to Kafka 
> through a producer pool, since I'm using a sync producer. The thread that 
> sends the data comes from a thread pool that grows and shrinks depending on 
> usage. So, when I try to send data from one thread, 1 KQUEUE and 2 PIPEs are 
> created (I got this info by using lsof). If I keep using the same thread it's 
> fine, but when a new thread sends data to Kafka (using producer.send()) a new 
> KQUEUE and 2 PIPEs are created.
> This is okay, but when the thread is cleared from the thread pool and a new 
> thread is created, new KQUEUEs and PIPEs are created for it. The problem is 
> that the old ones are not getting destroyed; they keep showing up as open 
> files. This is causing a major problem, as the number of open files keeps 
> increasing and never decreases.
> Please suggest any solutions.
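Since the descriptors appear tied to the threads that touch the producer, one possible mitigation (a sketch, not a verified fix for 0.8.1.1) is to funnel all sends through a single long-lived thread, so any per-thread resources are allocated once instead of once per pooled worker:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleSenderDemo {
    public static void main(String[] args) throws Exception {
        // One dedicated, long-lived sender thread: whatever per-thread state
        // the client library allocates is created once and reused for every
        // submitted task, instead of once per short-lived pool thread.
        ExecutorService sender = Executors.newSingleThreadExecutor();

        Set<String> threadNames = new HashSet<>();
        for (int i = 0; i < 100; i++) {
            // In the real test case this task would call producer.send(data).
            sender.submit(() -> threadNames.add(Thread.currentThread().getName())).get();
        }
        System.out.println("distinct sender threads: " + threadNames.size()); // prints 1
        sender.shutdown();
    }
}
```

The trade-off is losing send parallelism; a small fixed pool with `allowCoreThreadTimeOut` left at its default (false) would bound the thread churn similarly.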



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)