Re: having problem with 0.8 gzip compression

2013-07-11 Thread Scott Wang
OK, the problem is solved.  I think it was because some of the jar files
that I was using were "OLD".  I had been building the producer and consumer
under the 0.7 environment, only swapping out the kafka jar file.  Now I have
created a whole new environment and pulled in all the jar files from
0.8.  That seems to have solved my 0.8 gzip problem.  Thank you for all the
help.
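For anyone who hits the same symptom, a quick check along these lines can flag leftover 0.7 jars before rebuilding; the directory layout and the version-in-filename convention are assumptions about your build, not something from this thread:

```shell
# find_stale_jars DIR: print any jar under DIR whose filename still says 0.7.
# Assumes jars carry their version in the name (e.g. kafka-0.7.2.jar).
find_stale_jars() {
  for j in "$1"/*.jar; do
    [ -e "$j" ] || continue          # empty dir: the glob did not expand
    case "$(basename "$j")" in
      *0.7*) echo "stale 0.7 jar: $j" ;;
    esac
  done
}
```

Running this over the dependency directory before launching the producer and consumer makes the kind of version mismatch described above easy to catch.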


Re: having problem with 0.8 gzip compression

2013-07-11 Thread Scott Wang
Joel,

Would you mind pointing me to how I can enable the trace logs in
the producer and broker?

Thanks,
Scott
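[A minimal sketch of the usual approach, assuming the stock log4j setup: in 0.8 both the producer client and the broker log through log4j, so trace logging is normally switched on in the log4j.properties on each side's classpath. The logger name below follows the kafka package layout and is an assumption, not verified against this exact beta.]

```properties
# log4j.properties on the producer's (or broker's) classpath
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

# Turn the Kafka internals up to TRACE to see per-message produce detail
log4j.logger.kafka=TRACE
```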


On Wed, Jul 10, 2013 at 5:33 PM, Joel Koshy  wrote:

> Weird - I tried your exact code and it worked for me (although I was
> using 0.8 head and not the beta). Can you re-run with trace logs
> enabled in your producer and paste that output? Broker logs also if
> you can?
>
> Thanks,
>
> Joel
>
> On Wed, Jul 10, 2013 at 10:23 AM, Scott Wang
>  wrote:
> > Jun,
> >
> > I did a test this morning and got a very interesting result with your
> > command.  I started by wiping all the log files and cleaning up all the
> > zookeeper data files.
> >
> > Once I restarted the server, producer and consumer and executed your
> > command, what I got was an empty log, as follows:
> >
> > Dumping /Users/scott/Temp/kafka/test-topic-0/.log
> > Starting offset: 0
> >
> > One observation: the .index file was getting huge, but there was
> > nothing in the .log file.
> >
> > Thanks,
> > Scott
> >
> >
> >
> >
> > On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao  wrote:
> >
> >> Could you run the following command on one of the log files of your
> topic
> >> and attach the output?
> >>
> >> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> >> /tmp/kafka-logs/testtopic-0/.log
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang <
> >> scott.w...@rumbleentertainment.com> wrote:
> >>
> >> > Another piece of information, the snappy compression also does not
> work.
> >> >
> >> > Thanks,
> >> > Scott
> >> >
> >> >
> >> > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
> >> > scott.w...@rumbleentertainment.com> wrote:
> >> >
> >> > > I just tried it and it is still not showing up; thanks for looking
> >> > > into this.
> >> > >
> >> > > Thanks,
> >> > > Scott
> >> > >
> >> > >
> >> > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao  wrote:
> >> > >
> >> > >> Could you try starting the consumer first (and enable gzip in the
> >> > >> producer)?
> >> > >>
> >> > >> Thanks,
> >> > >>
> >> > >> Jun
> >> > >>
> >> > >>
> >> > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> >> > >> scott.w...@rumbleentertainment.com> wrote:
> >> > >>
> >> > >> > No, I did not start the consumer before the producer.  I actually
> >> > >> > started the producer first, and nothing showed up in the consumer
> >> > >> > unless I commented out this line --
> >> > >> > props.put("compression.codec", "gzip").  If I commented out the
> >> > >> > compression codec, everything just worked.
> >> > >> >
> >> > >> >
> >> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao 
> wrote:
> >> > >> >
> >> > >> > > Did you start the consumer before the producer? By default, the
> >> > >> > > consumer gets only new data.
> >> > >> > >
> >> > >> > > Thanks,
> >> > >> > >
> >> > >> > > Jun
> >> > >> > >
> >> > >> > >
> >> > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> >> > >> > > scott.w...@rumbleentertainment.com> wrote:
> >> > >> > >
> >> > >> > > > I am testing with Kafka 0.8 beta and having a problem receiving
> >> > >> > > > messages in the consumer.  There is no error, so does anyone
> >> > >> > > > have any insights?  When I commented out the
> >> > >> > > > "compression.codec" setting, everything worked fine.
> >> > >> > > >
> >> > >> > > > My producer:
> >> > >> > > > public class TestKafka08Prod {

Re: having problem with 0.8 gzip compression

2013-07-10 Thread Scott Wang
Jun,

I did a test this morning and got a very interesting result with your
command.  I started by wiping all the log files and cleaning up all the
zookeeper data files.

Once I restarted the server, producer and consumer and executed your
command, what I got was an empty log, as follows:

Dumping /Users/scott/Temp/kafka/test-topic-0/.log
Starting offset: 0

One observation: the .index file was getting huge, but
there was nothing in the .log file.

Thanks,
Scott




On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao  wrote:

> Could you run the following command on one of the log files of your topic
> and attach the output?
>
> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /tmp/kafka-logs/testtopic-0/.log
>
> Thanks,
>
> Jun
>
>
> On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang <
> scott.w...@rumbleentertainment.com> wrote:
>
> > Another piece of information, the snappy compression also does not work.
> >
> > Thanks,
> > Scott
> >
> >
> > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
> > scott.w...@rumbleentertainment.com> wrote:
> >
> > > I just tried it and it is still not showing up; thanks for looking
> > > into this.
> > >
> > > Thanks,
> > > Scott
> > >
> > >
> > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao  wrote:
> > >
> > >> Could you try starting the consumer first (and enable gzip in the
> > >> producer)?
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> > >> scott.w...@rumbleentertainment.com> wrote:
> > >>
> > >> > No, I did not start the consumer before the producer.  I actually
> > >> > started the producer first, and nothing showed up in the consumer
> > >> > unless I commented out this line --
> > >> > props.put("compression.codec", "gzip").  If I commented out the
> > >> > compression codec, everything just worked.
> > >> >
> > >> >
> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao  wrote:
> > >> >
> > >> > > Did you start the consumer before the producer? By default, the
> > >> > > consumer gets only new data.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > >
> > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> > >> > > scott.w...@rumbleentertainment.com> wrote:
> > >> > >
> > >> > > > I am testing with Kafka 0.8 beta and having a problem receiving
> > >> > > > messages in the consumer.  There is no error, so does anyone have
> > >> > > > any insights?  When I commented out the "compression.codec"
> > >> > > > setting, everything worked fine.
> > >> > > >
> > >> > > > My producer:
> > >> > > > public class TestKafka08Prod {
> > >> > > >
> > >> > > >     public static void main(String[] args) {
> > >> > > >
> > >> > > >         Producer<String, String> producer = null;
> > >> > > >         try {
> > >> > > >             Properties props = new Properties();
> > >> > > >             props.put("metadata.broker.list", "localhost:9092");
> > >> > > >             props.put("serializer.class",
> > >> > > >                 "kafka.serializer.StringEncoder");
> > >> > > >             props.put("producer.type", "sync");
> > >> > > >             props.put("request.required.acks", "1");
> > >> > > >             props.put("compression.codec", "gzip");
> > >> > > >             ProducerConfig config = new ProducerConfig(props);
> > >> > > >             producer = new Producer<String, String>(config);
> > >> > > >             for (int i = 0; i < 10; i++) {
> > >> > > >                 KeyedMessage<String, String> data = new
> > >> > > >                     KeyedMessage<String, String>("test-topic", "test-me

Re: having problem with 0.8 gzip compression

2013-07-09 Thread Scott Wang
Another piece of information: the snappy compression also does not work.

Thanks,
Scott


On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
scott.w...@rumbleentertainment.com> wrote:

> I just tried it and it is still not showing up; thanks for looking into this.
>
> Thanks,
> Scott
>
>
> On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao  wrote:
>
>> Could you try starting the consumer first (and enable gzip in the
>> producer)?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
>> scott.w...@rumbleentertainment.com> wrote:
>>
>> > No, I did not start the consumer before the producer.  I actually
>> > started the producer first, and nothing showed up in the consumer unless
>> > I commented out this line -- props.put("compression.codec", "gzip").
>> > If I commented out the compression codec, everything just worked.
>> >
>> >
>> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao  wrote:
>> >
>> > > Did you start the consumer before the producer? By default, the
>> > > consumer gets only new data.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
>> > > scott.w...@rumbleentertainment.com> wrote:
>> > >
>> > > > I am testing with Kafka 0.8 beta and having a problem receiving
>> > > > messages in the consumer.  There is no error, so does anyone have any
>> > > > insights?  When I commented out the "compression.codec" setting,
>> > > > everything worked fine.
>> > > >
>> > > > My producer:
>> > > > public class TestKafka08Prod {
>> > > >
>> > > >     public static void main(String[] args) {
>> > > >
>> > > >         Producer<String, String> producer = null;
>> > > >         try {
>> > > >             Properties props = new Properties();
>> > > >             props.put("metadata.broker.list", "localhost:9092");
>> > > >             props.put("serializer.class",
>> > > >                 "kafka.serializer.StringEncoder");
>> > > >             props.put("producer.type", "sync");
>> > > >             props.put("request.required.acks", "1");
>> > > >             props.put("compression.codec", "gzip");
>> > > >             ProducerConfig config = new ProducerConfig(props);
>> > > >             producer = new Producer<String, String>(config);
>> > > >             for (int i = 0; i < 10; i++) {
>> > > >                 KeyedMessage<String, String> data = new
>> > > >                     KeyedMessage<String, String>("test-topic",
>> > > >                         "test-message: " + i + " " + System.currentTimeMillis());
>> > > >                 producer.send(data);
>> > > >             }
>> > > >         } catch (Exception e) {
>> > > >             System.out.println("Error happened: ");
>> > > >             e.printStackTrace();
>> > > >         } finally {
>> > > >             if (producer != null) {
>> > > >                 producer.close();
>> > > >             }
>> > > >             System.out.println("End of Sending");
>> > > >         }
>> > > >         System.exit(0);
>> > > >     }
>> > > > }
>> > > >
>> > > >
>> > > > My consumer:
>> > > >
>> > > > public class TestKafka08Consumer {
>> > > >     public static void main(String[] args) {
>> > > >
>> > > >         Properties props = new Properties();
>> > > >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>> > > >         props.put("group.id", "test08ConsumerId");
>> > > >         props.put("zk.sessiontimeout.ms", "4000");
>> > > >         props.put("zk.synctime.ms", "2000");
>> > > >         props.put("autocommit.interval.ms", "1000");
>> > > >
>> > > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>> > > >
>> > > >         ConsumerConnector consumerConnector =
>> > > >             kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>> > > >
>> > > >         String topic = "test-topic";
>> > > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> > > >         topicCountMap.put(topic, new Integer(1));
>> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>> > > >             consumerConnector.createMessageStreams(topicCountMap);
>> > > >         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>> > > >
>> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>> > > >
>> > > >         while (it.hasNext()) {
>> > > >             try {
>> > > >                 String fromPlatform = new String(it.next().message());
>> > > >                 System.out.println("The messages: " + fromPlatform);
>> > > >             } catch (Exception e) {
>> > > >                 e.printStackTrace();
>> > > >             }
>> > > >         }
>> > > >         System.out.println("SystemOut");
>> > > >     }
>> > > > }
>> > > >
>> > > >
>> > > > Thanks
>> > > >
>> > >
>> >
>>
>
>


Re: having problem with 0.8 gzip compression

2013-07-09 Thread Scott Wang
I just tried it and it is still not showing up; thanks for looking into this.

Thanks,
Scott


On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao  wrote:

> Could you try starting the consumer first (and enable gzip in the
> producer)?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> scott.w...@rumbleentertainment.com> wrote:
>
> > No, I did not start the consumer before the producer.  I actually started
> > the producer first, and nothing showed up in the consumer unless I
> > commented out this line -- props.put("compression.codec", "gzip").  If I
> > commented out the compression codec, everything just worked.
> >
> >
> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao  wrote:
> >
> > > Did you start the consumer before the producer? By default, the
> > > consumer gets only new data.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> > > scott.w...@rumbleentertainment.com> wrote:
> > >
> > > > I am testing with Kafka 0.8 beta and having a problem receiving
> > > > messages in the consumer.  There is no error, so does anyone have any
> > > > insights?  When I commented out the "compression.codec" setting,
> > > > everything worked fine.
> > > >
> > > > My producer:
> > > > public class TestKafka08Prod {
> > > >
> > > >     public static void main(String[] args) {
> > > >
> > > >         Producer<String, String> producer = null;
> > > >         try {
> > > >             Properties props = new Properties();
> > > >             props.put("metadata.broker.list", "localhost:9092");
> > > >             props.put("serializer.class",
> > > >                 "kafka.serializer.StringEncoder");
> > > >             props.put("producer.type", "sync");
> > > >             props.put("request.required.acks", "1");
> > > >             props.put("compression.codec", "gzip");
> > > >             ProducerConfig config = new ProducerConfig(props);
> > > >             producer = new Producer<String, String>(config);
> > > >             for (int i = 0; i < 10; i++) {
> > > >                 KeyedMessage<String, String> data = new
> > > >                     KeyedMessage<String, String>("test-topic",
> > > >                         "test-message: " + i + " " + System.currentTimeMillis());
> > > >                 producer.send(data);
> > > >             }
> > > >         } catch (Exception e) {
> > > >             System.out.println("Error happened: ");
> > > >             e.printStackTrace();
> > > >         } finally {
> > > >             if (producer != null) {
> > > >                 producer.close();
> > > >             }
> > > >             System.out.println("End of Sending");
> > > >         }
> > > >         System.exit(0);
> > > >     }
> > > > }
> > > >
> > > >
> > > > My consumer:
> > > >
> > > > public class TestKafka08Consumer {
> > > >     public static void main(String[] args) {
> > > >
> > > >         Properties props = new Properties();
> > > >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> > > >         props.put("group.id", "test08ConsumerId");
> > > >         props.put("zk.sessiontimeout.ms", "4000");
> > > >         props.put("zk.synctime.ms", "2000");
> > > >         props.put("autocommit.interval.ms", "1000");
> > > >
> > > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > >
> > > >         ConsumerConnector consumerConnector =
> > > >             kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >         String topic = "test-topic";
> > > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > >         topicCountMap.put(topic, new Integer(1));
> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > >             consumerConnector.createMessageStreams(topicCountMap);
> > > >         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> > > >
> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > >
> > > >         while (it.hasNext()) {
> > > >             try {
> > > >                 String fromPlatform = new String(it.next().message());
> > > >                 System.out.println("The messages: " + fromPlatform);
> > > >             } catch (Exception e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >         }
> > > >         System.out.println("SystemOut");
> > > >     }
> > > > }
> > > >
> > > >
> > > > Thanks
> > > >
> > >
> >
>


Re: having problem with 0.8 gzip compression

2013-07-08 Thread Scott Wang
No, I did not start the consumer before the producer.  I actually started
the producer first, and nothing showed up in the consumer unless I commented
out this line -- props.put("compression.codec", "gzip").  If I commented
out the compression codec, everything just worked.


On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao  wrote:

> Did you start the consumer before the producer? By default, the consumer
> gets only new data.
>
> Thanks,
>
> Jun
>
>
> On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> scott.w...@rumbleentertainment.com> wrote:
>
> > I am testing with Kafka 0.8 beta and having a problem receiving messages
> > in the consumer.  There is no error, so does anyone have any insights?
> > When I commented out the "compression.codec" setting, everything worked
> > fine.
> >
> > My producer:
> > public class TestKafka08Prod {
> >
> >     public static void main(String[] args) {
> >
> >         Producer<String, String> producer = null;
> >         try {
> >             Properties props = new Properties();
> >             props.put("metadata.broker.list", "localhost:9092");
> >             props.put("serializer.class", "kafka.serializer.StringEncoder");
> >             props.put("producer.type", "sync");
> >             props.put("request.required.acks", "1");
> >             props.put("compression.codec", "gzip");
> >             ProducerConfig config = new ProducerConfig(props);
> >             producer = new Producer<String, String>(config);
> >             for (int i = 0; i < 10; i++) {
> >                 KeyedMessage<String, String> data = new
> >                     KeyedMessage<String, String>("test-topic",
> >                         "test-message: " + i + " " + System.currentTimeMillis());
> >                 producer.send(data);
> >             }
> >         } catch (Exception e) {
> >             System.out.println("Error happened: ");
> >             e.printStackTrace();
> >         } finally {
> >             if (producer != null) {
> >                 producer.close();
> >             }
> >             System.out.println("End of Sending");
> >         }
> >         System.exit(0);
> >     }
> > }
> >
> >
> > My consumer:
> >
> > public class TestKafka08Consumer {
> >     public static void main(String[] args) {
> >
> >         Properties props = new Properties();
> >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> >         props.put("group.id", "test08ConsumerId");
> >         props.put("zk.sessiontimeout.ms", "4000");
> >         props.put("zk.synctime.ms", "2000");
> >         props.put("autocommit.interval.ms", "1000");
> >
> >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >
> >         ConsumerConnector consumerConnector =
> >             kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >
> >         String topic = "test-topic";
> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >         topicCountMap.put(topic, new Integer(1));
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >             consumerConnector.createMessageStreams(topicCountMap);
> >         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> >
> >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >
> >         while (it.hasNext()) {
> >             try {
> >                 String fromPlatform = new String(it.next().message());
> >                 System.out.println("The messages: " + fromPlatform);
> >             } catch (Exception e) {
> >                 e.printStackTrace();
> >             }
> >         }
> >         System.out.println("SystemOut");
> >     }
> > }
> >
> >
> > Thanks
> >
>


having problem with 0.8 gzip compression

2013-07-08 Thread Scott Wang
I am testing with Kafka 0.8 beta and having a problem receiving messages in
the consumer.  There is no error, so does anyone have any insights?  When I
commented out the "compression.codec" setting, everything worked fine.

My producer:
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestKafka08Prod {

    public static void main(String[] args) {

        Producer<String, String> producer = null;
        try {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");
            props.put("request.required.acks", "1");
            props.put("compression.codec", "gzip");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);
            for (int i = 0; i < 10; i++) {
                KeyedMessage<String, String> data = new
                    KeyedMessage<String, String>("test-topic",
                        "test-message: " + i + " " + System.currentTimeMillis());
                producer.send(data);
            }
        } catch (Exception e) {
            System.out.println("Error happened: ");
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
            System.out.println("End of Sending");
        }
        System.exit(0);
    }
}


My consumer:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TestKafka08Consumer {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId");
        props.put("zk.sessiontimeout.ms", "4000");
        props.put("zk.synctime.ms", "2000");
        props.put("autocommit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);

        ConsumerConnector consumerConnector =
            kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        String topic = "test-topic";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            try {
                String fromPlatform = new String(it.next().message());
                System.out.println("The messages: " + fromPlatform);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("SystemOut");
    }
}


Thanks
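[Since the thread never isolates whether gzip itself was at fault, here is a minimal, JDK-only round trip that rules the JVM's gzip support in or out before pointing at Kafka. The class name is my own, not from the thread; Kafka 0.8's gzip codec is, to my understanding, built on these same java.util.zip stream classes.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Standalone check that the JVM's own gzip streams round-trip a payload.
// If this fails, the problem is local to the JVM rather than to Kafka.
public class GzipRoundTrip {

    static byte[] gzip(byte[] plain) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(bos);
        gz.write(plain);
        gz.close();                      // flushes the gzip trailer
        return bos.toByteArray();
    }

    static byte[] gunzip(byte[] packed) throws Exception {
        GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(packed));
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = gz.read(buf)) > 0) {
            bos.write(buf, 0, n);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        String msg = "test-message: 0 " + System.currentTimeMillis();
        byte[] back = gunzip(gzip(msg.getBytes()));
        System.out.println(msg.equals(new String(back)));  // prints "true" when gzip is healthy
    }
}
```

If this round trip succeeds but compressed messages still vanish between producer and consumer, the mismatch is in the Kafka jars on the classpath, which is exactly what the thread eventually concluded.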