Could you check the following FAQ?

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
?
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why
?

Thanks,

Jun


On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
abhishek.bhattacharje...@gmail.com> wrote:

> Sorry I have sent both codes as consumer codes. This is the producer code.
>
> *Producer.java*
>
> package kafka.examples;
>
>
> import java.util.Properties;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
>
> public class Producer/* extends Thread*/
> {
>   private final kafka.javaapi.producer.Producer<Integer, String> producer;
>   private final String topic;
>   private final Properties props = new Properties();
>
>   public Producer(String topic)
>   {
>     props.put("serializer.class", "kafka.serializer.StringEncoder");
>     props.put("metadata.broker.list", "localhost:9092");
>     // Use random partitioner. Don't need the key type. Just set it to
> Integer.
>     // The message is of type String.
>     producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> ProducerConfig(props));
>     this.topic = topic;
>     System.out.println("Producer at "+this.topic);
>   }
>
>   public void putdata() {
>     int messageNo = 1;
>     while(messageNo < 100)
>     {
>       String messageStr = new String("Message_" + messageNo);
>       producer.send(new KeyedMessage<Integer, String>(topic ,messageStr));
>       messageNo = messageNo +1;
>     }
>     producer.close();
>     System.out.println("Producer exit");
>   }
>
> }
>
>
> On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> abhishek.bhattacharje...@gmail.com> wrote:
>
> > Hello,
> > I am new to kafka and facing some problem.
> > My producer code works properly and sends data.
> > But the consumer is not able to read it.
> > Here are the codes for Producer and Consumer.
> > Something is wrong with the Consumer.java code can someone please help
> > with this.
> >
> >
> > *Producer.java*
> >
> > package kafka.examples;
> >
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.ConsumerIterator;
> > import kafka.consumer.KafkaStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.message.Message;
> >
> >
> > public class Consumer
> > {
> >     private final ConsumerConnector consumer;
> >     private final String topic;
> >
> >     public Consumer(String topic)
> >     {
> > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> >        createConsumerConfig());
> >  this.topic = topic;
> > System.out.println("Consumer at "+this.topic);
> >     }
> >
> >     private static ConsumerConfig createConsumerConfig()
> >     {
> > Properties props = new Properties();
> > props.put("zookeeper.connect", KafkaProperties.zkConnect);
> >  props.put("group.id", KafkaProperties.groupId);
> > props.put("zookeeper.session.timeout.ms", "400");
> >  props.put("zookeeper.sync.time.ms", "200");
> > props.put("auto.commit.interval.ms", "1000");
> >
> > return new ConsumerConfig(props);
> >
> >     }
> >
> >     public void readdata() {
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >  topicCountMap.put(topic, new Integer(1));
> > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> >  KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
> > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >  System.out.println("Inside read data");
> > while(it.hasNext())
> >     System.out.println(new String(it.next().message()));
> >
> >     }
> > }
> >
> > And this is the consumer code.
> >
> > *Consumer.java*
> >
> > package kafka.examples;
> >
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.ConsumerIterator;
> > import kafka.consumer.KafkaStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.message.Message;
> >
> >
> > public class Consumer
> > {
> >   private final ConsumerConnector consumer;
> >   private final String topic;
> >
> >   public Consumer(String topic)
> >   {
> >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> >             createConsumerConfig());
> >     this.topic = topic;
> >     System.out.println("Consumer at "+topic);
> >   }
> >
> >   private static ConsumerConfig createConsumerConfig()
> >   {
> >     Properties props = new Properties();
> >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> >     props.put("group.id", KafkaProperties.groupId);
> >     props.put("zookeeper.session.timeout.ms", "400");
> >     props.put("zookeeper.sync.time.ms", "200");
> >     props.put("auto.commit.interval.ms", "1000");
> >
> >     return new ConsumerConfig(props);
> >
> >   }
> >
> >   public void readdata() {
> >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >     topicCountMap.put(topic, new Integer(1));
> >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> >     KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
> >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >     while(it.hasNext())
> >       System.out.println(new String(it.next().message()));
> >   }
> > }
> >
> >
> > Thanks.
> > --
> > *Abhishek Bhattacharjee*
> > *Pune Institute of Computer Technology*
> >
>
>
>
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>

Reply via email to