Thanks for the reply.
Actually, in my use case I need to control the offsets myself, so should I
use the SimpleConsumer instead of the high-level (group) consumer?
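
For context, this is roughly the kind of manual offset handling I have in mind
with the 0.8 SimpleConsumer (the host, port, topic, partition and starting
offset below are placeholders, and leader lookup plus error handling are
omitted, so treat it only as a sketch):

import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class ManualOffsetSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder connection settings; with SimpleConsumer the application
    // is responsible for remembering which offset to start from.
    SimpleConsumer consumer =
        new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "manual-client");
    long offset = 0L;  // start from whatever offset you last stored

    FetchRequest req = new FetchRequestBuilder()
        .clientId("manual-client")
        .addFetch("test", 0, offset, 100000)  // topic, partition, offset, fetch size
        .build();
    FetchResponse resp = consumer.fetch(req);

    for (MessageAndOffset mo : resp.messageSet("test", 0))
    {
      ByteBuffer payload = mo.message().payload();
      byte[] bytes = new byte[payload.limit()];
      payload.get(bytes);
      System.out.println(mo.offset() + ": " + new String(bytes, "UTF-8"));
      offset = mo.nextOffset();  // persist this wherever you keep your offsets
    }
    consumer.close();
  }
}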


On Tue, Jan 21, 2014 at 9:38 PM, Jun Rao <jun...@gmail.com> wrote:

> "auto.offset.reset" is only used when offsets don't exist in ZK. In your
> case, the consumer likely already committed the offsets to ZK. So, after
> restarting, the consumer will resume from where it left off, instead of
> re-consuming everything from the beginning. This is the expected behavior during normal
> operation. If you are testing, you can use a new consumer group.
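>
> As an illustration (a sketch with placeholder values; "my-new-group" and the
> ZooKeeper address below are hypothetical, not from this thread), a fresh group
> has no committed offsets in ZK, so auto.offset.reset applies and the consumer
> starts from the earliest available offset:
>
>   Properties props = new Properties();
>   props.put("zookeeper.connect", "localhost:2181");   // placeholder ZK address
>   props.put("group.id", "my-new-group");               // new group => no offsets stored in ZK
>   props.put("auto.offset.reset", "smallest");          // so this setting kicks in and reads from the start
>   ConsumerConfig config = new ConsumerConfig(props);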
>
> Thanks,
>
> Jun
>
>
> On Tue, Jan 21, 2014 at 8:02 AM, Abhishek Bhattacharjee <
> abhishek.bhattacharje...@gmail.com> wrote:
>
> > I read the FAQs and added the "auto.offset.reset" property to the Storm
> > configuration. Then I ran my producer code, followed by my consumer code.
> > The consumer printed all the messages that the producer had created, but
> > after stopping the consumer and running it again, it didn't show any
> > messages. I think the offset was not reset. What do you think is going
> > wrong?
> >
> > Thanks
> >
> >
> > On Mon, Jan 20, 2014 at 9:42 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Could you check the following FAQ?
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
> > > abhishek.bhattacharje...@gmail.com> wrote:
> > >
> > > > Sorry, I pasted the consumer code twice in my previous mail. This is
> > > > the producer code.
> > > >
> > > > *Producer.java*
> > > >
> > > > package kafka.examples;
> > > >
> > > > import java.util.Properties;
> > > > import kafka.producer.KeyedMessage;
> > > > import kafka.producer.ProducerConfig;
> > > >
> > > > public class Producer /* extends Thread */
> > > > {
> > > >   private final kafka.javaapi.producer.Producer<Integer, String> producer;
> > > >   private final String topic;
> > > >   private final Properties props = new Properties();
> > > >
> > > >   public Producer(String topic)
> > > >   {
> > > >     props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >     props.put("metadata.broker.list", "localhost:9092");
> > > >     // Use the random partitioner. The key is never set, so the key type
> > > >     // is just Integer. The message is of type String.
> > > >     producer = new kafka.javaapi.producer.Producer<Integer, String>(
> > > >         new ProducerConfig(props));
> > > >     this.topic = topic;
> > > >     System.out.println("Producer at " + this.topic);
> > > >   }
> > > >
> > > >   public void putdata() {
> > > >     int messageNo = 1;
> > > >     while (messageNo < 100)
> > > >     {
> > > >       String messageStr = "Message_" + messageNo;
> > > >       producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
> > > >       messageNo = messageNo + 1;
> > > >     }
> > > >     producer.close();
> > > >     System.out.println("Producer exit");
> > > >   }
> > > > }
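> > > >
> > > > For completeness, a minimal driver for this class might look like the
> > > > sketch below (the topic name "test" is a placeholder, not something
> > > > taken from this thread):
> > > >
> > > > public class ProducerDriver {
> > > >   public static void main(String[] args) {
> > > >     // Hypothetical topic name; it must match the topic the consumer reads from.
> > > >     Producer producer = new Producer("test");
> > > >     producer.putdata();  // sends Message_1 ... Message_99 and closes the producer
> > > >   }
> > > > }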
> > > >
> > > >
> > > > On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> > > > abhishek.bhattacharje...@gmail.com> wrote:
> > > >
> > > > > Hello,
> > > > > I am new to Kafka and facing a problem.
> > > > > My producer code works properly and sends data,
> > > > > but the consumer is not able to read it.
> > > > > Here is the code for the Producer and the Consumer.
> > > > > Something seems to be wrong with Consumer.java; can someone please
> > > > > help with this?
> > > > >
> > > > >
> > > > > *Producer.java*
> > > > >
> > > > > package kafka.examples;
> > > > >
> > > > > import java.util.HashMap;
> > > > > import java.util.List;
> > > > > import java.util.Map;
> > > > > import java.util.Properties;
> > > > > import kafka.consumer.ConsumerConfig;
> > > > > import kafka.consumer.ConsumerIterator;
> > > > > import kafka.consumer.KafkaStream;
> > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > import kafka.message.Message;
> > > > >
> > > > > public class Consumer
> > > > > {
> > > > >     private final ConsumerConnector consumer;
> > > > >     private final String topic;
> > > > >
> > > > >     public Consumer(String topic)
> > > > >     {
> > > > >         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > > >                 createConsumerConfig());
> > > > >         this.topic = topic;
> > > > >         System.out.println("Consumer at " + this.topic);
> > > > >     }
> > > > >
> > > > >     private static ConsumerConfig createConsumerConfig()
> > > > >     {
> > > > >         Properties props = new Properties();
> > > > >         props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > > >         props.put("group.id", KafkaProperties.groupId);
> > > > >         props.put("zookeeper.session.timeout.ms", "400");
> > > > >         props.put("zookeeper.sync.time.ms", "200");
> > > > >         props.put("auto.commit.interval.ms", "1000");
> > > > >
> > > > >         return new ConsumerConfig(props);
> > > > >     }
> > > > >
> > > > >     public void readdata() {
> > > > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > > >         topicCountMap.put(topic, new Integer(1));
> > > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > >                 consumer.createMessageStreams(topicCountMap);
> > > > >         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> > > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > >         System.out.println("Inside read data");
> > > > >         while (it.hasNext())
> > > > >             System.out.println(new String(it.next().message()));
> > > > >     }
> > > > > }
> > > > >
> > > > > And this is the consumer code.
> > > > >
> > > > > *Consumer.java*
> > > > >
> > > > > package kafka.examples;
> > > > >
> > > > >
> > > > > import java.util.HashMap;
> > > > > import java.util.List;
> > > > > import java.util.Map;
> > > > > import java.util.Properties;
> > > > > import kafka.consumer.ConsumerConfig;
> > > > > import kafka.consumer.ConsumerIterator;
> > > > > import kafka.consumer.KafkaStream;
> > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > import kafka.message.Message;
> > > > >
> > > > >
> > > > > public class Consumer
> > > > > {
> > > > >   private final ConsumerConnector consumer;
> > > > >   private final String topic;
> > > > >
> > > > >   public Consumer(String topic)
> > > > >   {
> > > > >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > > >             createConsumerConfig());
> > > > >     this.topic = topic;
> > > > >     System.out.println("Consumer at "+topic);
> > > > >   }
> > > > >
> > > > >   private static ConsumerConfig createConsumerConfig()
> > > > >   {
> > > > >     Properties props = new Properties();
> > > > >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > > >     props.put("group.id", KafkaProperties.groupId);
> > > > >     props.put("zookeeper.session.timeout.ms", "400");
> > > > >     props.put("zookeeper.sync.time.ms", "200");
> > > > >     props.put("auto.commit.interval.ms", "1000");
> > > > >
> > > > >     return new ConsumerConfig(props);
> > > > >
> > > > >   }
> > > > >
> > > > >   public void readdata() {
> > > > >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > > >     topicCountMap.put(topic, new Integer(1));
> > > > >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > >         consumer.createMessageStreams(topicCountMap);
> > > > >     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> > > > >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > >     while (it.hasNext())
> > > > >       System.out.println(new String(it.next().message()));
> > > > >   }
> > > > > }
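> > > > >
> > > > > A minimal driver for this class might look like the sketch below (the
> > > > > topic name "test" is a placeholder, not from this thread). Note that
> > > > > the high-level consumer blocks in it.hasNext() waiting for new
> > > > > messages, and on a restart it resumes from the offsets the group
> > > > > committed to ZooKeeper:
> > > > >
> > > > > public class ConsumerDriver {
> > > > >   public static void main(String[] args) {
> > > > >     // Hypothetical topic name; it must match the topic the producer writes to.
> > > > >     Consumer consumer = new Consumer("test");
> > > > >     consumer.readdata();  // blocks on the iterator until new messages arrive
> > > > >   }
> > > > > }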
> > > > >
> > > > >
> > > > > Thanks.
> > > > > --
> > > > > *Abhishek Bhattacharjee*
> > > > > *Pune Institute of Computer Technology*
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *Abhishek Bhattacharjee*
> > > > *Pune Institute of Computer Technology*
> > > >
> > >
> >
> >
> >
> > --
> > *Abhishek Bhattacharjee*
> > *Pune Institute of Computer Technology*
> >
>



-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*
