[ 
https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661160#comment-14661160
 ] 

Jason Gustafson commented on KAFKA-2413:
----------------------------------------

I think the issue is in here:

{code}
  private def updateConsumer(group: ConsumerGroupMetadata, consumer: 
ConsumerMetadata, topics: Set[String]) {
    val topicsToBind = topics -- group.topics
    group.remove(consumer.consumerId)
    val topicsToUnbind = consumer.topics -- group.topics
    group.add(consumer.consumerId, consumer)
    consumer.topics = topics
    coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, 
topicsToBind, topicsToUnbind)
  }
{code}

In particular, it looks like topicsToUnbind is taking the difference in the 
wrong order. But I'm not sure that just reversing that is correct either, since 
we'd only want to unbind the topic if no other consumers in the group are 
subscribed.

> New consumer's subscribe(Topic...) api fails if called more than once
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-2413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2413
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Ashish K Singh
>            Assignee: Ashish K Singh
>
> I believe the new consumer is supposed to allow adding to existing topic 
> subscriptions. If so, the issue is that when the consumer is already 
> subscribed to a topic and tries to subscribe to another one, the exception 
> below is thrown.
> {code}
> [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null 
> (kafka.server.KafkaApis:103)
> java.util.NoSuchElementException: key not found: topic
>       at scala.collection.MapLike$class.default(MapLike.scala:228)
>       at scala.collection.AbstractMap.default(Map.scala:58)
>       at scala.collection.MapLike$class.apply(MapLike.scala:141)
>       at scala.collection.AbstractMap.apply(Map.scala:58)
>       at 
> kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
>       at 
> kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>       at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
>       at 
> kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
>       at 
> kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
>       at 
> kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
>       at 
> kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
>       at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
>       at 
> kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
>       at 
> kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
>       at 
> kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
>       at 
> kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
>       at 
> kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
>       at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> Unexpected error in join group response: The server experienced an unexpected 
> error when processing the request
> org.apache.kafka.common.KafkaException: Unexpected error in join group 
> response: The server experienced an unexpected error when processing the 
> request
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:197)
>       at 
> org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:172)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:764)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:725)
>       at 
> kafka.api.ConsumerTest$$anonfun$testRepetitiveTopicSubscription$2.apply$mcZ$sp(ConsumerTest.scala:80)
>       at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:616)
>       at 
> kafka.api.ConsumerTest.testRepetitiveTopicSubscription(ConsumerTest.scala:79)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at junit.framework.TestCase.runTest(TestCase.java:168)
>       at junit.framework.TestCase.runBare(TestCase.java:134)
>       at junit.framework.TestResult$1.protect(TestResult.java:110)
>       at junit.framework.TestResult.runProtected(TestResult.java:128)
>       at junit.framework.TestResult.run(TestResult.java:113)
>       at junit.framework.TestCase.run(TestCase.java:124)
>       at junit.framework.TestSuite.runTest(TestSuite.java:232)
>       at junit.framework.TestSuite.run(TestSuite.java:227)
>       at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
>       at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
>       at 
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
>       at 
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
>       at 
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
>       at 
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
>       at 
> org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
>       at 
> org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
>       at org.scalatest.tools.Runner$.run(Runner.scala:883)
>       at org.scalatest.tools.Runner.run(Runner.scala)
>       at 
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
>       at 
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to