If Storm 0.9.2 uses the SimpleConsumer, this could be a bug in the leader
fault-tolerance logic in Storm itself. The high-level consumer
(ZookeeperConsumerConnector) handles leader changes automatically for you.
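
To illustrate what a SimpleConsumer client has to do on its own, here is a
rough sketch of the retry loop. It is not Storm's or Kafka's actual code. It
is plain Python with stand-ins: the NotLeaderForPartition class only mirrors
the name of kafka.common.NotLeaderForPartitionException, and find_leader,
fetch, max_retries and backoff_s are hypothetical names I made up for the
example.

```python
import time


class NotLeaderForPartition(Exception):
    """Stand-in for kafka.common.NotLeaderForPartitionException."""


def fetch_with_leader_retry(find_leader, fetch, topic, partition,
                            max_retries=3, backoff_s=0.0):
    """Fetch from the partition leader, looking it up again if it moved.

    find_leader(topic, partition) -> broker   (hypothetical callback)
    fetch(broker, topic, partition) -> data   (hypothetical callback)
    """
    leader = find_leader(topic, partition)
    for attempt in range(max_retries + 1):
        try:
            return fetch(leader, topic, partition)
        except NotLeaderForPartition:
            # The broker we asked no longer leads this partition.
            if attempt == max_retries:
                raise  # give up and surface the error to the caller
            time.sleep(backoff_s)  # give the election time to finish
            leader = find_leader(topic, partition)  # refresh the leader
```

If the client skips the "refresh the leader" step and keeps asking the old
broker, it will keep getting the same error after every leader change.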

On Thu, Sep 4, 2014 at 8:31 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Siddharth,
>
> The error log shows that the Kafka broker the consumer talks to is no
> longer hosting the leader replica of the topic-partition. Were there any
> broker failures during that time? (You may find them in the server and
> controller logs.)
>
> Guozhang
>
>
>
> On Thu, Sep 4, 2014 at 12:04 AM, siddharth ubale <siddharth.ub...@gmail.com> wrote:
>
> > Hi,
> >
> > I am using a Kafka spout to read data into Storm 0.9.2.
> > Whenever I run my program and check the console consumer, I see
> > an error saying
> > *" Failed to add leader for partitions [Json1234,0]; will retry
> > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread"*
> > along with this stack trace:
> > kafka.common.NotLeaderForPartitionException
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > at java.lang.Class.newInstance(Class.java:374)
> > at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
> > at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160)
> > at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
> > at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179)
> > at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174)
> > at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174)
> > at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
> > at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
> > at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
> > at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > ^C[2014-09-04 12:27:45,186] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
> > [2014-09-04 12:27:45,223] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
> > [2014-09-04 12:27:45,237] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
> >
> > Has anyone encountered this issue?
> > I am using *kafka_2.10-0.8.1.1 and Storm 0.9.2-incubating with ZooKeeper
> > 3.4.6.*
> >
> > *Do let me know. Also, I see this very often whenever I have restarted my
> > topic.*
> >
> > Thanks,
> > Siddharth ubale
> >
>
>
>
> --
> -- Guozhang
>
