[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817 ]
Vishal edited comment on KAFKA-1745 at 11/6/14 12:00 PM:
---------------------------------------------------------
[~junrao] Sorry, I should have been clearer.
[~ewencp] I checked and I don't think I'm leaking producers. Anyway, here is a
sample test case that reproduces the problem (check lsof every 10 seconds to
see the number of KQUEUEs and PIPEs increase even though the producer objects
are being reused):
{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {
    // Pool of reusable producers shared by all worker threads.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();
    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        // Let idle core threads time out so the pool keeps replacing them.
        tpe.allowCoreThreadTimeOut(true);

        Runnable run = new Runnable() {
            @Override
            public void run() {
                // Reuse a pooled producer; create one only if the pool is empty.
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        };

        while (true) { // make sure the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            Thread.sleep(10000); // 10 seconds, so the keepalive time is
                                 // exceeded and the pool's threads are cleared
        }
    }
}
{code}
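For reference, the same growth can be watched from inside the JVM instead of
with lsof. A minimal sketch, assuming a HotSpot/OpenJDK JVM where the OS
MXBean can be cast to com.sun.management.UnixOperatingSystemMXBean (the
FdWatcher class name is mine):
{code}
import java.lang.management.ManagementFactory;

import com.sun.management.UnixOperatingSystemMXBean;

public class FdWatcher {
    public static void main(String[] args) throws InterruptedException {
        // HotSpot/OpenJDK-specific cast; on other JVMs fall back to lsof.
        UnixOperatingSystemMXBean os = (UnixOperatingSystemMXBean)
                ManagementFactory.getOperatingSystemMXBean();
        while (true) {
            // Process-wide open descriptor count; the KQUEUEs and PIPEs
            // reported by lsof are included in this number.
            System.out.println("open fds: " + os.getOpenFileDescriptorCount());
            Thread.sleep(10000); // same 10-second cadence as the lsof check
        }
    }
}
{code}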
Though the KQUEUEs and PIPEs do get cleared after some time (in this test case,
after 1-2 minutes), why does Kafka's API create new ones when a new thread
accesses the producer object?
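For completeness, this is how I'd expect the pool above to be torn down; a
minimal sketch (the closePool helper name is mine). It releases each
producer's own connections, although per the observation above the
KQUEUE/PIPE descriptors seem to be tied to the worker threads rather than to
the producer objects:
{code}
// Drain the pool and close every producer so that its connections to the
// broker are released; call this once on shutdown.
private static void closePool() {
    Producer<String, String> producer;
    while ((producer = producerPool.poll()) != null) {
        producer.close();
    }
}
{code}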
> Each new thread creates a PIPE and KQUEUE as open files during
> producer.send(), and they are not cleared when the thread that created them
> is cleared
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-1745
> URL: https://issues.apache.org/jira/browse/KAFKA-1745
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.1.1
> Environment: Mac OS Mavericks
> Reporter: Vishal
> Priority: Critical
>
> Hi,
> I'm using the Java client API for Kafka. I wanted to send data to Kafka
> through a producer pool, since I'm using a sync producer. The thread that
> sends the data comes from a thread pool that grows and shrinks depending on
> usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created
> (observed via lsof). If I keep using the same thread it's fine, but when a
> new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2
> PIPEs are created.
> That by itself is okay, but when a thread is cleared from the thread pool
> and a new one is created, yet more KQUEUEs and PIPEs appear. The problem is
> that the old ones are never destroyed and keep showing up as open files.
> This is a major problem because the number of open files keeps increasing
> and never decreases.
> Please suggest any solutions.
> FYI, the number of TCP connections established from the producer system to
> the Kafka broker remains constant throughout.
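Since the extra descriptors appear to track worker threads rather than
producer objects, one mitigation (my suggestion, not something proposed in
this thread) is to send from a fixed set of long-lived threads, so that each
thread's KQUEUE/PIPE set is allocated once and then reused:
{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedSenders {
    // A fixed pool whose threads never time out: each worker allocates its
    // KQUEUE/PIPE set on first send and keeps reusing it, so the open-file
    // count stays flat instead of growing with thread churn.
    static final ExecutorService SENDERS = Executors.newFixedThreadPool(10);
}
{code}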