Hi all,
I am preparing a Kafka and Flink performance test. To rule out mistakes of my own, I downloaded the Kafka example from http://kafka.apache.org/ and the Flink streaming Kafka example from http://flink.apache.org, and ran both producer examples on the same cluster. The example from kafka.apache.org runs without any issues, but I received the errors below when I ran the Apache Flink Kafka producer. I have also posted both pieces of code for your reference. Please take a look. Thanks.

Exception in thread "main" java.lang.Error: Unresolved compilation problems:
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.javaapi cannot be resolved
    ConsumerConnector cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    Consumer cannot be resolved
    ConsumerConfig cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type

    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.<init>(KafkaSource.java:26)
    at org.apache.flink.streaming.connectors.kafka.KafkaConsumerExample.main(KafkaConsumerExample.java:42)

Here is the Apache Flink example:

*************************************Apache Flink***********************************************************************

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);

    @SuppressWarnings({ "unused", "serial" })
    DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
        public void run(Collector<String> collector) throws Exception {
            for (int i = 0; i < 20; i++) {
                collector.collect("message #" + i);
                Thread.sleep(100L);
            }
            collector.collect(new String("q"));
        }

        public void cancel() {
        }
    }).addSink(
        new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
    )
    .setParallelism(3);

    System.out.println(host+" "+port+" "+topic);

    env.execute();

**********************************Apache Kafka***************************************************************

    public Producer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.0.112:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type String.
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    public void run() {
        int messageNo = 1;
        while(true) {
            String messageStr = new String("LA_" + messageNo);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
        }
    }

Best regards
Hawin
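
P.S. Since every error above is an unresolved kafka.consumer or kafka.javaapi import or type, one thing that might help narrow this down is checking whether those Kafka classes are visible on the classpath the Flink example runs with. The following is only a minimal sketch: the helper name ClasspathCheck is hypothetical, and the fully qualified class names are my assumption based on the packages and types named in the error output (Kafka 0.8-style consumer API).

```java
// Hypothetical diagnostic helper, not part of Flink or Kafka.
class ClasspathCheck {

    // Returns true if the named class can be located by this class's loader.
    // The class is looked up without being initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed class names, derived from the types listed in the error message.
        String[] names = {
            "kafka.consumer.Consumer",
            "kafka.consumer.ConsumerConfig",
            "kafka.consumer.ConsumerIterator",
            "kafka.consumer.KafkaStream",
            "kafka.javaapi.consumer.ConsumerConnector"
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

If these are reported as missing when run with the same classpath as the Flink example, the Kafka jar is probably not among the project's dependencies. That is a guess on my side, not something I have confirmed.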