Guozhang,

No, I don't think the patch for KAFKA-2056 fixes this problem. The NPE is
thrown on a line that runs before the code changed by that patch. I do
notice, however, that trunk already fixes the issue by checking that the
map returned from ctx.consumersForTopic has size > 0 before reading its
head. So the code in trunk is safe.
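
To illustrate the kind of guard I mean, here is a minimal sketch in plain
Java. It is not the actual Scala code in trunk: the class
RoundRobinGuardSketch, the method assign and its parameters are hypothetical
stand-ins, and the assignment logic is deliberately simplified (it only looks
at the first topic's thread list). In Java, reading the head of an empty map
throws NoSuchElementException rather than the NPE seen in the Scala trace,
but the fix has the same shape: return early when there is nothing to assign.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a round-robin assignor.
class RoundRobinGuardSketch {

    // Maps each partition name to a consumer thread id in round-robin order.
    // consumersForTopic plays the role of ctx.consumersForTopic: topic -> thread ids.
    static Map<String, String> assign(Map<String, List<String>> consumersForTopic,
                                      List<String> partitions) {
        Map<String, String> ownership = new LinkedHashMap<>();

        // Guard: if no topic matched (for example, the topic does not exist
        // yet), there is no head entry to read, so return an empty assignment.
        if (consumersForTopic.isEmpty()) {
            return ownership;
        }

        // Safe now: the map is known to contain at least one entry.
        List<String> threads = consumersForTopic.values().iterator().next();
        if (threads.isEmpty()) {
            return ownership;
        }

        for (int i = 0; i < partitions.size(); i++) {
            ownership.put(partitions.get(i), threads.get(i % threads.size()));
        }
        return ownership;
    }
}
```

Without the isEmpty() check, the call to iterator().next() is the line that
would fail when the filter matches no existing topic.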

On Wed, Apr 15, 2015 at 3:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Tao,
>
> Do you think the solution to KAFKA-2056 will resolve this issue? It will be
> included in the 0.8.3 release.
>
> Guozhang
>
> On Wed, Apr 15, 2015 at 2:21 PM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > Hi team,
> >
> > I found that when a high-level consumer using the roundrobin assignment
> > strategy consumes a topic that has not yet been created on the broker, an
> > NPE is thrown during the partition rebalancing phase. I am using Kafka
> > 0.8.2.1.
> >
> > Here are the steps to reproduce:
> >
> > 1. Create a high-level consumer with the roundrobin assignment strategy.
> > 2. Use connector.createMessageStreamsByFilter to create a message stream
> > in the consumer for a topic that has yet to be created on the broker.
> >
> > Below is the exception.
> >
> > 2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
> > [test12345667fffff_localhost], Committing all offsets after clearing the
> > fetcher queues
> > 2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
> > [test12345667fffff_localhost], Releasing partition ownership
> > 2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:76 -
> > [test12345667fffff_localhost], exception during rebalance
> > java.lang.NullPointerException
> > at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
> > at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> > at scala.collection.IterableLike$class.head(IterableLike.scala:91)
> > at scala.collection.AbstractIterable.head(Iterable.scala:54)
> > at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
> > at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
> > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
> > at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
> > at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:939)
> > at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
> > at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
> > at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
> > at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:606)
> > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> >
> > --
> > Regards,
> > Tao
> >
>
>
>
> --
> -- Guozhang
>



-- 
Regards,
Tao
