Hi Frank,

Thanks for the reply. I have other topics in kafka that are working:

~/kafka_2.9.2-0.8.1.1$ bin/kafka-topics.sh --zookeeper localhost --describe
Topic:requests  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: requests  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
test
tst1

$ bin/kafka-console-consumer.sh --zookeeper localhost --topic test --from-beginning
test
tst1

Am I supposed to create the 'custom-topic' with 2 partitions before running 'mvn clean install'? Let's try creating the 'custom-topic':

$ bin/kafka-topics.sh --create --partitions 2 --topic custom-topic --zookeeper localhost --replication-factor 1
Created topic "custom-topic".

$ bin/kafka-topics.sh --zookeeper localhost --describe --topic custom-topic
Topic:custom-topic  PartitionCount:2  ReplicationFactor:1  Configs:
    Topic: custom-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: custom-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0

$ cd ~/flume-ng-kafka-sink
$ mvn clean install
[...]
[2014-12-30 02:34:38,520] INFO Fetching metadata from broker id:0,host:vagrant-ubuntu-precise-64,port:51064 with correlation id 26 for 1 topic(s) Set(custom-topic) (kafka.client.ClientUtils$)
[2014-12-30 02:34:38,521] INFO Connected to vagrant-ubuntu-precise-64:51064 for producing (kafka.producer.SyncProducer)
[2014-12-30 02:34:38,525] INFO Disconnecting from vagrant-ubuntu-precise-64:51064 (kafka.producer.SyncProducer)
[2014-12-30 02:34:38,526] INFO Closing socket connection to /10.0.2.15. (kafka.network.Processor)
[2014-12-30 02:34:38,528] WARN [KafkaApi-0] Offset request with correlation id 0 from client group_1-ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-0-0 on partition [custom-topic,1] failed due to Leader not local for partition [custom-topic,1] on broker 0 (kafka.server.KafkaApis)
[2014-12-30 02:34:38,530] WARN [group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-leader-finder-thread], Failed to add leader for partitions [custom-topic,1],[custom-topic,0]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.NotLeaderForPartitionException
    at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:379)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
    at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160)
    at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
    at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179)
    at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-12-30 02:34:38,536] INFO [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-0-0], Shutting down (kafka.consumer.ConsumerFetcherThread)
[2014-12-30 02:34:38,528] INFO [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-0-0], Starting (kafka.consumer.ConsumerFetcherThread)
[2014-12-30 02:34:38,539] INFO [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419906871686-f7db9c60-0-0], Shutdown completed (kafka.consumer.ConsumerFetcherThread)

On Mon, Dec 29, 2014 at 3:53 PM, Frank Yao <[email protected]> wrote:
> hi foo
>
> it seems your stack showed exception was caused by kafka itself
>
> Failed to add leader for partitions
>
> I have used kafka sink and source of flume 1.6 for several weeks and it
> works well.
>
> Could you please try to use kafka console producer first to test if the
> partition is okay or not?
>
> Frank Yao
> @Vipshop, Shanghai
> from iPhone
>
> On 2014-12-30, at 04:21, Foo Lim <[email protected]> wrote:
>
> > BTW, I followed the directions & ran
> >
> > ~/flume-ng-kafka-sink$ mvn clean install
> >
> > On Mon, Dec 29, 2014 at 12:10 PM, Foo Lim <[email protected]> wrote:
> >
> > Hi Gwen,
> >
> > Thanks for the reply.
> >
> > I'll try the CDH jar file. Where do I put it in the flume directory
> > structure?
> >
> > I have kafka_2.9.2-0.8.1.1 running.
> > Here's the test error (which keeps repeating) in the project
> > [email protected]:thilinamb/flume-ng-kafka-sink.git
> >
> > [2014-12-29 20:02:34,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
> > [2014-12-29 20:02:34,029] INFO Property client.id is overridden to group_1 (kafka.utils.VerifiableProperties)
> > [2014-12-29 20:02:34,030] INFO Property metadata.broker.list is overridden to vagrant-ubuntu-precise-64:50753 (kafka.utils.VerifiableProperties)
> > [2014-12-29 20:02:34,030] INFO Property request.timeout.ms is overridden to 30000 (kafka.utils.VerifiableProperties)
> > [2014-12-29 20:02:34,031] INFO Fetching metadata from broker id:0,host:vagrant-ubuntu-precise-64,port:50753 with correlation id 18 for 1 topic(s) Set(custom-topic) (kafka.client.ClientUtils$)
> > [2014-12-29 20:02:34,032] INFO Connected to vagrant-ubuntu-precise-64:50753 for producing (kafka.producer.SyncProducer)
> > [2014-12-29 20:02:34,035] INFO Disconnecting from vagrant-ubuntu-precise-64:50753 (kafka.producer.SyncProducer)
> > [2014-12-29 20:02:34,036] INFO Closing socket connection to /10.0.2.15. (kafka.network.Processor)
> > [2014-12-29 20:02:34,038] WARN [KafkaApi-0] Offset request with correlation id 0 from client group_1-ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0 on partition [custom-topic,1] failed due to Leader not local for partition [custom-topic,1] on broker 0 (kafka.server.KafkaApis)
> > [2014-12-29 20:02:34,040] WARN [group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-leader-finder-thread], Failed to add leader for partitions [custom-topic,1],[custom-topic,0]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > kafka.common.NotLeaderForPartitionException
> >     at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
> >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> >     at java.lang.Class.newInstance(Class.java:379)
> >     at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
> >     at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160)
> >     at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
> >     at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179)
> >     at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174)
> >     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >     at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> >     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >     at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174)
> >     at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
> >     at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
> >     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> >     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >     at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
> >     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-12-29 20:02:34,045] INFO [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0], Shutting down (kafka.consumer.ConsumerFetcherThread)
> > [2014-12-29 20:02:34,039] INFO [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0], Starting (kafka.consumer.ConsumerFetcherThread)
> > [2014-12-29 20:02:34,046] INFO [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0], Shutdown completed (kafka.consumer.ConsumerFetcherThread)
> > [2014-12-29 20:02:34,047] INFO Closing socket connection to /10.0.2.15. (kafka.network.Processor)
> > [2014-12-29 20:02:34,048] INFO [ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0], Stopped (kafka.consumer.ConsumerFetcherThread)
> >
> > On Fri, Dec 26, 2014 at 4:06 PM, Gwen Shapira <[email protected]> wrote:
> >
> > I can't say when's the 1.6 release, but I have other solutions :)
> >
> > 1. The packages that are part of CDH5.3 release will contain that jar.
> >    Perhaps use this distro? Or even just get the RPM, unpackage and dig the jar
> >    out?
> > 2. Let us know what's the compilation error, perhaps we can help there?
> >
> > On Fri, Dec 26, 2014 at 3:30 PM, Foo Lim <[email protected]> wrote:
> >
> > Hi all,
> >
> > Happy holidays!
> > Just wondering if there's any ETA on a 1.6 release.
> > Looking forward to the kafka sink plugin that I can't get to compile
> > independently. :-/
> >
> > Thanks!
