Did you start the consumer after the producer? The default behavior of the
consumer is to "consume from the tail of the log", and hence if there are no
new messages coming in after the consumer has started, it will get nothing. You
may set

auto.offset.reset="smallest"

and try again.

Guozhang


On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fancye...@gmail.com> wrote:

> Hi all,
>    I am reading the book "Apache Kafka" and wrote a simple producer
> and consumer class. The producer works but the consumer hangs.
>    The producer class:
> public static void main(String[] args) {
>     String topic="test-topic";
>     Properties props = new Properties();
>     props.put("metadata.broker.list","linux157:9092");
>     props.put("serializer.class","kafka.serializer.StringEncoder");
>     props.put("request.required.acks", "1");
>     ProducerConfig config = new ProducerConfig(props);
>     Producer<Integer, String> producer = new
> Producer<Integer,String>(config);
>     for(int i=0;i<100;i++){
>         KeyedMessage<Integer, String> data = new
> KeyedMessage<Integer,String>(topic, "msg"+i);
>         producer.send(data);
>     }
>     producer.close();
>
> }
>
> public class TestKafkaConsumer {
> private final ConsumerConnector consumer;
> private final String topic;
>
> public TestKafkaConsumer(String zookeeper, String groupId, String topic) {
> Properties props = new Properties();
> props.put("zookeeper.connect", zookeeper);
> props.put("group.id", groupId);
> props.put("zookeeper.session.timeout.ms", "500");
> props.put("zookeeper.sync.time.ms", "250");
> props.put("auto.commit.interval.ms", "1000");
> consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
> props));
> this.topic = topic;
> }
>
> public void testConsumer() {
>     Map<String, Integer> topicCount = new HashMap<String, Integer>();
>     // Define single thread for topic
>     topicCount.put(topic, new Integer(1));
>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
> consumer
>              .createMessageStreams(topicCount);
>     List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
>     for (final KafkaStream stream : streams) {
>         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
>         while (consumerIte.hasNext())
>             System.out.println("Message from Single Topic :: "
>                        + new String(consumerIte.next().message()));
>         }
>     if (consumer != null)
>         consumer.shutdown();
> }
>
> public static void main(String[] args) {
>     String topic = "test-topic";
>     TestKafkaConsumer simpleHLConsumer = new
> TestKafkaConsumer("linux157:2181",testgroup22", topic);
>     simpleHLConsumer.testConsumer();
>
> }
>
> }
>



-- 
-- Guozhang

Reply via email to