Have you tried to replace

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._


import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.connectors.kafka.api.KafkaSourceimport

I agree with you that the examples in the user guide might underestimate
the importance of import. The import statements should be in the example.


On Thu, Jul 16, 2015 at 2:45 AM, Wendong <wendong....@gmail.com> wrote:

> Hello,
> Does anyone have a simple example of Flink Kafka written in Scala?
> I've been struggling to make my test program working. Below is my program
> which has error in addSink (the part of KafkaWordCountProducer is copied
> from Spark sample program):
> import java.util.HashMap
> import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer,
> ProducerRecord}
> import org.apache.flink.streaming.api.environment._
> import org.apache.flink.streaming.connectors.kafka
> import org.apache.flink.streaming.connectors.kafka.api._
> import org.apache.flink.streaming.util.serialization._
> object TestKafka {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val stream = env
>       .addSource(new KafkaSource[String]("localhost:2181", "test", new
> SimpleStringSchema))
>       .addSink(new KafkaSink[String]("localhost:2181", "test", new
> SimpleStringSchema))
>       .print
>     println("Hi TestKafka")
>     env.execute("Test Kafka")
>   }
> }
> object KafkaWordCountProducer {
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCountProducer
> <metadataBrokerList>
> <topic> " +
>         "<messagesPerSec> <wordsPerMessage>")
>       System.exit(1)
>     }
>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>     // Zookeeper connection properties
>     val props = new HashMap[String, Object]()
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     val producer = new KafkaProducer[String, String](props)
>     // Send some messages
>     while(true) {
>       (1 to messagesPerSec.toInt).foreach { messageNum =>
>          val str = (1 to wordsPerMessage.toInt).map(x =>
> scala.util.Random.nextInt(10).toString)
>           .mkString(" ")
>         val message = new ProducerRecord[String, String](topic, null, str)
>         producer.send(message)
>       }
>       Thread.sleep(1000)
>     }
>   }
> }
> There is compilation error:
> [error] .................TestKafka.scala:15: type mismatch;
> [error]  found   :
> org.apache.flink.streaming.util.serialization.SimpleStringSchema
> [error]  required:
> org.apache.flink.streaming.util.serialization.SerializationSchema[String,Array[Byte]]
> [error]       .addSink(new KafkaSink[String]("localhost:2181", "test", new
> SimpleStringSchema))
> I changed SimpleStringSchema to SerializationSchema which still doesn't
> work.
> I am trying to transit from Spark to Flink, but the samples in Flink are
> far
> less than those in Spark. It would be very helpful if there is an example
> of
> KafkaWordCount in Scala similar to that in Spark.
> Thanks,
> Wendong
