Re: having problem with 0.8 gzip compression
Ok, the problem solved, I think it might be because some of the jar files that I was using were "OLD". I was building the producer and consumer under the 0.7 environment except swapping out the kafka jar file. Now, I created a whole new environment and pull in all the jar files from the 0.8. That seems to solve my 0.8 gzip problem. Thank you for all the help.
Re: having problem with 0.8 gzip compression
Joel, Would you mind point me to how I would be able to enable the trace logs in the producer and broker? Thanks, Scott On Wed, Jul 10, 2013 at 5:33 PM, Joel Koshy wrote: > Weird - I tried your exact code and it worked for me (although I was > using 0.8 head and not the beta). Can you re-run with trace logs > enabled in your producer and paste that output? Broker logs also if > you can? > > Thanks, > > Joel > > On Wed, Jul 10, 2013 at 10:23 AM, Scott Wang > wrote: > > Jun, > > > > I did a test this morning and got a very interesting result with you > > command. I started by wipe all the log files and clean up all zookeeper > > data files. > > > > Once I restarted both server, producer and consumer then execute your > > command, what I got is a empty log as following: > > > > Dumping /Users/scott/Temp/kafka/test-topic-0/.log > > Starting offset: 0 > > > > One observation, the .index file was getting huge but > > there was nothing in .log file. > > > > Thanks, > > Scott > > > > > > > > > > On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao wrote: > > > >> Could you run the following command on one of the log files of your > topic > >> and attach the output? > >> > >> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > >> /tmp/kafka-logs/testtopic-0/.log > >> > >> Thanks, > >> > >> Jun > >> > >> > >> On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang < > >> scott.w...@rumbleentertainment.com> wrote: > >> > >> > Another piece of information, the snappy compression also does not > work. > >> > > >> > Thanks, > >> > Scott > >> > > >> > > >> > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang < > >> > scott.w...@rumbleentertainment.com> wrote: > >> > > >> > > I just try it and it still not showing up, thanks for looking into > >> this. > >> > > > >> > > Thanks, > >> > > Scott > >> > > > >> > > > >> > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao wrote: > >> > > > >> > >> Could you try starting the consumer first (and enable gzip in the > >> > >> producer)? > >> > >> > >> > >> Thanks, > >> > >> > >> > >> Jun > >> > >> > >> > >> > >> > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang < > >> > >> scott.w...@rumbleentertainment.com> wrote: > >> > >> > >> > >> > No, I did not start the consumer before the producer. I actually > >> > >> started > >> > >> > the producer first and nothing showed up in the consumer unless I > >> > >> commented > >> > >> > out this line -- props.put("compression.codec", "gzip").If I > >> > >> commented > >> > >> > out the compression codec, everything just works. > >> > >> > > >> > >> > > >> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao > wrote: > >> > >> > > >> > >> > > Did you start the consumer before the producer? Be default, the > >> > >> consumer > >> > >> > > gets only the new data? > >> > >> > > > >> > >> > > Thanks, > >> > >> > > > >> > >> > > Jun > >> > >> > > > >> > >> > > > >> > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang < > >> > >> > > scott.w...@rumbleentertainment.com> wrote: > >> > >> > > > >> > >> > > > I am testing with Kafka 0.8 beta and having problem of > receiving > >> > >> > message > >> > >> > > in > >> > >> > > > consumer. There is no error so does anyone have any > insights. > >> > >> When I > >> > >> > > > commented out the "compression.code" everything works fine. > >> > >> > > > > >> > >> > > > My producer: > >> > >> > > > public class TestKafka08Prod { > >> > >&g
Re: having problem with 0.8 gzip compression
Jun, I did a test this morning and got a very interesting result with you command. I started by wipe all the log files and clean up all zookeeper data files. Once I restarted both server, producer and consumer then execute your command, what I got is a empty log as following: Dumping /Users/scott/Temp/kafka/test-topic-0/.log Starting offset: 0 One observation, the .index file was getting huge but there was nothing in .log file. Thanks, Scott On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao wrote: > Could you run the following command on one of the log files of your topic > and attach the output? > > bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > /tmp/kafka-logs/testtopic-0/.log > > Thanks, > > Jun > > > On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang < > scott.w...@rumbleentertainment.com> wrote: > > > Another piece of information, the snappy compression also does not work. > > > > Thanks, > > Scott > > > > > > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang < > > scott.w...@rumbleentertainment.com> wrote: > > > > > I just try it and it still not showing up, thanks for looking into > this. > > > > > > Thanks, > > > Scott > > > > > > > > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao wrote: > > > > > >> Could you try starting the consumer first (and enable gzip in the > > >> producer)? > > >> > > >> Thanks, > > >> > > >> Jun > > >> > > >> > > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang < > > >> scott.w...@rumbleentertainment.com> wrote: > > >> > > >> > No, I did not start the consumer before the producer. I actually > > >> started > > >> > the producer first and nothing showed up in the consumer unless I > > >> commented > > >> > out this line -- props.put("compression.codec", "gzip").If I > > >> commented > > >> > out the compression codec, everything just works. > > >> > > > >> > > > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao wrote: > > >> > > > >> > > Did you start the consumer before the producer? Be default, the > > >> consumer > > >> > > gets only the new data? > > >> > > > > >> > > Thanks, > > >> > > > > >> > > Jun > > >> > > > > >> > > > > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang < > > >> > > scott.w...@rumbleentertainment.com> wrote: > > >> > > > > >> > > > I am testing with Kafka 0.8 beta and having problem of receiving > > >> > message > > >> > > in > > >> > > > consumer. There is no error so does anyone have any insights. > > >> When I > > >> > > > commented out the "compression.code" everything works fine. > > >> > > > > > >> > > > My producer: > > >> > > > public class TestKafka08Prod { > > >> > > > > > >> > > > public static void main(String [] args) { > > >> > > > > > >> > > > Producer producer = null; > > >> > > > try { > > >> > > > Properties props = new Properties(); > > >> > > > props.put("metadata.broker.list", "localhost:9092"); > > >> > > > props.put("serializer.class", > > >> > > > "kafka.serializer.StringEncoder"); > > >> > > > props.put("producer.type", "sync"); > > >> > > > props.put("request.required.acks","1"); > > >> > > > props.put("compression.codec", "gzip"); > > >> > > > ProducerConfig config = new ProducerConfig(props); > > >> > > > producer = new Producer(config); > > >> > > > int j=0; > > >> > > > for(int i=0; i<10; i++) { > > >> > > > KeyedMessage data = new > > >> > > > KeyedMessage("test-topic", "test-me
Re: having problem with 0.8 gzip compression
Another piece of information, the snappy compression also does not work. Thanks, Scott On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang < scott.w...@rumbleentertainment.com> wrote: > I just try it and it still not showing up, thanks for looking into this. > > Thanks, > Scott > > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao wrote: > >> Could you try starting the consumer first (and enable gzip in the >> producer)? >> >> Thanks, >> >> Jun >> >> >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang < >> scott.w...@rumbleentertainment.com> wrote: >> >> > No, I did not start the consumer before the producer. I actually >> started >> > the producer first and nothing showed up in the consumer unless I >> commented >> > out this line -- props.put("compression.codec", "gzip").If I >> commented >> > out the compression codec, everything just works. >> > >> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao wrote: >> > >> > > Did you start the consumer before the producer? Be default, the >> consumer >> > > gets only the new data? >> > > >> > > Thanks, >> > > >> > > Jun >> > > >> > > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang < >> > > scott.w...@rumbleentertainment.com> wrote: >> > > >> > > > I am testing with Kafka 0.8 beta and having problem of receiving >> > message >> > > in >> > > > consumer. There is no error so does anyone have any insights. >> When I >> > > > commented out the "compression.code" everything works fine. >> > > > >> > > > My producer: >> > > > public class TestKafka08Prod { >> > > > >> > > > public static void main(String [] args) { >> > > > >> > > > Producer producer = null; >> > > > try { >> > > > Properties props = new Properties(); >> > > > props.put("metadata.broker.list", "localhost:9092"); >> > > > props.put("serializer.class", >> > > > "kafka.serializer.StringEncoder"); >> > > > props.put("producer.type", "sync"); >> > > > props.put("request.required.acks","1"); >> > > > props.put("compression.codec", "gzip"); >> > > > ProducerConfig config = new ProducerConfig(props); >> > > > producer = new Producer(config); >> > > > int j=0; >> > > > for(int i=0; i<10; i++) { >> > > > KeyedMessage data = new >> > > > KeyedMessage("test-topic", "test-message: "+i+" >> > > > "+System.currentTimeMillis()); >> > > > producer.send(data); >> > > > >> > > > } >> > > > >> > > > } catch (Exception e) { >> > > > System.out.println("Error happened: "); >> > > > e.printStackTrace(); >> > > > } finally { >> > > > if(null != null) { >> > > > producer.close(); >> > > > } >> > > > >> > > > System.out.println("Ened of Sending"); >> > > > } >> > > > >> > > > System.exit(0); >> > > > } >> > > > } >> > > > >> > > > >> > > > My consumer: >> > > > >> > > > public class TestKafka08Consumer { >> > > > public static void main(String [] args) throws >> > UnknownHostException, >> > > > SocketException { >> > > > >> > > > Properties props = new Properties(); >> > > > props.put("zookeeper.connect", "localhost:2181/kafka_0_8"); >> > > > props.put("group.id", "test08ConsumerId"); >> > > > props.put("zk.sessiontimeout.ms", "4000"); >> > > > props.put("zk.synctime.ms", "2000"); >> > > > props.put("autocommit.interval.ms", "1000"); >> > > > >> > > > ConsumerConfig consumerConfig = new ConsumerConfig(props); >> > > > >> > > > ConsumerConnector consumerConnector = >> > > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); >> > > > >> > > > String topic = "test-topic"; >> > > > Map topicCountMap = new HashMap> > > > Integer>(); >> > > > topicCountMap.put(topic, new Integer(1)); >> > > > Map>> consumerMap = >> > > > consumerConnector.createMessageStreams(topicCountMap); >> > > > KafkaStream stream = >> > > > consumerMap.get(topic).get(0); >> > > > >> > > > ConsumerIterator it = stream.iterator(); >> > > > >> > > > int counter=0; >> > > > while(it.hasNext()) { >> > > > try { >> > > > String fromPlatform = new >> String(it.next().message()); >> > > > System.out.println("The messages: "+fromPlatform); >> > > > } catch(Exception e) { >> > > > e.printStackTrace(); >> > > > } >> > > > } >> > > > System.out.println("SystemOut"); >> > > > } >> > > > } >> > > > >> > > > >> > > > Thanks >> > > > >> > > >> > >> > >
Re: having problem with 0.8 gzip compression
I just try it and it still not showing up, thanks for looking into this. Thanks, Scott On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao wrote: > Could you try starting the consumer first (and enable gzip in the > producer)? > > Thanks, > > Jun > > > On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang < > scott.w...@rumbleentertainment.com> wrote: > > > No, I did not start the consumer before the producer. I actually started > > the producer first and nothing showed up in the consumer unless I > commented > > out this line -- props.put("compression.codec", "gzip").If I > commented > > out the compression codec, everything just works. > > > > > > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao wrote: > > > > > Did you start the consumer before the producer? Be default, the > consumer > > > gets only the new data? > > > > > > Thanks, > > > > > > Jun > > > > > > > > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang < > > > scott.w...@rumbleentertainment.com> wrote: > > > > > > > I am testing with Kafka 0.8 beta and having problem of receiving > > message > > > in > > > > consumer. There is no error so does anyone have any insights. When > I > > > > commented out the "compression.code" everything works fine. > > > > > > > > My producer: > > > > public class TestKafka08Prod { > > > > > > > > public static void main(String [] args) { > > > > > > > > Producer producer = null; > > > > try { > > > > Properties props = new Properties(); > > > > props.put("metadata.broker.list", "localhost:9092"); > > > > props.put("serializer.class", > > > > "kafka.serializer.StringEncoder"); > > > > props.put("producer.type", "sync"); > > > > props.put("request.required.acks","1"); > > > > props.put("compression.codec", "gzip"); > > > > ProducerConfig config = new ProducerConfig(props); > > > > producer = new Producer(config); > > > > int j=0; > > > > for(int i=0; i<10; i++) { > > > > KeyedMessage data = new > > > > KeyedMessage("test-topic", "test-message: "+i+" > > > > "+System.currentTimeMillis()); > > > > producer.send(data); > > > > > > > > } > > > > > > > > } catch (Exception e) { > > > > System.out.println("Error happened: "); > > > > e.printStackTrace(); > > > > } finally { > > > > if(null != null) { > > > > producer.close(); > > > > } > > > > > > > > System.out.println("Ened of Sending"); > > > > } > > > > > > > > System.exit(0); > > > > } > > > > } > > > > > > > > > > > > My consumer: > > > > > > > > public class TestKafka08Consumer { > > > > public static void main(String [] args) throws > > UnknownHostException, > > > > SocketException { > > > > > > > > Properties props = new Properties(); > > > > props.put("zookeeper.connect", "localhost:2181/kafka_0_8"); > > > > props.put("group.id", "test08ConsumerId"); > > > > props.put("zk.sessiontimeout.ms", "4000"); > > > > props.put("zk.synctime.ms", "2000"); > > > > props.put("autocommit.interval.ms", "1000"); > > > > > > > > ConsumerConfig consumerConfig = new ConsumerConfig(props); > > > > > > > > ConsumerConnector consumerConnector = > > > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); > > > > > > > > String topic = "test-topic"; > > > > Map topicCountMap = new HashMap > > > Integer>(); > > > > topicCountMap.put(topic, new Integer(1)); > > > > Map>> consumerMap = > > > > consumerConnector.createMessageStreams(topicCountMap); > > > > KafkaStream stream = > > > > consumerMap.get(topic).get(0); > > > > > > > > ConsumerIterator it = stream.iterator(); > > > > > > > > int counter=0; > > > > while(it.hasNext()) { > > > > try { > > > > String fromPlatform = new > String(it.next().message()); > > > > System.out.println("The messages: "+fromPlatform); > > > > } catch(Exception e) { > > > > e.printStackTrace(); > > > > } > > > > } > > > > System.out.println("SystemOut"); > > > > } > > > > } > > > > > > > > > > > > Thanks > > > > > > > > > >
Re: having problem with 0.8 gzip compression
No, I did not start the consumer before the producer. I actually started the producer first and nothing showed up in the consumer unless I commented out this line -- props.put("compression.codec", "gzip").If I commented out the compression codec, everything just works. On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao wrote: > Did you start the consumer before the producer? Be default, the consumer > gets only the new data? > > Thanks, > > Jun > > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang < > scott.w...@rumbleentertainment.com> wrote: > > > I am testing with Kafka 0.8 beta and having problem of receiving message > in > > consumer. There is no error so does anyone have any insights. When I > > commented out the "compression.code" everything works fine. > > > > My producer: > > public class TestKafka08Prod { > > > > public static void main(String [] args) { > > > > Producer producer = null; > > try { > > Properties props = new Properties(); > > props.put("metadata.broker.list", "localhost:9092"); > > props.put("serializer.class", > > "kafka.serializer.StringEncoder"); > > props.put("producer.type", "sync"); > > props.put("request.required.acks","1"); > > props.put("compression.codec", "gzip"); > > ProducerConfig config = new ProducerConfig(props); > > producer = new Producer(config); > > int j=0; > > for(int i=0; i<10; i++) { > > KeyedMessage data = new > > KeyedMessage("test-topic", "test-message: "+i+" > > "+System.currentTimeMillis()); > > producer.send(data); > > > > } > > > > } catch (Exception e) { > > System.out.println("Error happened: "); > > e.printStackTrace(); > > } finally { > > if(null != null) { > > producer.close(); > > } > > > > System.out.println("Ened of Sending"); > > } > > > > System.exit(0); > > } > > } > > > > > > My consumer: > > > > public class TestKafka08Consumer { > > public static void main(String [] args) throws UnknownHostException, > > SocketException { > > > > Properties props = new Properties(); > > props.put("zookeeper.connect", "localhost:2181/kafka_0_8"); > > props.put("group.id", "test08ConsumerId"); > > props.put("zk.sessiontimeout.ms", "4000"); > > props.put("zk.synctime.ms", "2000"); > > props.put("autocommit.interval.ms", "1000"); > > > > ConsumerConfig consumerConfig = new ConsumerConfig(props); > > > > ConsumerConnector consumerConnector = > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); > > > > String topic = "test-topic"; > > Map topicCountMap = new HashMap > Integer>(); > > topicCountMap.put(topic, new Integer(1)); > > Map>> consumerMap = > > consumerConnector.createMessageStreams(topicCountMap); > > KafkaStream stream = > > consumerMap.get(topic).get(0); > > > > ConsumerIterator it = stream.iterator(); > > > > int counter=0; > > while(it.hasNext()) { > > try { > > String fromPlatform = new String(it.next().message()); > > System.out.println("The messages: "+fromPlatform); > > } catch(Exception e) { > > e.printStackTrace(); > > } > > } > > System.out.println("SystemOut"); > > } > > } > > > > > > Thanks > > >
having problem with 0.8 gzip compression
I am testing with Kafka 0.8 beta and having problem of receiving message in consumer. There is no error so does anyone have any insights. When I commented out the "compression.code" everything works fine. My producer: public class TestKafka08Prod { public static void main(String [] args) { Producer producer = null; try { Properties props = new Properties(); props.put("metadata.broker.list", "localhost:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "sync"); props.put("request.required.acks","1"); props.put("compression.codec", "gzip"); ProducerConfig config = new ProducerConfig(props); producer = new Producer(config); int j=0; for(int i=0; i<10; i++) { KeyedMessage data = new KeyedMessage("test-topic", "test-message: "+i+" "+System.currentTimeMillis()); producer.send(data); } } catch (Exception e) { System.out.println("Error happened: "); e.printStackTrace(); } finally { if(null != null) { producer.close(); } System.out.println("Ened of Sending"); } System.exit(0); } } My consumer: public class TestKafka08Consumer { public static void main(String [] args) throws UnknownHostException, SocketException { Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181/kafka_0_8"); props.put("group.id", "test08ConsumerId"); props.put("zk.sessiontimeout.ms", "4000"); props.put("zk.synctime.ms", "2000"); props.put("autocommit.interval.ms", "1000"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); String topic = "test-topic"; Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(1)); Map>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaStream stream = consumerMap.get(topic).get(0); ConsumerIterator it = stream.iterator(); int counter=0; while(it.hasNext()) { try { String fromPlatform = new String(it.next().message()); System.out.println("The messages: "+fromPlatform); } catch(Exception e) { e.printStackTrace(); } } System.out.println("SystemOut"); } } Thanks