[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:11 PM:
-----------------------------------------------------------------

{code}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {
        
        static int numberTh = 200;
        static CountDownLatch latch = new CountDownLatch(200);
        public static void main(String[] args) throws IOException, 
InterruptedException {

                Properties prop = new Properties();
                InputStream propFile = 
Thread.currentThread().getContextClassLoader()
                                
.getResourceAsStream("kafkaproducer.properties");

                String topic = "test";
                prop.load(propFile);
                System.out.println("Property: " + prop.toString());
                StringBuilder builder = new StringBuilder(1024);
                int msgLenth = 256;
                for (int i = 0; i < msgLenth; i++)
                        builder.append("a");

                int numberOfProducer = 4;
                Producer[] producer = new Producer[numberOfProducer];

                for (int i = 0; i < producer.length; i++) {
                        producer[i] = new KafkaProducer(prop);
                }
                ExecutorService service =   new ThreadPoolExecutor(numberTh, 
numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh *2));
                
                for(int i = 0 ; i < numberTh;i++){
                        service.execute(new 
MyProducer(producer,10,builder.toString(), topic));
                }               
                latch.await();
                
                System.out.println("All Producers done...!");
                for (int i = 0; i < producer.length; i++) {
                        producer[i].close();
                }               
                service.shutdownNow();
                System.out.println("All done...!");

        }


        
        static class MyProducer implements Runnable {
                
                Producer[] producer;
                long maxloops;
                String msg ;
                String topic;
                
                MyProducer(Producer[] list, long maxloops,String msg,String 
topic){
                        this.producer = list;
                        this.maxloops = maxloops;
                        this.msg = msg;
                        this.topic = topic;
                }
                public void run() {
                        ProducerRecord record = new ProducerRecord(topic, 
msg.toString().getBytes());
                        Callback  callBack = new  MyCallback();
                        try{
                                for(long j=0 ; j < maxloops ; j++){
                                        try {
                                                for (int i = 0; i < 
producer.length; i++) {
                                                        
producer[i].send(record, callBack);
                                                }
                                                Thread.sleep(10);
                                        } catch (Throwable th) {
                                                System.err.println("FATAL ");
                                                th.printStackTrace();
                                        }
                                }

                        }finally {
                                latch.countDown();
                        }                       
                }
        }       

        static class MyCallback implements Callback {
                public void onCompletion(RecordMetadata metadata, Exception 
exception) {
                        if(exception != null){
                                System.err.println("Msg dropped..!");
                                exception.printStackTrace();
                        }
                        
                }
        }
        
}
{code}

This is property file used:
{code}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# DEFAULT FROM THE KAFKA...
# batch size =  ((buffer.memory) / (number of partitions)) (so we can have in 
progress batch size created for each partition.).
batch.size=1048576
#2MiB
max.request.size=1048576
send.buffer.bytes=2097152
# We do not want to block the buffer Full so application thread will not be 
blocked but logs lines will be dropped...
block.on.buffer.full=false
#2MiB
send.buffer.bytes=2097152
#wait...
linger.ms=5000
{code}


was (Author: bmis13):
{code}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {
        
        static int numberTh = 200;
        static CountDownLatch latch = new CountDownLatch(200);
        public static void main(String[] args) throws IOException, 
InterruptedException {

                Properties prop = new Properties();
                InputStream propFile = 
Thread.currentThread().getContextClassLoader()
                                
.getResourceAsStream("kafkaproducer.properties");

                String topic = "test";
                prop.load(propFile);
                System.out.println("Property: " + prop.toString());
                StringBuilder builder = new StringBuilder(1024);
                int msgLenth = 256;
                for (int i = 0; i < msgLenth; i++)
                        builder.append("a");

                int numberOfProducer = 4;
                Producer[] producer = new Producer[numberOfProducer];

                for (int i = 0; i < producer.length; i++) {
                        producer[i] = new KafkaProducer(prop);
                }
                ExecutorService service =   new ThreadPoolExecutor(numberTh, 
numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh *2));
                
                for(int i = 0 ; i < numberTh;i++){
                        service.execute(new 
MyProducer(producer,10,builder.toString(), topic));
                }               
                latch.await();
                
                System.out.println("All Producers done...!");
                for (int i = 0; i < producer.length; i++) {
                        producer[i].close();
                }               
                service.shutdownNow();
                System.out.println("All done...!");

        }


        
        static class MyProducer implements Runnable {
                
                Producer[] producer;
                long maxloops;
                String msg ;
                String topic;
                
                MyProducer(Producer[] list, long maxloops,String msg,String 
topic){
                        this.producer = list;
                        this.maxloops = maxloops;
                        this.msg = msg;
                        this.topic = topic;
                }
                public void run() {
                        ProducerRecord record = new ProducerRecord(topic, 
msg.toString().getBytes());
                        Callback  callBack = new  MyCallback();
                        try{
                                for(long j=0 ; j < maxloops ; j++){
                                        try {
                                                for (int i = 0; i < 
producer.length; i++) {
                                                        
producer[i].send(record, callBack);
                                                }
                                                Thread.sleep(10);
                                        } catch (Throwable th) {
                                                System.err.println("FATAL ");
                                                th.printStackTrace();
                                        }
                                }

                        }finally {
                                latch.countDown();
                        }                       
                }
        }       

        static class MyCallback implements Callback {
                public void onCompletion(RecordMetadata metadata, Exception 
exception) {
                        if(exception != null){
                                System.err.println("Msg dropped..!");
                                exception.printStackTrace();
                        }
                        
                }
        }
        
}
{code}

This is property file used:
{code }
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# DEFAULT FROM THE KAFKA...
# batch size =  ((buffer.memory) / (number of partitions)) (so we can have in 
progress batch size created for each partition.).
batch.size=1048576
#2MiB
max.request.size=1048576
send.buffer.bytes=2097152
# We do not want to block the buffer Full so application thread will not be 
blocked but logs lines will be dropped...
block.on.buffer.full=false
#2MiB
send.buffer.bytes=2097152
#wait...
linger.ms=5000
{code}

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1642
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1642
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.2
>            Reporter: Bhavesh Mistry
>            Assignee: Jun Rao
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to