Did you make sure the Flink connector version and the Flink version are the same? Also, for Kafka 0.8.0.0 you will have to use FlinkKafkaConsumer08.
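One way to check which kafka-clients jar is actually loaded at runtime is a small reflection probe. This is only a sketch, not part of Flink or Kafka: the class name ClasspathProbe and its two helper methods are made up for illustration, and the signature it looks for (partitionsFor(String) returning java.util.List) is taken from the descriptor in the stack trace quoted in this thread.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.List;

// Hypothetical diagnostic helper (not a Flink or Kafka API).
public class ClasspathProbe {

    // Returns the jar or directory a class was loaded from, without initializing the class.
    static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source.
            return src == null ? "(bootstrap/JDK)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    // True if the class has a public method with exactly this name, parameter list and return type.
    static boolean hasMethod(String className, String name, Class<?> returnType, Class<?>... params) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            Method m = cls.getMethod(name, params);
            return m.getReturnType().equals(returnType);
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String consumer = "org.apache.kafka.clients.consumer.KafkaConsumer";
        // Shows which kafka-clients jar wins on the classpath of this JVM.
        System.out.println("KafkaConsumer loaded from: " + locate(consumer));
        // Checks for the method named in the NoSuchMethodError.
        System.out.println("partitionsFor(String) returning List present: "
                + hasMethod(consumer, "partitionsFor", List.class, String.class));
    }
}
```

Running this with the same classpath as the job (for example from inside the fat jar) should show whether the kafka-clients version at runtime is the one the connector was compiled against.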
On Fri, Apr 1, 2016 at 3:21 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:

> I followed the example for Kafka 0.8.0.0 in the Flink docs.
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "localhost:9092");
>     properties.setProperty("zookeeper.connect", "localhost:2181");
>     properties.setProperty("group.id", "test");
>     properties.setProperty("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     properties.setProperty("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     properties.setProperty("partition.assignment.strategy", "range");
>
>     DataStream<String> messageStream = env
>         .addSource(new FlinkKafkaConsumer09<String>("nginx-logs",
>             new SimpleStringSchema(), properties));
>
>     messageStream
>         .rebalance()
>         .map(new MapFunction<String, String>() {
>
>             @Override
>             public String map(String value) throws Exception {
>                 return "Kafka and Flink says: " + value;
>             }
>         }).print();
>
>     env.execute();
> }
>
> I always get the error below:
>
> java.lang.NoSuchMethodError:
> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>
> On Apr 1, 2016, at 1:40 PM, Ashutosh Kumar <kmr.ashutos...@gmail.com> wrote:
>
> I am using Flink 1.0.0 with Kafka 0.9. It works fine for me. I use the
> following dependency.
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>     <version>1.0.0</version>
>     <scope>provided</scope>
> </dependency>
>
> Thanks
> Ashutosh
>
> On Fri, Apr 1, 2016 at 10:46 AM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>
>> Hi there,
>>
>> I checked my build.gradle file. I use
>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0', but I found that
>> this lib is based on kafka-clients 0.9.0.1.
>>
>> I want to use Flink streaming to consume Kafka's events in real time, but
>> I'm confused by Flink's libs with different versions. Which
>> flink-connector-kafka is compatible with Kafka 0.9.0.0?
>> My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: Java
>>
>> Part of my build.gradle:
>> 'org.apache.kafka:kafka_2.10:0.9.0.0',
>> 'org.apache.kafka:kafka-clients:0.9.0.0',
>> 'org.apache.flink:flink-java:1.0.0',
>> 'org.apache.flink:flink-streaming-java_2.10:1.0.0',
>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
>> 'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0'
>>
>> Any advice?
>>
>> Thanks.
>>
>> On Mar 30, 2016, at 10:35 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi!
>>
>> A "NoSuchMethodError" usually means that you compile and run against
>> different versions.
>>
>> Make sure the version you reference in the IDE and the version on the
>> cluster are the same.
>>
>> Greetings,
>> Stephan
>>
>> On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:
>>
>>> I have tested Kafka 0.8.0.2 with Flink 1.0.0 and it works for me. I can't
>>> speak for Kafka 0.9.0.1.
>>>
>>> On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>>>
>>>> Hi there,
>>>>
>>>> Flink version: 1.0.0
>>>> Kafka version: 0.9.0.0
>>>> env: local
>>>>
>>>> I ran the command below:
>>>> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs
>>>> --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181
>>>> --group.id myGroup --partition.assignment.strategy round robin
>>>>
>>>> But I got this error:
>>>> java.lang.NoSuchMethodError:
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>>>
>>>> The code is as follows:
>>>> DataStream<String> messageStream = env.addSource(new
>>>>     FlinkKafkaConsumer09<>("nginx-logs", new
>>>>     SimpleStringSchema(), parameterTool.getProperties()));
>>>> messageStream.rebalance().map(new MapFunction<String, String>() {
>>>>
>>>>     @Override
>>>>     public String map(String value) throws Exception {
>>>>         return "Kafka and Flink says: " + value;
>>>>     }
>>>> }).print();
>>>>
>>>> I searched for the error on Google, and it appears to be a method of
>>>> Kafka 0.9.0.1. Any idea? Thanks.