I write the kafka demo with java .
The prouct can send the message but the consumer can not get the message
My  kafka configuration is ok
./kafka-console-producer.sh --broker-list localhost:9080 --topic testkun
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkun 

The following is the java code of consumer,product ,app ant interface
package com.huawei.business;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class hwKafkaConsumer extends Thread {
    private final String topic;

    public hwKafkaConsumer(final String topic)

        this.topic = topic;
    public void  run()

    Properties props = new Properties();
    props.put("bootstrap.servers", "");
    props.put("zookeeper.connect", KafkaProperties.zkConnect);
    props.put("group.id", KafkaProperties.groupId);
    props.put("enable.auto.commit", "true");//
    props.put("auto.commit.interval.ms", "1000");//
    props.put("zookeeper.session.timeout.ms", "40000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("rebalance.backoff.ms", "2000");
    props.put("rebalance.max.retries", "10");
    props.put("auto.offset.reset", "earliest");//add to comsumer the earliest 
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    System.out.printf("consumer :the topic is "+this.topic+"   ");

    while (true) {
        System.out.printf("consumer:recieve the message in while loop  ");
        ConsumerRecords<String, String> records = consumer.poll(100);//seem to 
hang here
              System.out.printf("  consumer after create records  ");
        for (ConsumerRecord<String, String> record : records) {
                 System.out.printf("for loop to parse the message.....");
            System.out.printf("offset = %d, key = %s, value11111 = %s%n", 
record.offset(), record.key(), record.value());
        System.out.printf("  consumer after for loop  ");



Any one can help to find the problem?
the product test is as following:

package com.huawei.business;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

//import kafka.producer.KeyedMessage;
//import kafka.producer.ProducerConfig;

public class hwKafkaProducer extends Thread
    //private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    // private final Properties props = new Properties();

    public hwKafkaProducer(final String topic)
       // props.put("serializer.class", "kafka.serializer.StringEncoder");
       // props.put("metadata.broker.list", "");
       // producer = new kafka.javaapi.producer.Producer<Integer, String>(new 

        this.topic = topic;
    public void run()
        int messageNo = 1;

        Properties props = new Properties();
        props.put("bootstrap.servers", "");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "");
        props.put("bootstrap.servers", "");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("group.id", KafkaProperties.groupId);
       // Producer<String, String> producer = new KafkaProducer<>(props);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i <1; i++){
            System.out.printf("send  the message:create record.");
            System.out.printf("send  the message: the topic is "+this.topic);
            ProducerRecord<String, String> data = new ProducerRecord<String, 
String>(this.topic, Integer.toString(i),"helloworld_ni_hao");
            System.out.printf("send  the message:before send.");
           // producer.send(data);

            producer.send(data,new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    System.out.println("why why why?: " );
                    if(e != null) {
                    } else {
                       System.out.println("bbbb: " + metadata.offset());
                       System.out.println("cccc: " );

            System.out.printf("send  the message:the data.values 
            System.out.printf("send  the message:after send.");

        while (true)
            System.out.printf("send  the message");
            long runtime = new Date().getTime();
            String ip = "123456";
            String msg = "mymessage";
            ProducerRecord<String, String> data = new ProducerRecord<String, 
String>(this.topic, ip, msg);
                     new Callback() {
                 public void onCompletion(RecordMetadata metadata, Exception e) 
                     if(e != null) {
                     } else {
                        System.out.println("The offset of the record we just 
sent is: " + metadata.offset());

The app is
package com.huawei.business;

public class app
    public static void main(final String[] args)
        final hwKafkaProducer producerThread = new 
        final hwKafkaConsumer consumerThread = new 


The properties java file is

package com.huawei.business;

* Created by x00343661 on 2016/6/12.
public interface KafkaProperties
    final static String zkConnect = "";
    final static String groupId = "group1";
    final static String topic = "yqf";
    final static String kafkaServerURL = "";
    final static int kafkaServerPort = 9080;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    final static String topic2 = "topic2";
    final static String topic3 = "topic3";
    final static String clientId = "SimpleConsumerDemoClient";

