Re: having problem with 0.8 gzip compression
Joel,

Would you mind pointing me to how I can enable trace logs in the producer and broker?

Thanks,
Scott

On Wed, Jul 10, 2013 at 5:33 PM, Joel Koshy jjkosh...@gmail.com wrote:

> Weird - I tried your exact code and it worked for me (although I was using 0.8 head and not the beta). Can you re-run with trace logs enabled in your producer and paste that output? Broker logs also, if you can?
>
> Thanks,
> Joel
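For readers with the same question, here is a minimal sketch of one way to turn on trace logging. It assumes the log4j 1.x properties format used by the Kafka 0.8 distribution (config/log4j.properties on the broker; for the producer, a log4j.properties file on the application's classpath). The appender name `stdout` and the layout pattern are illustrative choices, not taken from the thread.

```properties
# Illustrative log4j 1.x configuration; appender name and pattern are assumptions.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

# Raise every logger under the kafka package to TRACE.
log4j.logger.kafka=TRACE
```

TRACE output is verbose, so it is probably best enabled only while reproducing the problem.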
Re: having problem with 0.8 gzip compression
OK, the problem is solved. I think it was because some of the jar files I was using were old: I was building the producer and consumer in the 0.7 environment and had only swapped out the Kafka jar file. I have now created a whole new environment and pulled in all the jar files from 0.8, and that seems to solve my 0.8 gzip problem. Thank you for all the help.
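A mixed classpath like this can be hard to spot. The following is a minimal sketch, using only the standard library, that prints which jar or directory a class was actually loaded from. The class name `JarLocator` and the marker string are hypothetical, and any Kafka class names passed on the command line are the reader's own choice.

```java
import java.security.CodeSource;

public class JarLocator {
    /** Returns the jar or directory a class was loaded from, or a marker if unknown. */
    public static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // Classes loaded by the JVM's bootstrap loader report no code source.
        return (source == null || source.getLocation() == null)
                ? "(bootstrap or unknown)"
                : source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass fully qualified class names as arguments.
        for (String name : args) {
            System.out.println(name + " -> " + locationOf(Class.forName(name)));
        }
    }
}
```

Running it with the same classpath as the producer, for example with `kafka.javaapi.producer.Producer` as an argument, should show whether that class comes from the 0.8 jar or from a leftover 0.7 one.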
Re: having problem with 0.8 gzip compression
Jun,

I did a test this morning and got a very interesting result with your command. I started by wiping all the log files and cleaning up all the ZooKeeper data files. Once I had restarted the server, producer and consumer, I executed your command, and what I got was an empty log, as follows:

    Dumping /Users/scott/Temp/kafka/test-topic-0/.log
    Starting offset: 0

One observation: the .index file was getting huge, but there was nothing in the .log file.

Thanks,
Scott

On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao jun...@gmail.com wrote:

> Could you run the following command on one of the log files of your topic and attach the output?
>
>     bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/testtopic-0/.log
>
> Thanks,
> Jun
Re: having problem with 0.8 gzip compression
Another piece of information: snappy compression also does not work.

Thanks,
Scott

On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang scott.w...@rumbleentertainment.com wrote:

> I just tried it and the messages are still not showing up. Thanks for looking into this.
>
> Thanks,
> Scott
>
> On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao jun...@gmail.com wrote:
>
>> Could you try starting the consumer first (and enable gzip in the producer)?
>>
>> Thanks,
>> Jun
having problem with 0.8 gzip compression
I am testing with Kafka 0.8 beta and am having a problem receiving messages in the consumer. There is no error, so does anyone have any insights? When I comment out compression.codec, everything works fine.

My producer:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class TestKafka08Prod {
        public static void main(String[] args) {
            Producer<Integer, String> producer = null;
            try {
                Properties props = new Properties();
                props.put("metadata.broker.list", "localhost:9092");
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                props.put("producer.type", "sync");
                props.put("request.required.acks", "1");
                props.put("compression.codec", "gzip");
                ProducerConfig config = new ProducerConfig(props);
                producer = new Producer<Integer, String>(config);
                int j = 0;
                // Send ten timestamped test messages to the topic.
                for (int i = 0; i < 10; i++) {
                    KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(
                            "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
                    producer.send(data);
                }
            } catch (Exception e) {
                System.out.println("Error happened: ");
                e.printStackTrace();
            } finally {
                // Close the producer only if it was successfully created.
                if (producer != null) {
                    producer.close();
                }
                System.out.println("End of Sending");
            }
            System.exit(0);
        }
    }

My consumer:

    import java.net.SocketException;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class TestKafka08Consumer {
        public static void main(String[] args) throws UnknownHostException, SocketException {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
            props.put("group.id", "test08ConsumerId");
            props.put("zk.sessiontimeout.ms", "4000");
            props.put("zk.synctime.ms", "2000");
            props.put("autocommit.interval.ms", "1000");
            ConsumerConfig consumerConfig = new ConsumerConfig(props);
            ConsumerConnector consumerConnector =
                    kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
            String topic = "test-topic";
            // Request a single stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumerConnector.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            int counter = 0;
            // Blocks waiting for messages and prints each one as it arrives.
            while (it.hasNext()) {
                try {
                    String fromPlatform = new String(it.next().message());
                    System.out.println("The messages: " + fromPlatform);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("SystemOut");
        }
    }

Thanks
Re: having problem with 0.8 gzip compression
No, I did not start the consumer before the producer. I actually started the producer first, and nothing showed up in the consumer unless I commented out this line:

    props.put("compression.codec", "gzip");

With the compression codec commented out, everything just works.

On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao jun...@gmail.com wrote:

> Did you start the consumer before the producer? By default, the consumer gets only the new data.
>
> Thanks,
> Jun
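Jun's point about the consumer only seeing new data can be made concrete. Below is a minimal sketch, assuming the 0.8 high-level consumer's `auto.offset.reset` setting, which is understood to default to `largest` (new messages only) and to accept `smallest` (start from the earliest available offset when the group has no committed offset). The class and method names are hypothetical; the ZooKeeper address and group id are the values from the thread.

```java
import java.util.Properties;

public class FromBeginningConsumerProps {
    /** Builds consumer properties, optionally asking to start from the earliest offset. */
    public static Properties build(boolean fromBeginning) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId");
        // "largest": only messages produced after the consumer starts.
        // "smallest": begin at the earliest offset still held by the broker.
        props.put("auto.offset.reset", fromBeginning ? "smallest" : "largest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build(true));
    }
}
```

The resulting `Properties` object would be passed to `new ConsumerConfig(props)` as in the consumer above. The setting only applies when the group has no offset stored yet, so a fresh `group.id` may be needed to see its effect.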