I also noticed another issue.

The current configuration is:

Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)

Note that I have used the same consumer group names for both topics,
i.e. 'gr1' and 'gr2' are each associated with both Topic1 and Topic2.

When I run the *ConsumerOffsetChecker* tool, I get a
ClosedChannelException (see the trace below).

Is there a namespace collision occurring here? The issue is
reproducible with the setup above.


*bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2 --zkconnect localhost:2181*


[2013-09-12 01:01:59,701] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@3af0ce45 (org.apache.zookeeper.ZooKeeper)
[2013-09-12 01:01:59,724] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,732] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,741] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated timeout = 30000 (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
Group           Topic                          Pid Offset          logSize         Lag             Owner
gr2             pe1                            0   129985          130625          640             none
gr2             pe1                            1   0               0               0               none
gr2             pe2                            0   130493          130493          0               gr2_ip-XXXXXXXXXX-6c6f5d94-0
[2013-09-12 01:02:00,514] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
gr2             pe2                            1   0               0               0               gr2_ip-XXXXXXX-6c6f5d94-1
[2013-09-12 01:02:00,523] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-09-12 01:02:00,526] INFO Session: 0x140924380790211 closed (org.apache.zookeeper.ZooKeeper)
[2013-09-12 01:02:00,526] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
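
For reading the table above: as far as I can tell, the Lag column is just
logSize minus Offset for each partition. A minimal Python sketch of that
arithmetic, using the rows from the output (PartitionRow and lag are names
made up for illustration, not part of Kafka):

```python
from typing import NamedTuple


class PartitionRow(NamedTuple):
    """One row of the checker output (hypothetical helper type)."""
    group: str
    topic: str
    pid: int
    offset: int    # "Offset" column: offset recorded for the group
    log_size: int  # "logSize" column: current size of the partition log


def lag(row: PartitionRow) -> int:
    """Messages in the log that the group has not consumed yet."""
    return row.log_size - row.offset


# Rows copied from the checker output above.
rows = [
    PartitionRow("gr2", "pe1", 0, 129985, 130625),
    PartitionRow("gr2", "pe1", 1, 0, 0),
    PartitionRow("gr2", "pe2", 0, 130493, 130493),
    PartitionRow("gr2", "pe2", 1, 0, 0),
]

for row in rows:
    print(row.topic, row.pid, lag(row))
```

This gives 640 for pe1 partition 0 and 0 for the other three partitions,
matching the Lag column.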







On Wed, Sep 11, 2013 at 5:46 PM, Neha Narkhede <neha.narkh...@gmail.com>
 wrote:

> I think you are hitting this -
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whysomeoftheconsumersinaconsumergroupneverreceiveanymessage%3F
>
> Let us know if we can improve the documentation to make it clearer.
>
> Thanks,
> Neha
>
>
> On Wed, Sep 11, 2013 at 5:28 PM, prashant amar <amasin...@gmail.com>
> wrote:
>
> > Also attempted another pattern where
> >
> > 1. Created a topic T with 'n' partitions.
> > 2. Created a consumer group process with 'n + 1' threads subscribing to
> > topic 'T' with groupID 'y'.
> > 3. Added another consumer group process with 'n + 1' threads subscribing
> > to the same topic 'T' with a different groupID 'z'.
> > (Note that 2 and 3 subscribe to the same topic but use different groups.)
> >
> > Can a single topic with multiple partitions, combined with multiple
> > consumer groups, increase parallelism in consumption?
> >
> >
> >
> >
> >
> >
> >
> >
> > On Wed, Sep 11, 2013 at 4:48 PM, prashant amar <amasin...@gmail.com>
> > wrote:
> >
> > > A Design Question that needs verification:
> > >
> > > 1. Created a topic T with 'n' partitions.
> > > 2. Created a consumer group process with 'n + 1' threads subscribing
> > > to topic 'T' with groupID 'y'.
> > > 3. Added another consumer group process with 'n + 1' threads
> > > subscribing to the same topic 'T' with the same groupID 'y'.
> > >
> > > On doing so, I noticed that the previous consumer group stops consuming
> > > and the new consumer begins to consume.
> > >
> > > I was attempting to model on-demand parallelization for the case where
> > > a consumer group cannot keep up with the events produced. Rather than
> > > increase the thread pool capacity in the same process, it would make
> > > sense to distribute the load across multiple processes.
> > >
> > > Advice please?
> > >
> > > Regards
> > > Amardeep
> > >
> >
>
>
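
On the design question quoted above: per the FAQ entry Neha linked, a
partition is the smallest unit distributed among consumers in one group, so
threads beyond the partition count receive nothing. A simplified Python
sketch of a range-style assignment follows. It is an illustrative model under
my own assumptions (sorted thread IDs, contiguous ranges), not Kafka's actual
rebalancing code, and assign_partitions and the thread IDs are hypothetical
names.

```python
def assign_partitions(partitions, thread_ids):
    """Give each partition to exactly one thread, in contiguous ranges.

    Assumption: threads are ordered by sorting their IDs, and the first
    threads in that order receive any leftover partitions.
    """
    partitions = sorted(partitions)
    threads = sorted(thread_ids)
    per_thread, extra = divmod(len(partitions), len(threads))
    assignment = {}
    start = 0
    for position, thread in enumerate(threads):
        count = per_thread + (1 if position < extra else 0)
        assignment[thread] = partitions[start:start + count]
        start += count
    return assignment


n = 2  # number of partitions in topic T (example value)
partitions = list(range(n))
# Two processes in the same group, each with n + 1 threads (IDs are made up).
threads = [f"y_proc{p}-{t}" for p in ("A", "B") for t in range(n + 1)]

assignment = assign_partitions(partitions, threads)
idle = [t for t, parts in assignment.items() if not parts]
print(assignment)
print(len(idle), "threads idle")
```

With n = 2 and two processes of 3 threads each, only 2 of the 6 threads own a
partition and the other 4 sit idle. Which process ends up owning the
partitions depends on how the IDs sort in this model, which would be
consistent with one process appearing to stop consuming. Using different
group IDs instead delivers the full topic to each group rather than
splitting the load between them.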
