[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Onur Karaman updated KAFKA-2413:
--------------------------------
Status: Patch Available (was: Open)
> New consumer's subscribe(Topic...) api fails if called more than once
> ---------------------------------------------------------------------
>
> Key: KAFKA-2413
> URL: https://issues.apache.org/jira/browse/KAFKA-2413
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Ashish K Singh
> Assignee: Onur Karaman
>
> I believe the new consumer is supposed to allow adding to existing topic
> subscriptions. If so, the issue is that trying to subscribe to a topic when
> the consumer is already subscribed to a topic throws the exception shown
> below.
> {code}
> [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null
> (kafka.server.KafkaApis:103)
> java.util.NoSuchElementException: key not found: topic
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.MapLike$class.apply(MapLike.scala:141)
> at scala.collection.AbstractMap.apply(Map.scala:58)
> at
> kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
> at
> kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
> at
> kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
> at
> kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
> at
> kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
> at
> kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
> at
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
> at
> kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
> at
> kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
> at
> kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
> at
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
> at
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
> at
> kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
> at
> kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
> at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Unexpected error in join group response: The server experienced an unexpected
> error when processing the request
> org.apache.kafka.common.KafkaException: Unexpected error in join group
> response: The server experienced an unexpected error when processing the
> request
> at
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:197)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:172)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:764)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:725)
> at
> kafka.api.ConsumerTest$$anonfun$testRepetitiveTopicSubscription$2.apply$mcZ$sp(ConsumerTest.scala:80)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:616)
> at
> kafka.api.ConsumerTest.testRepetitiveTopicSubscription(ConsumerTest.scala:79)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at junit.framework.TestCase.runTest(TestCase.java:168)
> at junit.framework.TestCase.runBare(TestCase.java:134)
> at junit.framework.TestResult$1.protect(TestResult.java:110)
> at junit.framework.TestResult.runProtected(TestResult.java:128)
> at junit.framework.TestResult.run(TestResult.java:113)
> at junit.framework.TestCase.run(TestCase.java:124)
> at junit.framework.TestSuite.runTest(TestSuite.java:232)
> at junit.framework.TestSuite.run(TestSuite.java:227)
> at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
> at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
> at
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
> at
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
> at
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
> at
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
> at
> org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
> at
> org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
> at org.scalatest.tools.Runner$.run(Runner.scala:883)
> at org.scalatest.tools.Runner.run(Runner.scala)
> at
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
> at
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)