Guozhang,

No, I don't think the patch for KAFKA-2056 will fix this problem. The NPE is thrown at a line that executes before the code changed by that patch. However, I did notice that the code in trunk already fixes the issue by checking that the size of the map returned from ctx.consumersForTopic is > 0. So the code in trunk is safe.
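
To illustrate the kind of guard described above, here is a minimal sketch in plain Java. It is not the actual Kafka code: the class EmptyMapGuard and the method firstSubscribers are hypothetical names, and an ordinary java.util.Map stands in for whatever ctx.consumersForTopic returns. The only point is that the map is checked for emptiness before its first element is read.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; none of these names exist in Kafka.
final class EmptyMapGuard {

    // consumersForTopic is a stand-in for the map returned by
    // ctx.consumersForTopic: topic name -> consumer thread ids.
    static Optional<List<String>> firstSubscribers(Map<String, List<String>> consumersForTopic) {
        // Guard: when no topic matches yet (for example, the topic has not
        // been created on the broker), skip the lookup entirely.
        if (consumersForTopic.isEmpty()) {
            return Optional.empty();
        }
        // Safe now: the map has at least one entry. Without the guard, this
        // call would throw NoSuchElementException on an empty java.util.Map.
        return Optional.of(consumersForTopic.values().iterator().next());
    }

    public static void main(String[] args) {
        Map<String, List<String>> none = new HashMap<>();
        Map<String, List<String>> one = new HashMap<>();
        one.put("topicA", Arrays.asList("consumer1-0", "consumer2-0"));

        System.out.println(firstSubscribers(none).isPresent());
        System.out.println(firstSubscribers(one).isPresent());
    }
}
```

With a guard like this, a consumer whose filter matches no existing topic simply gets no assignment instead of failing during rebalance.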

On Wed, Apr 15, 2015 at 3:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Tao,
>
> Do you think the solution to KAFKA-2056 will resolve this issue? It will be
> included in 0.8.3 release.
>
> Guozhang
>
> On Wed, Apr 15, 2015 at 2:21 PM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > Hi team,
> >
> > I discovered an issue that when a high level consumer with roundrobin
> > assignment strategy consumes a topic that hasn't been created on broker a
> > NPE exception is thrown during partition rebalancing phase. I use Kafka
> > 0.8.2.1
> >
> > Here is the step to reproduce:
> >
> > 1. create a high level consumer with roundrobin
> > 2. use connector.createMessageStreamsByFilter to create a message stream in
> > the consumer to a topic that yet to be created on broker
> >
> > Below is the exception.
> >
> > 2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 - [test12345667fffff_localhost], Committing all offsets after clearing the fetcher queues
> > 2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 - [test12345667fffff_localhost], Releasing partition ownership
> > 2015-04-15 14:16:46 INFO kafka.utils.Logging$class:76 - [test12345667fffff_localhost], exception during rebalance
> > java.lang.NullPointerException
> > at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
> > at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> > at scala.collection.IterableLike$class.head(IterableLike.scala:91)
> > at scala.collection.AbstractIterable.head(Iterable.scala:54)
> > at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
> > at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
> > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
> > at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
> > at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:939)
> > at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
> > at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
> > at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
> > at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:606)
> > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> >
> > --
> > Regards,
> > Tao
> >
>
> --
> -- Guozhang
>

--
Regards,
Tao