Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
A Design Question that needs verification:

1. Created a topic T with 'n' partitions.
2. Created a consumer group process with 'n + 1' threads subscribing from
topic 'T' with a groupID 'y'
3. Added another consumer group process with 'n + 1' threads subscribing
from same topic 'T' with same groupID 'y'

On doing so, I noticed that the previous consumer group process stops
consuming and the new consumer process begins to consume.

I was attempting to model on-demand parallelization for the case where a
consumer group cannot keep up with the events produced. Rather than
increasing the thread pool capacity in the same process, it would make sense
to distribute the load across multiple processes.

Advice please?

Regards
Amardeep


Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread Neha Narkhede
I think you are hitting this -
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whysomeoftheconsumersinaconsumergroupneverreceiveanymessage%3F

Let us know if we can improve the documentation to make it clearer.

Thanks,
Neha
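
For reference, the FAQ entry above comes down to the partition being the smallest unit of work handed to a consumer thread within a group, so threads beyond the partition count receive nothing. Below is a minimal Python sketch of that effect, assuming a simplified range-style assignment over lexicographically sorted thread ids. The ids `y_old-*` and `y_new-*` and the function `assign_partitions` are hypothetical names made up for illustration, not Kafka's API, and the real assignment logic may differ in detail.

```python
def assign_partitions(num_partitions, thread_ids):
    """Simplified range-style assignment (an illustrative model, not Kafka code).

    Thread ids are sorted, then each thread gets a contiguous slice of
    partitions. Threads past the partition count end up with an empty slice.
    """
    threads = sorted(thread_ids)
    per_thread, extra = divmod(num_partitions, len(threads))
    assignment = {}
    for pos, tid in enumerate(threads):
        start = per_thread * pos + min(pos, extra)
        count = per_thread + (1 if pos < extra else 0)
        assignment[tid] = list(range(start, start + count))
    return assignment


n = 4  # partitions in topic T (example value)

# Two processes in the same group 'y', each with n + 1 threads (hypothetical ids).
old_process = [f"y_old-{i}" for i in range(n + 1)]
new_process = [f"y_new-{i}" for i in range(n + 1)]

# 2 * (n + 1) threads compete for n partitions: most threads stay idle.
oversubscribed = assign_partitions(n, old_process + new_process)
idle_threads = [t for t, parts in oversubscribed.items() if not parts]

# Same two processes with n / 2 threads each, so threads == partitions.
balanced = assign_partitions(n, old_process[: n // 2] + new_process[: n // 2])
```

In this model the ids of one process happen to sort first, so that process takes every partition and the other goes idle, which would match the behaviour reported. Keeping the combined thread count across all processes at or below the partition count lets both processes do work.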


On Wed, Sep 11, 2013 at 5:28 PM, prashant amar amasin...@gmail.com wrote:

 Also attempted another pattern where

 1. Created a topic T with 'n' partitions.
 2. Created a consumer group process with 'n + 1' threads subscribing from
 topic 'T' with a groupID 'y'
 3. Added another consumer group process with 'n + 1' threads subscribing
 from same topic 'T' with a different groupID 'z'
 (Note that 2 and 3 subscribe from the same topic but different groups)

 Can a single topic with multiple partitions, combined with multiple consumer
 groups, increase parallelism in consumption?
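
On this second pattern: as far as I understand Kafka's model, each consumer group keeps its own offsets and receives the full stream, so groups 'y' and 'z' would each process every message rather than split the load between them. A small Python sketch of that behaviour follows, using an in-memory stand-in for the topic. The names `partitions`, `offsets` and `read_all` are hypothetical and exist only for this illustration.

```python
# In-memory stand-in for topic T: partition id -> ordered log of messages.
partitions = {0: ["a", "b"], 1: ["c"], 2: ["d", "e", "f"]}


def read_all(group_offsets):
    """Drain every partition starting from this group's own offsets."""
    consumed = []
    for pid, log in partitions.items():
        start = group_offsets.get(pid, 0)
        consumed.extend(log[start:])
        group_offsets[pid] = len(log)  # record progress for this group only
    return consumed


# Each group id tracks its offsets independently of the other.
offsets = {"y": {}, "z": {}}

seen_by_y = read_all(offsets["y"])
seen_by_z = read_all(offsets["z"])  # unaffected by what group 'y' has read
```

Both groups see all six messages here, so adding a second group doubles the total work instead of sharing it. Parallelism within one stream comes from more partitions and more threads inside a single group.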


Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
Also noticed another issue

Specified below is the current configuration

Topic1 - n Partitions - 2 Consumer Groups (gr1 and gr2)
Topic2 - n Partitions - 2 Consumer Groups (gr1 and gr2)

Note that I have used the same consumer group names for both topics, i.e.
'gr1' and 'gr2' each consume from both Topic1 and Topic2.

On running the ConsumerOffsetChecker tool, I receive a
ClosedChannelException (see trace below).

Is there a namespace collision occurring here? The issue is
reproducible with the setup above.


bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2 --zkconnect localhost:2181


[2013-09-12 01:01:59,701] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=3 watcher=org.I0Itec.zkclient.ZkClient@3af0ce45 (org.apache.zookeeper.ZooKeeper)
[2013-09-12 01:01:59,724] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,732] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,741] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated timeout = 3 (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
Group   Topic  Pid Offset  logSize Lag Owner
gr2 pe10   129985  130625  640 none
gr2 pe11   0   0   0   none
gr2 pe20   130493  130493  0   gr2_ip-XX-6c6f5d94-0
[2013-09-12 01:02:00,514] INFO Reconnect due to socket error:  (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
gr2 pe21   0   0   0   gr2_ip-XXX-6c6f5d94-1
[2013-09-12 01:02:00,523] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-09-12 01:02:00,526] INFO Session: 0x140924380790211 closed (org.apache.zookeeper.ZooKeeper)
[2013-09-12 01:02:00,526] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
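
To read the offset table in the output above: Lag is logSize minus Offset, and an Owner of none suggests that no consumer thread is currently registered for that partition. Here is a minimal Python sketch that recomputes the lag from the printed values and flags rows that have a backlog but no owner. The row labels are copied as they appear in the output, and the tuple layout and variable names are assumptions made for illustration.

```python
# (group, label as printed, offset, logSize, owner), copied from the output.
rows = [
    ("gr2", "pe10", 129985, 130625, "none"),
    ("gr2", "pe11", 0, 0, "none"),
    ("gr2", "pe20", 130493, 130493, "gr2_ip-XX-6c6f5d94-0"),
    ("gr2", "pe21", 0, 0, "gr2_ip-XXX-6c6f5d94-1"),
]

# Lag is how far the group's committed offset trails the end of the log.
lags = {label: log_size - offset for _, label, offset, log_size, _ in rows}

# Rows with pending messages but no registered owner are not being consumed.
stalled = [
    label
    for _, label, offset, log_size, owner in rows
    if owner == "none" and log_size > offset
]
```

With these values only the first row is both behind and unowned, which is worth checking against the thread-to-partition ratio of the group.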


Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread Jun Rao
This means the broker somehow closed the socket connection. Anything in the
broker log around the same time?

Thanks,

Jun
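
One thing that may be worth ruling out alongside the broker log is whether the broker's registered address is reachable at all from the machine running the tool. The following is a minimal Python sketch of a plain TCP probe, nothing Kafka-specific. The function name `can_connect` is hypothetical, and the host and port in the usage comment are assumptions (9092 is only the usual default broker port), so substitute whatever the broker has registered.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


# Example usage (assumed address, adjust to the broker's registered host/port):
# print(can_connect("localhost", 9092))
```

A False result points at networking or the advertised host name rather than at consumer group naming. A True result only shows that the port is open, not that the broker is healthy.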


Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
From the broker log:


INFO Reconnect due to socket error:  (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)


Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
I usually get this exception when I define more than 2 partitions.

Current configuration:

Single Topic - 4 partitions
1 Consumer Group - 10 Threads



