Hi All,

I am preparing a Kafka and Flink performance test. To rule out mistakes on my side, I downloaded the Kafka example from http://kafka.apache.org/ and the Flink streaming Kafka example from http://flink.apache.org.

I ran both producer examples on the same cluster. The kafka.apache.org example ran without any issues.

However, I received the errors below when I ran the Apache Flink Kafka producer.

I have also posted both pieces of code for your reference. Please take a look.

Thanks.

 

Exception in thread "main" java.lang.Error: Unresolved compilation problems:

    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.consumer cannot be resolved
    The import kafka.javaapi cannot be resolved
    ConsumerConnector cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    Consumer cannot be resolved
    ConsumerConfig cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    KafkaStream cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerIterator cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type
    ConsumerConnector cannot be resolved to a type

    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.<init>(KafkaSource.java:26)
    at org.apache.flink.streaming.connectors.kafka.KafkaConsumerExample.main(KafkaConsumerExample.java:42)

 

 

 

Here is the Apache Flink example:

*************************************Apache Flink***********************************************************************

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);

        @SuppressWarnings({ "unused", "serial" })
        DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {

            public void run(Collector<String> collector) throws Exception {
                for (int i = 0; i < 20; i++) {
                    collector.collect("message #" + i);
                    Thread.sleep(100L);
                }
                collector.collect(new String("q"));
            }

            public void cancel() {
            }

        }).addSink(
                new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
        ).setParallelism(3);

        System.out.println(host + " " + port + " " + topic);

        env.execute();
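As a side note on what the source above produces: it emits "message #0" through "message #19" and then a final sentinel "q". A plain-Java sketch of that generation logic (the Collector interface here is a hypothetical stand-in for Flink's org.apache.flink.util.Collector, with the sleep removed) behaves like this:

```java
import java.util.ArrayList;
import java.util.List;

public class SourceSketch {
    // Hypothetical stand-in for org.apache.flink.util.Collector
    interface Collector<T> {
        void collect(T record);
    }

    static List<String> generate() {
        List<String> out = new ArrayList<>();
        Collector<String> collector = out::add;
        // Same loop as the Flink source, minus the 100 ms sleep
        for (int i = 0; i < 20; i++) {
            collector.collect("message #" + i);
        }
        collector.collect("q"); // sentinel record
        return out;
    }

    public static void main(String[] args) {
        List<String> msgs = generate();
        System.out.println(msgs.size());   // 21 records in total
        System.out.println(msgs.get(0));   // first record
        System.out.println(msgs.get(20));  // sentinel
    }
}
```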

 

 

 

**********************************Apache Kafka***************************************************************

  public Producer(String topic)
  {
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "192.168.0.112:9092");
    // Use random partitioner. Don't need the key type. Just set it to Integer.
    // The message is of type String.
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    this.topic = topic;
  }

  public void run() {
    int messageNo = 1;
    while (true)
    {
      String messageStr = new String("LA_" + messageNo);
      producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
      messageNo++;
    }
  }

 

 

 

 

Best regards

Hawin

 

 
