Multiple Processes Consuming from Same GroupID
A design question that needs verification:

1. Created a topic T with 'n' partitions.
2. Created a consumer process with 'n + 1' threads subscribing to topic 'T' with group ID 'y'.
3. Added another consumer process with 'n + 1' threads subscribing to the same topic 'T' with the same group ID 'y'.

On doing so, I noticed that the previous consumer process stops consuming and the new one begins to consume.

I was attempting to model on-demand parallelization for the case where a consumer group cannot keep up with the events being produced. Rather than increase the thread-pool capacity in the same process, it would make sense to distribute the load across multiple processes. Any advice?

Regards,
Amardeep
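The behaviour above can be reproduced on paper by modelling how partitions are spread over the consumer threads of one group. The sketch below assumes a range-style strategy (as far as I understand, what the ZooKeeper-based high-level consumer's rebalance does in this era): thread ids are sorted, each thread gets `partitions / threads` partitions, and the first `partitions % threads` threads get one extra. It calls no Kafka API, and the thread ids (`procA-0`, `procB-0`, ...) are hypothetical stand-ins for real consumer ids.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {

    // Assigns partitions 0..numPartitions-1 to the given consumer thread ids,
    // range-style: ids are sorted, then each thread gets a contiguous block.
    public static Map<String, List<Integer>> assign(int numPartitions, List<String> threadIds) {
        List<String> sorted = new ArrayList<>(threadIds);
        Collections.sort(sorted);
        int perThread = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perThread * i + Math.min(i, withExtra);
            int count = perThread + (i < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(p);
            }
            result.put(sorted.get(i), parts);
        }
        return result;
    }

    // Builds hypothetical thread ids "process-0" .. "process-(count-1)".
    public static List<String> threads(String process, int count) {
        List<String> ids = new ArrayList<>();
        for (int t = 0; t < count; t++) {
            ids.add(process + "-" + t);
        }
        return ids;
    }

    public static void main(String[] args) {
        int n = 4; // partitions in topic T
        List<String> group = new ArrayList<>(threads("procA", n + 1));
        group.addAll(threads("procB", n + 1)); // second process, same group id 'y'
        Map<String, List<Integer>> assignment = assign(n, group);
        long busy = assignment.values().stream().filter(parts -> !parts.isEmpty()).count();
        assignment.forEach((id, parts) -> System.out.println(id + " -> " + parts));
        System.out.println("threads with partitions: " + busy + " of " + group.size());
    }
}
```

With n = 4 and two processes of n + 1 threads each, only 4 of the 10 threads receive a partition, and all of them belong to whichever process's ids sort first. If the newly started process happens to sort first, the original process is left idle, which would match the observation above. In this model, threads or processes beyond the partition count add no throughput; only more partitions let more threads take part.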
Re: Multiple Processes Consuming from Same GroupID
I think you are hitting this: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whysomeoftheconsumersinaconsumergroupneverreceiveanymessage%3F

Let us know if we can improve the documentation to make it clearer.

Thanks,
Neha

On Wed, Sep 11, 2013 at 5:28 PM, prashant amar amasin...@gmail.com wrote:
> Also attempted another pattern where:
> 1. Created a topic T with 'n' partitions.
> 2. Created a consumer process with 'n + 1' threads subscribing to topic 'T' with group ID 'y'.
> 3. Added another consumer process with 'n + 1' threads subscribing to the same topic 'T' with group ID 'z'.
> (Note that 2 and 3 subscribe to the same topic but in different groups.)
> Can a single topic with multiple partitions, combined with multiple consumer groups, increase parallelism in consumption?
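On the second pattern: consumer groups are independent subscribers, so each group keeps its own offsets and reads every partition of the topic. Adding group 'z' gives the topic a second full reader; it does not take work away from group 'y'. The sketch below compares per-process load for two processes, assuming hypothetical message counts and, for the single-group case, a hypothetical even split of partitions between the processes (the real split is decided by the rebalance). It models the arithmetic only and uses no Kafka API.

```java
import java.util.Arrays;

public class GroupLoadSketch {

    // Messages each of two processes handles when both share ONE group:
    // each partition is owned by exactly one process (owner[p] is 0 or 1).
    public static long[] sameGroupLoad(long[] messagesPerPartition, int[] owner) {
        long[] load = new long[2];
        for (int p = 0; p < messagesPerPartition.length; p++) {
            load[owner[p]] += messagesPerPartition[p];
        }
        return load;
    }

    // Messages each of two processes handles when they use DIFFERENT groups:
    // every group reads every partition, so each process sees the whole topic.
    public static long[] separateGroupLoad(long[] messagesPerPartition) {
        long total = Arrays.stream(messagesPerPartition).sum();
        return new long[] {total, total};
    }

    public static void main(String[] args) {
        long[] messages = {1000, 1000, 1000, 1000}; // hypothetical counts, n = 4
        int[] owner = {0, 0, 1, 1};                 // hypothetical split within group 'y'
        System.out.println("same group y:   " + Arrays.toString(sameGroupLoad(messages, owner)));
        System.out.println("groups y and z: " + Arrays.toString(separateGroupLoad(messages)));
    }
}
```

With four partitions of 1000 messages each, two processes in group 'y' handle 2000 messages apiece, while processes in groups 'y' and 'z' each handle all 4000. Separate groups fit the case where two applications each need every message; sharing load needs the same group id and enough partitions for every thread.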
Re: Multiple Processes Consuming from Same GroupID
Also noticed another issue. Below is the current configuration:

Topic1 - n partitions - 2 consumer groups (gr1 and gr2)
Topic2 - n partitions - 2 consumer groups (gr1 and gr2)

Notice that I have used the same consumer group names for both topics, i.e. 'gr1' and 'gr2' are each associated with both topics. On calling the *ConsumerOffsetChecker* tool, I am receiving a ClosedChannelException (see the trace below). Is there a namespace collision occurring here?

The issue is reproducible with the setup above:

*bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2 --zkconnect localhost:2181*

[2013-09-12 01:01:59,701] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=3 watcher=org.I0Itec.zkclient.ZkClient@3af0ce45 (org.apache.zookeeper.ZooKeeper)
[2013-09-12 01:01:59,724] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,732] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,741] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated timeout = 3 (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
Group Topic Pid Offset logSize Lag Owner
gr2 pe1 0 129985 130625 640 none
gr2 pe1 1 0 0 0 none
gr2 pe2 0 130493 130493 0 gr2_ip-XX-6c6f5d94-0
[2013-09-12 01:02:00,514] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
gr2 pe2 1 0 0 0 gr2_ip-XXX-6c6f5d94-1
[2013-09-12 01:02:00,523] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-09-12 01:02:00,526] INFO Session: 0x140924380790211 closed (org.apache.zookeeper.ZooKeeper)
[2013-09-12 01:02:00,526] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
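Reading the ConsumerOffsetChecker output above: Lag is logSize minus Offset (130625 - 129985 = 640 in the first row), and Owner appears to be printed as none when no consumer thread in the group currently owns that partition. A small sketch that recomputes both from the rows shown; the Row class is hypothetical, and the rows are copied from the output (topic, partition, offset, logSize, owner) rather than read from ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetLagSketch {

    // One line of ConsumerOffsetChecker output (group column omitted).
    public static final class Row {
        final String topic;
        final int partition;
        final long offset;
        final long logSize;
        final String owner;

        public Row(String topic, int partition, long offset, long logSize, String owner) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.logSize = logSize;
            this.owner = owner;
        }

        // Lag = messages in the partition log the group has not consumed yet.
        public long lag() {
            return logSize - offset;
        }

        // The tool prints "none" when no consumer thread owns the partition.
        public boolean unowned() {
            return "none".equals(owner);
        }
    }

    // Rows for group gr2 as printed in the thread (owner ids left redacted).
    public static List<Row> gr2Rows() {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("pe1", 0, 129985, 130625, "none"));
        rows.add(new Row("pe1", 1, 0, 0, "none"));
        rows.add(new Row("pe2", 0, 130493, 130493, "gr2_ip-XX-6c6f5d94-0"));
        rows.add(new Row("pe2", 1, 0, 0, "gr2_ip-XXX-6c6f5d94-1"));
        return rows;
    }

    public static void main(String[] args) {
        for (Row r : gr2Rows()) {
            String flag = (r.unowned() && r.lag() > 0) ? "  <- lagging, no owner" : "";
            System.out.println(r.topic + "/" + r.partition + " lag=" + r.lag() + flag);
        }
    }
}
```

Here both pe1 partitions show no owner and pe1/0 is 640 messages behind, so at the moment the checker ran nothing in gr2 appears to be consuming pe1.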
Re: Multiple Processes Consuming from Same GroupID
This means the broker somehow closed the socket connection. Anything in the broker log around the same time?

Thanks,
Jun
Re: Multiple Processes Consuming from Same GroupID
From the broker log:

INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Re: Multiple Processes Consuming from Same GroupID
I usually get this exception when I define 2 partitions.

Current configuration:
Single topic - 4 partitions
1 consumer group - 10 threads