[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14662548#comment-14662548 ] ASF GitHub Bot commented on KAFKA-2413: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/122 > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Onur Karaman > Fix For: 0.8.3 > > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) > at org.apache.kafka.cli
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661329#comment-14661329 ] Ashish K Singh commented on KAFKA-2413: --- I am not sure if posting patches to issues other person is working on is a good idea. It is discouraging, that is all I can see. There has to be a reason why "Assignee" field is present in JIRA. I guess it will be a moot point to discuss about it anymore as the fix is already posted. Feel free to assign yourself to the JIRA. I will review the patch posted by you as I have already spent some time to find the issue and to try to fix it. I just hope it does not happen again. > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661319#comment-14661319 ] Onur Karaman commented on KAFKA-2413: - Aand I just saw this. My bad. > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) > at > org.apache.kafka.clients.consum
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661317#comment-14661317 ] ASF GitHub Bot commented on KAFKA-2413: --- GitHub user onurkaraman opened a pull request: https://github.com/apache/kafka/pull/122 KAFKA-2413: fix ConsumerCoordinator updateConsumer You can merge this pull request into a Git repository by running: $ git pull https://github.com/onurkaraman/kafka KAFKA-2413 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/122.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #122 commit 073dc4b716594880de4fb58c8832f02dd3792683 Author: Onur Karaman Date: 2015-08-07T04:49:53Z fix ConsumerCoordinator updateConsumer > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.cons
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661294#comment-14661294 ] Ashish K Singh commented on KAFKA-2413: --- Hi [~onurkaraman], sorry for not explicitly saying this, but I am working on a patch already. Thanks for your help! > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) > at org.apache.kafka.clients.N
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661246#comment-14661246 ] Onur Karaman commented on KAFKA-2413: - I'll have the patch ready later today. > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) > at > org.apache.kafka.clients.c
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661242#comment-14661242 ] Onur Karaman commented on KAFKA-2413: - Okay I just tested it out. It seems to have fixed the bug. > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) > at > org.ap
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661227#comment-14661227 ] Onur Karaman commented on KAFKA-2413: - Woops! I haven't tried it out yet, but I think the fix is: {code} private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) { val topicsToBind = topics -- group.topics group.remove(consumer.consumerId) val topicsToUnbind = consumer.topics -- (group.topics ++ topics) group.add(consumer.consumerId, consumer) consumer.topics = topics coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind) } {code} > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at >
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661177#comment-14661177 ] Ashish K Singh commented on KAFKA-2413: --- [~hachikuji] I was planning to play with it to get more accustomed with the new consumer's intricacies. If you have not already worked out a patch, then is it OK I try to fix it by tonight? > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onCom
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661167#comment-14661167 ] Jason Gustafson commented on KAFKA-2413: [~singhashish] I can attempt a patch if you want. > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) > at > org.apach
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661160#comment-14661160 ] Jason Gustafson commented on KAFKA-2413: I think the issue is in here: {code} private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) { val topicsToBind = topics -- group.topics group.remove(consumer.consumerId) val topicsToUnbind = consumer.topics -- group.topics group.add(consumer.consumerId, consumer) consumer.topics = topics coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind) } {code} In particular, it looks like topicsToUnbind is taking the difference in the wrong order. But I'm not sure that just reversing that is correct either since we'd only wan to unbind the topic if no other consumers in the group are subscribed. > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661157#comment-14661157 ] Ashish K Singh commented on KAFKA-2413: --- Thanks guys for quick response! Below is a test case that can repro the issue. {code} def testRepetitiveTopicSubscription() { val numRecords = 1 sendRecords(numRecords) this.consumers(0).subscribe("topic") TestUtils.waitUntilTrue(() => { this.consumers(0).poll(50) this.consumers(0).subscriptions.size == 2 }, "Could not find expected number of subscriptions") TestUtils.createTopic(this.zkClient, "tblablac", 2, serverCount, this.servers) sendRecords(1000, new TopicPartition("tblablac", 0)) sendRecords(1000, new TopicPartition("tblablac", 1)) this.consumers(0).subscribe("tblablac") TestUtils.waitUntilTrue(() => { this.consumers(0).poll(50) this.consumers(0).subscriptions.size == 4 }, "Could not find expected number of subscriptions") } {code} I guess another interesting problem to solve. > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661135#comment-14661135 ] Jason Gustafson commented on KAFKA-2413: [~onurkaraman] I was able to reproduce this error on trunk by subscribing to a second topic while in the consumer's poll loop. It looks like the error might be related to how the server manages group topics. > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCom
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661110#comment-14661110 ] Onur Karaman commented on KAFKA-2413: - Couple questions to start things off: 1. What is the git hash of kafka you're running on the brokers and consumers? 2. How can I reproduce this? What does your consumer code look like? More specifically, can you go into more detail on the "called more than once" part? > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.con
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661094#comment-14661094 ] Ashish K Singh commented on KAFKA-2413: --- [~hachikuji] thoughts? > New consumer's subscribe(Topic...) api fails if called more than once > - > > Key: KAFKA-2413 > URL: https://issues.apache.org/jira/browse/KAFKA-2413 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > I believe new consumer is supposed to allow adding to existing topic > subscriptions. If it is then the issue is that on trying to subscribe to a > topic when consumer is already subscribed to a topic, below exception is > thrown. > {code} > [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null > (kafka.server.KafkaApis:103) > java.util.NoSuchElementException: key not found: topic > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at scala.collection.AbstractMap.apply(Map.scala:58) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) > at > kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) > at > kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) > at > kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) > at > kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) > at > kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) > at > kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) > at > kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) > at > kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) > at > kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) > at > kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) > at > kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) > at > kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) > at > kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) > at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) > at kafka.server.KafkaApis.handle(KafkaApis.scala:67) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Unexpected error in join group response: The server experienced an unexpected > error when processing the request > org.apache.kafka.common.KafkaException: Unexpected error in join group > response: The server experienced an unexpected error when processing the > request > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) > at > org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) > at > org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) > at > org.apache.kafka.clients.consumer.inte