[
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415
]
Bhavesh Mistry commented on KAFKA-1642:
---------------------------------------
{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TestNetworkDownProducer {
public static void main(String[] args) throws IOException {
Properties prop = new Properties();
InputStream propFile =
Thread.currentThread().getContextClassLoader()
.getResourceAsStream("kafkaproducer.properties");
String topic = "test";
prop.load(propFile);
System.out.println("Property: " + prop.toString());
StringBuilder builder = new StringBuilder(1024);
int msgLenth = 256;
for (int i = 0; i < msgLenth; i++)
builder.append("a");
int numberOfProducer = 4;
Producer[] producer = new Producer[numberOfProducer];
for (int i = 0; i < producer.length; i++) {
producer[i] = new KafkaProducer(prop);
}
Callback callback = new Callback() {
public void onCompletion(RecordMetadata metadata,
Exception exception) {
if(exception != null){
System.err.println("Msg dropped..!");
exception.printStackTrace();
}
}
};
ProducerRecord record = new ProducerRecord(topic,
builder.toString().getBytes());
while (true) {
try {
for (int i = 0; i < producer.length; i++) {
producer[i].send(record, callback);
}
Thread.sleep(10);
} catch (Throwable th) {
System.err.println("FATAL ");
th.printStackTrace();
}
}
}
}
{code}
{code: name=kafkaproducer.properties }
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# DEFAULT FROM THE KAFKA...
# batch size = ((buffer.memory) / (number of partitions)) (so we can have in
progress batch size created for each partition.).
batch.size=1048576
#2MiB
max.request.size=1048576
send.buffer.bytes=2097152
# We do not want to block the buffer Full so application thread will not be
blocked but logs lines will be dropped...
block.on.buffer.full=false
#2MiB
send.buffer.bytes=2097152
#wait...
linger.ms=5000
{code}
> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network
> connection is lost
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8.2
> Reporter: Bhavesh Mistry
> Assignee: Jun Rao
>
> I see my CPU spike to 100% when network connection is lost for while. It
> seems network IO thread are very busy logging following error message. Is
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka
> producer I/O thread:
> java.lang.IllegalStateException: No entry found for node -2
> at
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)