[ 
https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13207023#comment-13207023
 ] 

Jun Rao commented on KAFKA-270:
-------------------------------

A few things to try:

1. It looks like the client's ZK session is expiring. This should be 
rare. If it happens frequently, it is very likely caused by GC pauses in the 
client. Please check your GC log.
2. Enable debug-level logging for FileMessageSet in the broker. You will see the 
flush time for each log write. Check whether the flush time is reasonable 
(typically in the low tens of ms), since it limits how many I/Os a broker can 
do per second.
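
To make the arithmetic in point 2 concrete: if every log write waits for a flush, an average flush time of 10 ms caps a single log at roughly 100 flushes per second. The following is only a rough sketch for measuring this on a given disk, not broker code. The class name FlushTimer and its methods are made up for illustration, and it assumes Java 7+ for java.nio.file. The number it prints depends on the file system and RAID controller, so treat it as a ballpark figure.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, not part of Kafka: times write+fsync on a local file.
public class FlushTimer {

    // Upper bound on synchronous flushes per second for a given average flush time.
    static double maxFlushesPerSecond(double avgFlushMs) {
        if (avgFlushMs <= 0) {
            throw new IllegalArgumentException("avgFlushMs must be positive");
        }
        return 1000.0 / avgFlushMs;
    }

    // Appends `rounds` buffers of `bytes` bytes, forcing each one to disk,
    // and returns the average time per write+flush in milliseconds.
    static double averageFlushMs(Path file, int rounds, int bytes) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(bytes);
        long totalNanos = 0;
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < rounds; i++) {
                buf.clear(); // reset position so the whole buffer is written again
                long start = System.nanoTime();
                while (buf.hasRemaining()) {
                    ch.write(buf);
                }
                ch.force(true); // flush data and metadata to the device
                totalNanos += System.nanoTime() - start;
            }
        }
        return totalNanos / 1e6 / rounds;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("flush-timer", ".log");
        try {
            double ms = averageFlushMs(tmp, 50, 4096);
            System.out.printf("avg flush: %.3f ms, at most %.0f flushes/sec%n",
                    ms, maxFlushesPerSecond(ms));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

If the measured average is far above the low tens of ms, the disk is the more likely bottleneck than the producer or consumer settings.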
                
>  sync producer / consumer test producing lot of kafka server exceptions & not 
> getting the throughput mentioned here 
> http://incubator.apache.org/kafka/performance.html
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-270
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 0.7
>         Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 
> GNU/Linux 
> ext3 file system with raid10
>            Reporter: Praveen Ramachandra
>              Labels: clients, core, newdev, performance
>
> I am getting ridiculously low producer and consumer throughput.
> I am using the default config values for the producer, consumer, and broker,
> which should be good starting points and yield sufficient
> throughput.
> I would appreciate pointers on which settings or code changes are needed
> to get higher throughput.
> I changed num.partitions in the server.config to 10.
> Please look below for exception and error messages from the server.
> BTW: I am running the server, ZooKeeper, producer, and consumer on the same host.
> ====Consumer Code=====
>        long startTime = System.currentTimeMillis();
>        long endTime = startTime + runDuration*1000L; // run duration is in seconds
>        Properties props = new Properties();
>        props.put("zk.connect", "localhost:2181");
>        props.put("groupid", subscriptionName); // to support multiple subscribers
>        props.put("zk.sessiontimeout.ms", "400");
>        props.put("zk.synctime.ms", "200");
>        props.put("autocommit.interval.ms", "1000");
>        consConfig =  new ConsumerConfig(props);
>        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
>        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>        topicCountMap.put(topicName, new Integer(1)); // the topic to subscribe to
>        Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>            consumer.createMessageStreams(topicCountMap);
>        KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
>        ConsumerIterator<Message> it = stream.iterator();
>        while(System.currentTimeMillis() <= endTime )
>        {
>            it.next(); // discard data
>            consumeMsgCount.incrementAndGet();
>        }
> ====End consumer CODE============================
> =====Producer CODE========================
>        props.put("serializer.class", "kafka.serializer.StringEncoder");
>        props.put("zk.connect", "localhost:2181");
>        // Use random partitioner. Don't need the key type. Just set it to Integer.
>        // The message is of type String.
>        producer = new kafka.javaapi.producer.Producer<Integer, String>(
>            new ProducerConfig(props));
>        long endTime = startTime + runDuration*1000L; // run duration is in seconds
>        while(System.currentTimeMillis() <= endTime )
>        {
>            String msg =
>                org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
>            producer.send(new ProducerData<Integer, String>(topicName, msg));
>            pc.incrementAndGet();
>        }
>        java.util.Date date = new java.util.Date(System.currentTimeMillis());
>        System.out.println(date+" :: stopped producer for topic"+topicName);
> =====END Producer CODE========================
> I see a bunch of exceptions like this:
> [2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>       at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>       at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
>       at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
>       at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
>       at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
>       at kafka.network.MultiSend.writeTo(Transmission.scala:95)
>       at kafka.network.Processor.write(SocketServer.scala:332)
>       at kafka.network.Processor.run(SocketServer.scala:209)
>       at java.lang.Thread.run(Thread.java:662)
> java.io.IOException: Connection reset by peer
>       at sun.nio.ch.FileDispatcher.read0(Native Method)
>       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
>       at sun.nio.ch.IOUtil.read(IOUtil.java:171)
>       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
>       at kafka.utils.Utils$.read(Utils.scala:485)
>       at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>       at kafka.network.Processor.read(SocketServer.scala:304)
>       at kafka.network.Processor.run(SocketServer.scala:207)
>       at java.lang.Thread.run(Thread.java:662)
> And many INFO messages, e.g.:
> INFO: Expiring session 0x1356a43167e0009, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> INFO: Closed socket connection for client /127.0.0.1:59884 which had sessionid 0x1356a43167e0022 (org.apache.zookeeper.server.NIOServerCnxn)


        
