Re: having problem with 0.8 gzip compression

2013-07-09 Thread Jun Rao
Could you run the following command on one of the log files of your topic
and attach the output?

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
/tmp/kafka-logs/testtopic-0/.log

Thanks,

Jun


On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang <
scott.w...@rumbleentertainment.com> wrote:

> Another piece of information: snappy compression also does not work.
>
> Thanks,
> Scott
>
>
> On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
> scott.w...@rumbleentertainment.com> wrote:
>
> > I just tried it and it is still not showing up. Thanks for looking into this.
> >
> > Thanks,
> > Scott
> >
> >
> > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao  wrote:
> >
> >> Could you try starting the consumer first (and enable gzip in the
> >> producer)?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> >> scott.w...@rumbleentertainment.com> wrote:
> >>
> >> > No, I did not start the consumer before the producer.  I actually
> >> > started the producer first and nothing showed up in the consumer
> >> > unless I commented out this line -- props.put("compression.codec",
> >> > "gzip").  If I commented out the compression codec, everything just
> >> > works.
> >> >
> >> >
> >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao  wrote:
> >> >
> >> > > Did you start the consumer before the producer? By default, the
> >> > > consumer gets only the new data.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> >> > > scott.w...@rumbleentertainment.com> wrote:
> >> > >
> >> > > > I am testing with Kafka 0.8 beta and having a problem receiving
> >> > > > messages in the consumer.  There is no error, so does anyone have
> >> > > > any insights?  When I commented out the "compression.codec"
> >> > > > everything works fine.
> >> > > >
> >> > > > My producer:
> >> > > > public class TestKafka08Prod {
> >> > > >
> >> > > >     public static void main(String[] args) {
> >> > > >
> >> > > >         Producer<String, String> producer = null;
> >> > > >         try {
> >> > > >             Properties props = new Properties();
> >> > > >             props.put("metadata.broker.list", "localhost:9092");
> >> > > >             props.put("serializer.class", "kafka.serializer.StringEncoder");
> >> > > >             props.put("producer.type", "sync");
> >> > > >             props.put("request.required.acks", "1");
> >> > > >             props.put("compression.codec", "gzip");
> >> > > >             ProducerConfig config = new ProducerConfig(props);
> >> > > >             producer = new Producer<String, String>(config);
> >> > > >             for (int i = 0; i < 10; i++) {
> >> > > >                 KeyedMessage<String, String> data = new KeyedMessage<String, String>(
> >> > > >                         "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
> >> > > >                 producer.send(data);
> >> > > >             }
> >> > > >         } catch (Exception e) {
> >> > > >             System.out.println("Error happened: ");
> >> > > >             e.printStackTrace();
> >> > > >         } finally {
> >> > > >             if (producer != null) {   // was: if (null != null), so the producer was never closed
> >> > > >                 producer.close();
> >> > > >             }
> >> > > >             System.out.println("End of Sending");
> >> > > >         }
> >> > > >
> >> > > >         System.exit(0);
> >> > > >     }
> >> > > > }
> >> > > >
> >> > > >
> >> > > > My consumer:
> >> > > >
> >> > > > public class TestKafka08Consumer {
> >> > > >     public static void main(String[] args) throws UnknownHostException, SocketException {
> >> > > >
> >> > > >         Properties props = new Properties();
> >> > > >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> >> > > >         props.put("group.id", "test08ConsumerId");
> >> > > >         props.put("zk.sessiontimeout.ms", "4000");
> >> > > >         props.put("zk.synctime.ms", "2000");
> >> > > >         props.put("autocommit.interval.ms", "1000");
> >> > > >
> >> > > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >> > > >
> >> > > >         ConsumerConnector consumerConnector =
> >> > > >                 kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >> > > >
> >> > > >         String topic = "test-topic";
> >> > > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >> > > >         topicCountMap.put(topic, new Integer(1));
> >> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >> > > >                 consumerConnector.createMessageStreams(topicCountMap);
> >> > > >         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> >> > > >
> >> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >> > > >
> >> > > >         while (it.hasNext()) {
> >> > > >             try {
> >> > > >                 String fromPlatform = new String(it.next().message());
> >> > > >                 System.out.println("The messages: " + fromPlatform);
> >> > > >             } catch (Exception e) {
> >> > > >                 e.printStackTrace();
> >> > > >             }
> >> > > >         }
> >> > > >         System.out.println("SystemOut");
> >> > > >     }
> >> > > > }
> >> > > >
> >> > > > Thanks
[jira] [Updated] (KAFKA-967) Use key range in ProducerPerformance

2013-07-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-967:


Status: Patch Available  (was: Open)

1. Add messageKeyRange to ProducerPerfConfig

2. In generateProducerData, select the key in a round robin manner within the 
key range.

3. Set the key in generateMessageWithSeqId and add the key value to the payload.
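The round-robin selection in step 2 can be sketched in isolation. The names below (`keyRange`, the sequence loop) are illustrative stand-ins, not the actual ProducerPerfConfig fields from the patch:

```java
public class KeyRangeSketch {
    public static void main(String[] args) {
        int keyRange = 5;       // hypothetical stand-in for messageKeyRange
        int numMessages = 12;
        for (int seq = 0; seq < numMessages; seq++) {
            // Round-robin: keys cycle 0,1,2,3,4,0,1,... within the range
            int key = seq % keyRange;
            // Per step 3, the key value is also embedded in the payload
            String payload = "key:" + key + " seq:" + seq;
            System.out.println(payload);
        }
    }
}
```

Because every message with the same key hashes to the same partition, a bounded key range like this is what makes the semantic-partitioning case testable.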

> Use key range in ProducerPerformance
> 
>
> Key: KAFKA-967
> URL: https://issues.apache.org/jira/browse/KAFKA-967
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Attachments: KAFKA-967.v1.patch
>
>
> Currently in ProducerPerformance, the key of the message is set to MessageID. 
> It would be better to set it to a specific key within a key range (Integer type) 
> so that we can test the semantic partitioning case. This is related to 
> KAFKA-957.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-967) Use key range in ProducerPerformance

2013-07-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-967:


Attachment: KAFKA-967.v1.patch

1. Add messageKeyRange to ProducerPerfConfig

2. In generateProducerData, select the key in a round robin manner within the 
key range.

3. Set the key in generateMessageWithSeqId and add the key value to the 
payload. 

> Use key range in ProducerPerformance
> 
>
> Key: KAFKA-967
> URL: https://issues.apache.org/jira/browse/KAFKA-967
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Attachments: KAFKA-967.v1.patch
>
>
> Currently in ProducerPerformance, the key of the message is set to MessageID. 
> It would be better to set it to a specific key within a key range (Integer type) 
> so that we can test the semantic partitioning case. This is related to 
> KAFKA-957.



[jira] [Issue Comment Deleted] (KAFKA-967) Use key range in ProducerPerformance

2013-07-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-967:


Comment: was deleted

(was: 1. Add messageKeyRange to ProducerPerfConfig

2. In generateProducerData, select the key in a round robin manner within the 
key range.

3. Set the key in generateMessageWithSeqId and add the key value to the 
payload.)

> Use key range in ProducerPerformance
> 
>
> Key: KAFKA-967
> URL: https://issues.apache.org/jira/browse/KAFKA-967
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Attachments: KAFKA-967.v1.patch
>
>
> Currently in ProducerPerformance, the key of the message is set to MessageID. 
> It would be better to set it to a specific key within a key range (Integer type) 
> so that we can test the semantic partitioning case. This is related to 
> KAFKA-957.



[jira] [Created] (KAFKA-967) Use key range in ProducerPerformance

2013-07-09 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-967:
---

 Summary: Use key range in ProducerPerformance
 Key: KAFKA-967
 URL: https://issues.apache.org/jira/browse/KAFKA-967
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
Assignee: Guozhang Wang


Currently in ProducerPerformance, the key of the message is set to MessageID. 
It would be better to set it to a specific key within a key range (Integer type) 
so that we can test the semantic partitioning case. This is related to 
KAFKA-957.



Re: having problem with 0.8 gzip compression

2013-07-09 Thread Scott Wang
Another piece of information: snappy compression also does not work.

Thanks,
Scott


On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
scott.w...@rumbleentertainment.com> wrote:

> I just tried it and it is still not showing up. Thanks for looking into this.
>
> Thanks,
> Scott
>
>
> On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao  wrote:
>
>> Could you try starting the consumer first (and enable gzip in the
>> producer)?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
>> scott.w...@rumbleentertainment.com> wrote:
>>
>> > No, I did not start the consumer before the producer.  I actually
>> > started the producer first and nothing showed up in the consumer
>> > unless I commented out this line -- props.put("compression.codec",
>> > "gzip").  If I commented out the compression codec, everything just
>> > works.
>> >
>> >
>> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao  wrote:
>> >
>> > > Did you start the consumer before the producer? By default, the
>> > > consumer gets only the new data.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
>> > > scott.w...@rumbleentertainment.com> wrote:
>> > >
>> > > > I am testing with Kafka 0.8 beta and having a problem receiving
>> > > > messages in the consumer.  There is no error, so does anyone have
>> > > > any insights?  When I commented out the "compression.codec"
>> > > > everything works fine.
>> > > >
>> > > > My producer:
>> > > > public class TestKafka08Prod {
>> > > >
>> > > >     public static void main(String[] args) {
>> > > >
>> > > >         Producer<String, String> producer = null;
>> > > >         try {
>> > > >             Properties props = new Properties();
>> > > >             props.put("metadata.broker.list", "localhost:9092");
>> > > >             props.put("serializer.class", "kafka.serializer.StringEncoder");
>> > > >             props.put("producer.type", "sync");
>> > > >             props.put("request.required.acks", "1");
>> > > >             props.put("compression.codec", "gzip");
>> > > >             ProducerConfig config = new ProducerConfig(props);
>> > > >             producer = new Producer<String, String>(config);
>> > > >             for (int i = 0; i < 10; i++) {
>> > > >                 KeyedMessage<String, String> data = new KeyedMessage<String, String>(
>> > > >                         "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
>> > > >                 producer.send(data);
>> > > >             }
>> > > >         } catch (Exception e) {
>> > > >             System.out.println("Error happened: ");
>> > > >             e.printStackTrace();
>> > > >         } finally {
>> > > >             if (producer != null) {   // was: if (null != null), so the producer was never closed
>> > > >                 producer.close();
>> > > >             }
>> > > >             System.out.println("End of Sending");
>> > > >         }
>> > > >
>> > > >         System.exit(0);
>> > > >     }
>> > > > }
>> > > >
>> > > >
>> > > > My consumer:
>> > > >
>> > > > public class TestKafka08Consumer {
>> > > >     public static void main(String[] args) throws UnknownHostException, SocketException {
>> > > >
>> > > >         Properties props = new Properties();
>> > > >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>> > > >         props.put("group.id", "test08ConsumerId");
>> > > >         props.put("zk.sessiontimeout.ms", "4000");
>> > > >         props.put("zk.synctime.ms", "2000");
>> > > >         props.put("autocommit.interval.ms", "1000");
>> > > >
>> > > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>> > > >
>> > > >         ConsumerConnector consumerConnector =
>> > > >                 kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>> > > >
>> > > >         String topic = "test-topic";
>> > > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> > > >         topicCountMap.put(topic, new Integer(1));
>> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>> > > >                 consumerConnector.createMessageStreams(topicCountMap);
>> > > >         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>> > > >
>> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>> > > >
>> > > >         while (it.hasNext()) {
>> > > >             try {
>> > > >                 String fromPlatform = new String(it.next().message());
>> > > >                 System.out.println("The messages: " + fromPlatform);
>> > > >             } catch (Exception e) {
>> > > >                 e.printStackTrace();
>> > > >             }
>> > > >         }
>> > > >         System.out.println("SystemOut");
>> > > >     }
>> > > > }
>> > > >
>> > > >
>> > > > Thanks
>> > > >
>> > >
>> >
>>
>
>


[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-07-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703827#comment-13703827
 ] 

John Fung commented on KAFKA-915:
-

Hi Joel,

After applying kafka-915-v1.patch (which creates the topic manually before 
starting mirror maker), testcase_5001 passes. However, testcase_5003 & 
testcase_5005 are failing due to data loss.

Thanks,
John

> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1.
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible).
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.



[jira] [Updated] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-07-09 Thread John Fung (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Fung updated KAFKA-915:


Status: Open  (was: Patch Available)

> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1.
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible).
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.



[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message

2013-07-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703755#comment-13703755
 ] 

Joel Koshy commented on KAFKA-966:
--

One way to accomplish this is to turn off autocommit and checkpoint offsets 
only after a message (or batch of messages) has been written to the DB.

One caveat, though: a rebalance (e.g., if a new consumer instance shows up) 
will also result in offsets being committed, so there would be an issue if the 
DB is unavailable, a rebalance occurs at the same time, and there are 
unprocessed messages that have already been pulled out of the iterator.
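The ordering Joel suggests — write to the DB first, checkpoint the offset only afterwards — can be simulated without a broker. The queue, DB, and commit below are in-memory stand-ins, not Kafka APIs (in the real 0.8 high-level consumer you would set auto.commit.enable=false and call ConsumerConnector.commitOffsets() yourself):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simulation of "write to DB first, checkpoint offset second".
public class CommitAfterWriteSketch {
    static List<String> db = new ArrayList<String>();  // stand-in for the real DB
    static long committedOffset = -1;                  // last offset safely checkpointed

    static void commitOffsets(long offset) { committedOffset = offset; }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2");
        for (int offset = 0; offset < messages.size(); offset++) {
            db.add(messages.get(offset));   // 1. write the message to the DB
            commitOffsets(offset);          // 2. only then checkpoint the offset
        }
        // If the process dies between steps 1 and 2, the message is re-delivered
        // on restart rather than lost: duplicates are possible, loss is not.
        System.out.println("committed=" + committedOffset + " dbSize=" + db.size());
    }
}
```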


> Allow high level consumer to 'nak' a message and force Kafka to close the 
> KafkaStream without losing that message
> -
>
> Key: KAFKA-966
> URL: https://issues.apache.org/jira/browse/KAFKA-966
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.8
>Reporter: Chris Curtin
>Assignee: Neha Narkhede
>Priority: Minor
>
> Enhancement request.
> The high level consumer is very close to handling a lot of situations a 
> 'typical' client would need. Except for when the message received from Kafka 
> is valid, but the business logic that wants to consume it has a problem.
> For example if I want to write the value to a MongoDB or Cassandra database 
> and the database is not available. I won't know until I go to do the write 
> that the database isn't available, but by then it is too late to NOT read the 
> message from Kafka. Thus if I call shutdown() to stop reading, that message 
> is lost since the offset Kafka writes to ZooKeeper is the next offset.
> Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the 
> next offset to read for this partition to this message when I start up again. 
> And if there are any messages in the BlockingQueue for other partitions, find 
> the lowest # and use it for that partition's offset since I haven't consumed 
> them yet.
> Thus I can cleanly shutdown my processing, resolve whatever the issue is and 
> restart the process.
> Another idea might be to allow a 'peek' into the next message and if I 
> succeed in writing to the database call 'next' to remove it from the queue. 
> I understand this won't deal with a 'kill -9' or hard failure of the JVM 
> leading to the latest offsets not being written to ZooKeeper but it addresses 
> a likely common scenario for consumers. Nor will it add true transactional 
> support since the ZK update could fail.



[jira] [Created] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message

2013-07-09 Thread Chris Curtin (JIRA)
Chris Curtin created KAFKA-966:
--

 Summary: Allow high level consumer to 'nak' a message and force 
Kafka to close the KafkaStream without losing that message
 Key: KAFKA-966
 URL: https://issues.apache.org/jira/browse/KAFKA-966
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 0.8
Reporter: Chris Curtin
Assignee: Neha Narkhede
Priority: Minor


Enhancement request.

The high level consumer is very close to handling a lot of situations a 
'typical' client would need. Except for when the message received from Kafka is 
valid, but the business logic that wants to consume it has a problem.

For example if I want to write the value to a MongoDB or Cassandra database and 
the database is not available. I won't know until I go to do the write that the 
database isn't available, but by then it is too late to NOT read the message 
from Kafka. Thus if I call shutdown() to stop reading, that message is lost 
since the offset Kafka writes to ZooKeeper is the next offset.

Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the 
next offset to read for this partition to this message when I start up again. 
And if there are any messages in the BlockingQueue for other partitions, find 
the lowest # and use it for that partition's offset since I haven't consumed 
them yet.

Thus I can cleanly shutdown my processing, resolve whatever the issue is and 
restart the process.

Another idea might be to allow a 'peek' into the next message and if I succeed 
in writing to the database call 'next' to remove it from the queue. 

I understand this won't deal with a 'kill -9' or hard failure of the JVM 
leading to the latest offsets not being written to ZooKeeper but it addresses a 
likely common scenario for consumers. Nor will it add true transactional 
support since the ZK update could fail.




Re: beta2 => Maven Repository Signed and Staged

2013-07-09 Thread Joe Stein
I agree it is very low touch, since it is only a change when building for
publishing and does not affect the jar.  I don't have objections, and unless
anyone else does I will publish and see whether we get more errors/issues or
0.8.0-beta1 becomes available from maven.  I should be able to do this later
tonight or in the morning unless someone has an issue with proceeding.


On Sun, Jul 7, 2013 at 5:40 PM, Jay Kreps  wrote:

> That's awesome!
>
> Personally I think it is okay if there is a slight cosmetic difference as
> long as we fix it before the proper 0.8 release.
>
> -Jay
>
>
> On Sun, Jul 7, 2013 at 6:55 AM, Joe Stein  wrote:
>
> > So, I think we now have everything for a successful publish to the maven
> > repo for the 0.8.0-betaX Kafka build on Scala 2.8.0, 2.8.2, 2.9.1, 2.9.2.
> >
> > Since I had to make a slight code change
> > https://issues.apache.org/jira/browse/KAFKA-963 I am not sure if it is
> > proper to officially publish the artifacts since it was not what we voted
> > upon technically speaking.
> >
> > I don't know for sure if when I hit "close" to the staging release to
> > promote it to public release if any other errors will come up through the
> > process.
> >
> > My thinking is that we should roll a beta2 release and give the publish
> to
> > public release another go.
> >
> > Thoughts?
> >
> > /*
> > Joe Stein
> > http://www.linkedin.com/in/charmalloc
> > Twitter: @allthingshadoop 
> > */
> >
>



-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop 
*/


Re: having problem with 0.8 gzip compression

2013-07-09 Thread Scott Wang
I just tried it and it is still not showing up. Thanks for looking into this.

Thanks,
Scott


On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao  wrote:

> Could you try starting the consumer first (and enable gzip in the
> producer)?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> scott.w...@rumbleentertainment.com> wrote:
>
> > No, I did not start the consumer before the producer.  I actually started
> > the producer first and nothing showed up in the consumer unless I
> > commented out this line -- props.put("compression.codec", "gzip").
> > If I commented out the compression codec, everything just works.
> >
> >
> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao  wrote:
> >
> > > Did you start the consumer before the producer? By default, the
> > > consumer gets only the new data.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> > > scott.w...@rumbleentertainment.com> wrote:
> > >
> > > > I am testing with Kafka 0.8 beta and having a problem receiving
> > > > messages in the consumer.  There is no error, so does anyone have
> > > > any insights?  When I commented out the "compression.codec"
> > > > everything works fine.
> > > >
> > > > My producer:
> > > > public class TestKafka08Prod {
> > > >
> > > >     public static void main(String[] args) {
> > > >
> > > >         Producer<String, String> producer = null;
> > > >         try {
> > > >             Properties props = new Properties();
> > > >             props.put("metadata.broker.list", "localhost:9092");
> > > >             props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >             props.put("producer.type", "sync");
> > > >             props.put("request.required.acks", "1");
> > > >             props.put("compression.codec", "gzip");
> > > >             ProducerConfig config = new ProducerConfig(props);
> > > >             producer = new Producer<String, String>(config);
> > > >             for (int i = 0; i < 10; i++) {
> > > >                 KeyedMessage<String, String> data = new KeyedMessage<String, String>(
> > > >                         "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
> > > >                 producer.send(data);
> > > >             }
> > > >         } catch (Exception e) {
> > > >             System.out.println("Error happened: ");
> > > >             e.printStackTrace();
> > > >         } finally {
> > > >             if (producer != null) {   // was: if (null != null), so the producer was never closed
> > > >                 producer.close();
> > > >             }
> > > >             System.out.println("End of Sending");
> > > >         }
> > > >
> > > >         System.exit(0);
> > > >     }
> > > > }
> > > >
> > > >
> > > > My consumer:
> > > >
> > > > public class TestKafka08Consumer {
> > > >     public static void main(String[] args) throws UnknownHostException, SocketException {
> > > >
> > > >         Properties props = new Properties();
> > > >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> > > >         props.put("group.id", "test08ConsumerId");
> > > >         props.put("zk.sessiontimeout.ms", "4000");
> > > >         props.put("zk.synctime.ms", "2000");
> > > >         props.put("autocommit.interval.ms", "1000");
> > > >
> > > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > >
> > > >         ConsumerConnector consumerConnector =
> > > >                 kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >         String topic = "test-topic";
> > > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > >         topicCountMap.put(topic, new Integer(1));
> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > >                 consumerConnector.createMessageStreams(topicCountMap);
> > > >         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> > > >
> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > >
> > > >         while (it.hasNext()) {
> > > >             try {
> > > >                 String fromPlatform = new String(it.next().message());
> > > >                 System.out.println("The messages: " + fromPlatform);
> > > >             } catch (Exception e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >         }
> > > >         System.out.println("SystemOut");
> > > >     }
> > > > }
> > > >
> > > >
> > > > Thanks
> > > >
> > >
> >
>


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2013-07-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703525#comment-13703525
 ] 

Guozhang Wang commented on KAFKA-559:
-

One minor comment: add a debug() at the entry of each function, such as 
removeObsoleteConsumerGroups and removeObsoleteConsumerTopics, so that if we 
get a warning that the function aborts, we know from which point it aborted.

> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Tejas Patil
>  Labels: project
> Attachments: KAFKA-559.v1.patch, KAFKA-559.v2.patch
>
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.
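The proposed tool boils down to filtering consumer-group entries by their last offset-update time. A minimal in-memory sketch — the map, group names, and timestamps are hypothetical stand-ins for the ZooKeeper entries and their modification times, not the real layout:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Sketch of "delete any consumer group with no offset update since the given
// date". Timestamps are plain longs standing in for zk mtimes.
public class ObsoleteGroupSketch {
    public static void main(String[] args) {
        Map<String, Long> lastOffsetUpdate = new HashMap<String, Long>();
        lastOffsetUpdate.put("group-live", 2000L);   // updated after the cutoff
        lastOffsetUpdate.put("group-stale", 500L);   // no update since the cutoff
        long since = 1000L;                          // the --since cutoff

        Iterator<Map.Entry<String, Long>> it = lastOffsetUpdate.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < since) {
                it.remove();   // garbage collect the obsolete group entry
            }
        }
        System.out.println(lastOffsetUpdate.keySet());  // prints [group-live]
    }
}
```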



[jira] [Work started] (KAFKA-965) merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk

2013-07-09 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-965 started by Jun Rao.

> merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk
> 
>
> Key: KAFKA-965
> URL: https://issues.apache.org/jira/browse/KAFKA-965
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-965.patch
>
>




[jira] [Updated] (KAFKA-965) merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk

2013-07-09 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-965:
--

Status: Patch Available  (was: In Progress)

> merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk
> 
>
> Key: KAFKA-965
> URL: https://issues.apache.org/jira/browse/KAFKA-965
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-965.patch
>
>




[jira] [Updated] (KAFKA-965) merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk

2013-07-09 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-965:
--

Attachment: kafka-965.patch

Attached a patch. The following files had to be merged manually.

# Unmerged paths:
#   (use "git add/rm <file>..." as appropriate to mark resolution)
#
#   both modified:  core/src/main/scala/kafka/admin/AdminUtils.scala
#   both modified:  core/src/main/scala/kafka/api/RequestKeys.scala
#   both modified:  core/src/main/scala/kafka/log/Log.scala
#   both modified:  core/src/main/scala/kafka/log/LogSegment.scala
#   deleted by us:  core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
#   both modified:  core/src/main/scala/kafka/server/KafkaApis.scala
#   both modified:  core/src/main/scala/kafka/server/KafkaConfig.scala
#   both modified:  core/src/main/scala/kafka/server/KafkaServer.scala
#   both modified:  core/src/main/scala/kafka/server/ReplicaManager.scala
#   both modified:  core/src/main/scala/kafka/utils/ZkUtils.scala
#   both modified:  core/src/test/scala/unit/kafka/admin/AdminTest.scala
#   both modified:  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
#   both modified:  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
#   both modified:  core/src/test/scala/unit/kafka/producer/ProducerTest.scala
#   both modified:  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
#   both modified:  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
#   both modified:  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
#
# Changed but not updated:
#   (use "git add <file>..." to update what will be committed)
#   (use "git checkout -- <file>..." to discard changes in working directory)
#
#   modified:   core/src/main/scala/kafka/server/OffsetCheckpoint.scala






[jira] [Created] (KAFKA-965) merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk

2013-07-09 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-965:
-

 Summary: merge c39d37e9dd97bf2462ffbd1a96c0b2cb05034bae from 0.8 to trunk
 Key: KAFKA-965
 URL: https://issues.apache.org/jira/browse/KAFKA-965
 Project: Kafka
  Issue Type: Task
  Components: core
Affects Versions: 0.8.1
Reporter: Jun Rao
Assignee: Jun Rao






Re: having problem with 0.8 gzip compression

2013-07-09 Thread Jun Rao
Could you try starting the consumer first (and enable gzip in the producer)?

Thanks,

Jun


On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
scott.w...@rumbleentertainment.com> wrote:

> No, I did not start the consumer before the producer.  I actually started
> the producer first and nothing showed up in the consumer unless I commented
> out this line -- props.put("compression.codec", "gzip"). If I commented
> out the compression codec, everything just works.
>
>
> On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao  wrote:
>
> > Did you start the consumer before the producer? By default, the consumer
> > gets only the new data.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> > scott.w...@rumbleentertainment.com> wrote:
> >
> > > I am testing with Kafka 0.8 beta and am having a problem receiving
> > > messages in the consumer. There is no error, so does anyone have any
> > > insights? When I commented out the "compression.codec" line, everything
> > > worked fine.
> > >
> > > My producer:
> > > public class TestKafka08Prod {
> > >
> > > public static void main(String [] args) {
> > >
> > > Producer<String, String> producer = null;
> > > try {
> > > Properties props = new Properties();
> > > props.put("metadata.broker.list", "localhost:9092");
> > > props.put("serializer.class",
> > > "kafka.serializer.StringEncoder");
> > > props.put("producer.type", "sync");
> > > props.put("request.required.acks", "1");
> > > props.put("compression.codec", "gzip");
> > > ProducerConfig config = new ProducerConfig(props);
> > > producer = new Producer<String, String>(config);
> > > for(int i=0; i<10; i++) {
> > > KeyedMessage<String, String> data = new
> > > KeyedMessage<String, String>("test-topic", "test-message: "+i+"
> > > "+System.currentTimeMillis());
> > > producer.send(data);
> > >
> > > }
> > >
> > > } catch (Exception e) {
> > > System.out.println("Error happened: ");
> > > e.printStackTrace();
> > > } finally {
> > > if(null != producer) {
> > > producer.close();
> > > }
> > >
> > > System.out.println("End of Sending");
> > > }
> > >
> > > System.exit(0);
> > > }
> > > }
> > >
> > >
> > > My consumer:
> > >
> > > public class TestKafka08Consumer {
> > > public static void main(String [] args) throws
> > > UnknownHostException, SocketException {
> > >
> > > Properties props = new Properties();
> > > props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> > > props.put("group.id", "test08ConsumerId");
> > > props.put("zookeeper.session.timeout.ms", "4000");
> > > props.put("zookeeper.sync.time.ms", "2000");
> > > props.put("auto.commit.interval.ms", "1000");
> > >
> > > ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > >
> > > ConsumerConnector consumerConnector =
> > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> > >
> > > String topic = "test-topic";
> > > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > topicCountMap.put(topic, new Integer(1));
> > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumerConnector.createMessageStreams(topicCountMap);
> > > KafkaStream<byte[], byte[]> stream =
> > > consumerMap.get(topic).get(0);
> > >
> > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >
> > > while(it.hasNext()) {
> > > try {
> > > String fromPlatform = new String(it.next().message());
> > > System.out.println("The messages: "+fromPlatform);
> > > } catch(Exception e) {
> > > e.printStackTrace();
> > > }
> > > }
> > > System.out.println("SystemOut");
> > > }
> > > }
> > >
> > >
> > > Thanks
> > >
> >
>
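Jun's point in the exchange above is that the 0.8 high-level consumer starts from the tail of the log when its group has no committed offset, so anything produced before the consumer attaches is never delivered. Besides starting the consumer first, the usual workaround is the auto.offset.reset property. A minimal sketch (in Scala here; assumption: the 0.8 high-level consumer, where "smallest" rewinds to the earliest available offset):

```scala
import java.util.Properties

object ConsumerFromBeginning {
  // Build the same consumer config as the test program above, but ask the
  // consumer to rewind when no offset has been committed for this group yet.
  def consumerProps(): Properties = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181/kafka_0_8")
    props.put("group.id", "test08ConsumerId")
    // "smallest" = start from the earliest available offset instead of the
    // log end; only takes effect when the group has no committed offset.
    props.put("auto.offset.reset", "smallest")
    props
  }

  def main(args: Array[String]): Unit =
    println(consumerProps().getProperty("auto.offset.reset"))
}
```

With this set, a freshly started consumer group would also see messages produced before it connected (assuming the compression issue itself is resolved).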


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2013-07-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703007#comment-13703007
 ] 

Joel Koshy commented on KAFKA-559:
--

Thanks for the patch. Overall, looks good. A couple of comments, mostly minor,
in no particular order:

* I think dry-run does not need any further qualifier such as withOptionalArg, 
describedAs, ofType - it's just a flag.
* For --since, I prefer seconds since the epoch over some fixed input format, 
which brings in ambiguity such as timezone, 24h vs 12h, etc. A better 
alternative would be to accept date strings and use the DateFormat class with 
lenient parsing turned on, or something like that. --before may be more 
intuitive than --since.
* Can use CommandLineUtils.checkRequiredArgs
* deleteBy matching - prefer to use a case match and thereby avoid the explicit 
check on valid values. Also, the message on an invalid deleteBy value should 
state what the valid values are.
* Right now you support the following modes: delete stale topics across all 
groups, delete stale topics in a specific group. I think it would be useful to 
make deleteBy optional - if unspecified, it scans all groups and gets rid of 
stale groups.
* line 75: warn ("msg", e)
* line 101: should provide a reason for aborting
* line 110: doesn't groupDirs have an offset path? If not, maybe we should add it.
* Logging should include the last mtime, as that may be useful information 
reported by the dry-run.
* No need to add a wrapper shell script for the tool.
* make all of the methods except main private.
* The return statements can be dropped - i.e., just write the return value.
* Several vars can be vals instead.
* removeBrokerPartitionPairs: I don't think you would want to do a partial 
delete under a topic directory. You can check that all the partition offset 
paths are <= since and, if so, just delete the topic path. With that, the method 
would be better named something like deleteUnconsumedTopicsFromGroup.
* Finally, you are probably aware that there are a bunch of race conditions - 
e.g., checkIfLiveConsumers is a helpful check to have but is not guaranteed to 
be correct, as some consumers may creep in while the tool is running. However, I 
think it is reasonable for a tool like this to ignore that, since a "since" 
value far in the past would mean the probability of that occurring is very low. 
A similar note applies to deleteGroupIfNoTopicExists.
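The case-match suggestion for deleteBy above can be sketched as follows; parseDeleteBy, the case objects, and the value strings are hypothetical names for illustration, not the patch's actual API:

```scala
object DeleteByParser {
  sealed trait DeleteBy
  case object ByTopic extends DeleteBy
  case object ByGroup extends DeleteBy

  // A single case match both dispatches on the value and rejects invalid
  // input, so no separate "is this a valid value?" check is needed, and the
  // error message lists the legal values, as the review asks.
  def parseDeleteBy(value: String): DeleteBy = value match {
    case "topic" => ByTopic
    case "group" => ByGroup
    case other =>
      throw new IllegalArgumentException(
        s"Invalid deleteBy value '$other'; valid values are: topic, group")
  }
}
```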


> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Tejas Patil
>  Labels: project
> Attachments: KAFKA-559.v1.patch, KAFKA-559.v2.patch
>
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.
