hi foo it seems your stack showed exception was caused by kafka itself > Failed to add leader for partitions
I have used kafka sink and source of flume 1.6 for several weeks and it works well. Could you please try to use kafka console producer first to test if the partitionis okay or not? Frank Yao @Vipshop, Shanghai from iPhone > 在 2014年12月30日,04:21,Foo Lim <[email protected]> 写道: > > BTW, I followed the directions & ran > > ~/flume-ng-kafka-sink$ mvn clean install > >> On Mon, Dec 29, 2014 at 12:10 PM, Foo Lim <[email protected]> wrote: >> Hi Gwen, >> >> Thanks for the reply. >> >> I'll try the CDH jar file. Where do I put it in the flume directory >> structure? >> >> I have kafka_2.9.2-0.8.1.1 running. Here's the test error (which keeps >> repeating) in the project >> [email protected]:thilinamb/flume-ng-kafka-sink.git >> >> [2014-12-29 20:02:34,028] INFO Verifying properties >> (kafka.utils.VerifiableProperties) >> [2014-12-29 20:02:34,029] INFO Property client.id is overridden to >> group_1 (kafka.utils.VerifiableProperties) >> [2014-12-29 20:02:34,030] INFO Property metadata.broker.list is >> overridden to vagrant-ubuntu-precise-64:50753 >> (kafka.utils.VerifiableProperties) >> [2014-12-29 20:02:34,030] INFO Property request.timeout.ms is >> overridden to 30000 (kafka.utils.VerifiableProperties) >> [2014-12-29 20:02:34,031] INFO Fetching metadata from broker >> id:0,host:vagrant-ubuntu-precise-64,port:50753 with correlation id 18 >> for 1 topic(s) Set(custom-topic) (kafka.client.ClientUtils$) >> [2014-12-29 20:02:34,032] INFO Connected to >> vagrant-ubuntu-precise-64:50753 for producing >> (kafka.producer.SyncProducer) >> [2014-12-29 20:02:34,035] INFO Disconnecting from >> vagrant-ubuntu-precise-64:50753 (kafka.producer.SyncProducer) >> [2014-12-29 20:02:34,036] INFO Closing socket connection to >> /10.0.2.15. (kafka.network.Processor) >> [2014-12-29 20:02:34,038] WARN [KafkaApi-0] Offset request with >> correlation id 0 from client >> group_1-ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0 >> on partition [custom-topic,1] failed due to Leader not local for >> partition [custom-topic,1] on broker 0 (kafka.server.KafkaApis) >> [2014-12-29 20:02:34,040] WARN >> [group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-leader-finder-thread], >> Failed to add leader for partitions [custom-topic,1],[custom-topic,0]; >> will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) >> kafka.common.NotLeaderForPartitionException >> at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source) >> at >> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >> at java.lang.reflect.Constructor.newInstance(Constructor.java:526) >> at java.lang.Class.newInstance(Class.java:379) >> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73) >> at >> kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160) >> at >> kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60) >> at >> kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179) >> at >> kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174) >> at >> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) >> at scala.collection.immutable.Map$Map2.foreach(Map.scala:130) >> at >> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) >> at >> kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174) >> at >> kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86) >> at >> kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76) >> at >> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) >> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) >> at >> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) >> at >> kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76) >> at >> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) >> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) >> [2014-12-29 20:02:34,045] INFO >> [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0], >> Shutting down (kafka.consumer.ConsumerFetcherThread) >> [2014-12-29 20:02:34,039] INFO >> [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0], >> Starting (kafka.consumer.ConsumerFetcherThread) >> [2014-12-29 20:02:34,046] INFO >> [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0], >> Shutdown completed (kafka.consumer.ConsumerFetcherThread) >> [2014-12-29 20:02:34,047] INFO Closing socket connection to >> /10.0.2.15. (kafka.network.Processor) >> [2014-12-29 20:02:34,048] INFO >> [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0], >> Stopped (kafka.consumer.ConsumerFetcherThread) >> >>> On Fri, Dec 26, 2014 at 4:06 PM, Gwen Shapira <[email protected]> wrote: >>> I can't say when's the 1.6 release, but I have other solutions :) >>> >>> 1. The packages that are part of CDH5.3 release will contain that jar. >>> Perhaps use this distro? Or even just get the RPM, unpackage and dig the jar >>> out? >>> 2. Let us know what's the compilation error, perhaps we can help there? >>> >>>> On Fri, Dec 26, 2014 at 3:30 PM, Foo Lim <[email protected]> wrote: >>>> >>>> Hi all, >>>> >>>> Happy holidays! Just wondering if there's any ETA on a 1.6 release. >>>> Looking forward to the kafka sink plugin that I can't get to compile >>>> independently. :-/ >>>> >>>> Thanks! >>> >>>
