for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
while (consumerIte.hasNext())
System.out.println("Message from Single Topic :: "
+ new String(consumerIte.next().
message()));
}
Besides what Guozhang suggested, this code has a bug. Since each of the
streams is blocking, you will have to start each stream in a separate
thread. Please take a look at
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
Thanks,
Neha
On Mon, Jun 23, 2014 at 8:05 AM, Guozhang Wang <[email protected]> wrote:
> Did you start the consumer after the producer? The default behavior of the
> consumer is to "consume from the tail of the log", and hence if there is no
> new messages coming in after the consumer started, it will get nothing. You
> may set
>
> auto.offset.reset="smallest"
>
> and try again.
>
> Guozhang
>
>
> On Sun, Jun 22, 2014 at 9:10 PM, Li Li <[email protected]> wrote:
>
> > hi all,
> > I am reading the book "apache kafka" and write a simple producer
> > and consumer class. the producer works but the consumer hangs.
> > The producer class:
> > public static void main(String[] args) {
> > String topic="test-topic";
> > Properties props = new Properties();
> > props.put("metadata.broker.list","linux157:9092");
> > props.put("serializer.class","kafka.serializer.StringEncoder");
> > props.put("request.required.acks", "1");
> > ProducerConfig config = new ProducerConfig(props);
> > Producer<Integer, String> producer = new
> > Producer<Integer,String>(config);
> > for(int i=0;i<100;i++){
> > KeyedMessage<Integer, String> data = new
> > KeyedMessage<Integer,String>(topic, "msg"+i);
> > producer.send(data);
> > }
> > producer.close();
> >
> > }
> >
> > public class TestKafkaConsumer {
> > private final ConsumerConnector consumer;
> > private final String topic;
> >
> > public TestKafkaConsumer(String zookeeper, String groupId, String topic)
> {
> > Properties props = new Properties();
> > props.put("zookeeper.connect", zookeeper);
> > props.put("group.id", groupId);
> > props.put("zookeeper.session.timeout.ms", "500");
> > props.put("zookeeper.sync.time.ms", "250");
> > props.put("auto.commit.interval.ms", "1000");
> > consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
> > props));
> > this.topic = topic;
> > }
> >
> > public void testConsumer() {
> > Map<String, Integer> topicCount = new HashMap<String, Integer>();
> > // Define single thread for topic
> > topicCount.put(topic, new Integer(1));
> > Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
> > consumer
> > .createMessageStreams(topicCount);
> > List<KafkaStream<byte[], byte[]>> streams =
> consumerStreams.get(topic);
> > for (final KafkaStream stream : streams) {
> > ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
> > while (consumerIte.hasNext())
> > System.out.println("Message from Single Topic :: "
> > + new String(consumerIte.next().message()));
> > }
> > if (consumer != null)
> > consumer.shutdown();
> > }
> >
> > public static void main(String[] args) {
> > String topic = "test-topic";
> > TestKafkaConsumer simpleHLConsumer = new
> > TestKafkaConsumer("linux157:2181",testgroup22", topic);
> > simpleHLConsumer.testConsumer();
> >
> > }
> >
> > }
> >
>
>
>
> --
> -- Guozhang
>