Hello,
I am new to Kafka and am facing a problem.
My producer code works properly and sends data.
But the consumer is not able to read it.
Here is the code for the Producer and the Consumer.
Something is wrong with the Consumer.java code. Can someone please help me
with this?


*Producer.java*

package kafka.examples;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;


/**
 * Minimal consumer built on Kafka's ZooKeeper-based high-level consumer API.
 *
 * <p>Creates one consumer connector at construction time, opens a single
 * stream for the configured topic and prints every message payload to
 * standard output.
 */
public class Consumer
{
    /**
     * Charset used to decode message payloads. Fixed explicitly so the output
     * does not depend on the platform default charset.
     * NOTE(review): assumes the producer encodes strings as UTF-8 -- confirm
     * against the producer code.
     */
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
        java.nio.charset.Charset.forName("UTF-8");

    private final ConsumerConnector consumer;
    private final String topic;

    /**
     * Creates the consumer connector and remembers the topic to read from.
     *
     * @param topic name of the topic that {@link #readdata()} subscribes to
     */
    public Consumer(String topic)
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig());
        this.topic = topic;
        System.out.println("Consumer at "+this.topic);
    }

    /**
     * Builds the consumer configuration from {@code KafkaProperties} plus a
     * few fixed ZooKeeper and auto-commit settings.
     *
     * @return the configuration passed to the consumer connector
     */
    private static ConsumerConfig createConsumerConfig()
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // NOTE(review): "auto.offset.reset" is not set here. For this consumer
        // API the documented default is "largest", so a group without a
        // committed offset would only see messages produced after it
        // connects -- confirm whether "smallest" is wanted instead.

        return new ConsumerConfig(props);
    }

    /**
     * Opens one stream for the topic and prints each message payload, decoded
     * as text, on its own line. Returns only once the stream iterator reports
     * that no further messages are available.
     */
    public void readdata()
    {
        // Ask for exactly one stream for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("Inside read data");
        while (it.hasNext())
        {
            System.out.println(new String(it.next().message(), PAYLOAD_CHARSET));
        }
    }
}

And this is the consumer code.

*Consumer.java*

package kafka.examples;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;


/**
 * Minimal consumer built on Kafka's ZooKeeper-based high-level consumer API.
 *
 * <p>Creates one consumer connector at construction time, opens a single
 * stream for the configured topic and prints every message payload to
 * standard output.
 */
public class Consumer
{
  /**
   * Charset used to decode message payloads. Fixed explicitly so the output
   * does not depend on the platform default charset.
   * NOTE(review): assumes the producer encodes strings as UTF-8 -- confirm
   * against the producer code.
   */
  private static final java.nio.charset.Charset PAYLOAD_CHARSET =
      java.nio.charset.Charset.forName("UTF-8");

  private final ConsumerConnector consumer;
  private final String topic;

  /**
   * Creates the consumer connector and remembers the topic to read from.
   *
   * @param topic name of the topic that {@link #readdata()} subscribes to
   */
  public Consumer(String topic)
  {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig());
    this.topic = topic;
    System.out.println("Consumer at "+topic);
  }

  /**
   * Builds the consumer configuration from {@code KafkaProperties} plus a
   * few fixed ZooKeeper and auto-commit settings.
   *
   * @return the configuration passed to the consumer connector
   */
  private static ConsumerConfig createConsumerConfig()
  {
    Properties props = new Properties();
    props.put("zookeeper.connect", KafkaProperties.zkConnect);
    props.put("group.id", KafkaProperties.groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    // NOTE(review): "auto.offset.reset" is not set here. For this consumer
    // API the documented default is "largest", so a group without a
    // committed offset would only see messages produced after it
    // connects -- confirm whether "smallest" is wanted instead.

    return new ConsumerConfig(props);
  }

  /**
   * Opens one stream for the topic and prints each message payload, decoded
   * as text, on its own line. Returns only once the stream iterator reports
   * that no further messages are available.
   */
  public void readdata()
  {
    // Ask for exactly one stream for the topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, Integer.valueOf(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext())
    {
      System.out.println(new String(it.next().message(), PAYLOAD_CHARSET));
    }
  }
}


Thanks.
-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*

Reply via email to