Hi all,
I am reading the book "Apache Kafka" and have written a simple producer
and a simple consumer class. The producer works, but the consumer hangs.
The producer class:
/**
 * Publishes 100 string messages ("msg0" through "msg99") to the topic
 * "test-topic" via the broker at linux157:9092, then closes the producer.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    String topic = "test-topic";

    Properties props = new Properties();
    props.put("metadata.broker.list", "linux157:9092");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");

    ProducerConfig config = new ProducerConfig(props);
    Producer<Integer, String> producer = new Producer<Integer, String>(config);
    try {
        for (int i = 0; i < 100; i++) {
            // Only topic and payload are given; no message key is supplied.
            KeyedMessage<Integer, String> data =
                    new KeyedMessage<Integer, String>(topic, "msg" + i);
            producer.send(data);
        }
    } finally {
        // Close even when send() throws so the producer is never left open.
        producer.close();
    }
}
public class TestKafkaConsumer {
private final ConsumerConnector consumer;
private final String topic;
public TestKafkaConsumer(String zookeeper, String groupId, String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
props));
this.topic = topic;
}
public void testConsumer() {
Map<String, Integer> topicCount = new HashMap<String, Integer>();
// Define single thread for topic
topicCount.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
while (consumerIte.hasNext())
System.out.println("Message from Single Topic :: "
+ new String(consumerIte.next().message()));
}
if (consumer != null)
consumer.shutdown();
}
public static void main(String[] args) {
String topic = "test-topic";
TestKafkaConsumer simpleHLConsumer = new
TestKafkaConsumer("linux157:2181",testgroup22", topic);
simpleHLConsumer.testConsumer();
}
}