No, I did not start the consumer before the producer. I actually started
the producer first and nothing showed up in the consumer unless I commented
out this line -- props.put("compression.codec", "gzip"). If I commented
out the compression codec, everything just works.
On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <[email protected]> wrote:
> Did you start the consumer before the producer? Be default, the consumer
> gets only the new data?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> [email protected]> wrote:
>
> > I am testing with Kafka 0.8 beta and having problem of receiving message
> in
> > consumer. There is no error so does anyone have any insights. When I
> > commented out the "compression.code" everything works fine.
> >
> > My producer:
> > public class TestKafka08Prod {
> >
> > public static void main(String [] args) {
> >
> > Producer<Integer, String> producer = null;
> > try {
> > Properties props = new Properties();
> > props.put("metadata.broker.list", "localhost:9092");
> > props.put("serializer.class",
> > "kafka.serializer.StringEncoder");
> > props.put("producer.type", "sync");
> > props.put("request.required.acks","1");
> > props.put("compression.codec", "gzip");
> > ProducerConfig config = new ProducerConfig(props);
> > producer = new Producer<Integer, String>(config);
> > int j=0;
> > for(int i=0; i<10; i++) {
> > KeyedMessage<Integer, String> data = new
> > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> > "+System.currentTimeMillis());
> > producer.send(data);
> >
> > }
> >
> > } catch (Exception e) {
> > System.out.println("Error happened: ");
> > e.printStackTrace();
> > } finally {
> > if(null != null) {
> > producer.close();
> > }
> >
> > System.out.println("Ened of Sending");
> > }
> >
> > System.exit(0);
> > }
> > }
> >
> >
> > My consumer:
> >
> > public class TestKafka08Consumer {
> > public static void main(String [] args) throws UnknownHostException,
> > SocketException {
> >
> > Properties props = new Properties();
> > props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> > props.put("group.id", "test08ConsumerId");
> > props.put("zk.sessiontimeout.ms", "4000");
> > props.put("zk.synctime.ms", "2000");
> > props.put("autocommit.interval.ms", "1000");
> >
> > ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >
> > ConsumerConnector consumerConnector =
> > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >
> > String topic = "test-topic";
> > Map<String, Integer> topicCountMap = new HashMap<String,
> > Integer>();
> > topicCountMap.put(topic, new Integer(1));
> > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumerConnector.createMessageStreams(topicCountMap);
> > KafkaStream<byte[], byte[]> stream =
> > consumerMap.get(topic).get(0);
> >
> > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >
> > int counter=0;
> > while(it.hasNext()) {
> > try {
> > String fromPlatform = new String(it.next().message());
> > System.out.println("The messages: "+fromPlatform);
> > } catch(Exception e) {
> > e.printStackTrace();
> > }
> > }
> > System.out.println("SystemOut");
> > }
> > }
> >
> >
> > Thanks
> >
>