Have you tried to replace

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

with

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

I agree with you that the examples in the user guide understate the importance of the imports. The import statements should be included in the examples.

Anwar.

On Thu, Jul 16, 2015 at 2:45 AM, Wendong <wendong....@gmail.com> wrote:

> Hello,
>
> Does anyone have a simple example of Flink Kafka written in Scala?
>
> I've been struggling to get my test program working. Below is my program,
> which has an error in addSink (the KafkaWordCountProducer part is copied
> from the Spark sample program):
>
> import java.util.HashMap
> import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
>
> import org.apache.flink.streaming.api.environment._
> import org.apache.flink.streaming.connectors.kafka
> import org.apache.flink.streaming.connectors.kafka.api._
> import org.apache.flink.streaming.util.serialization._
>
> object TestKafka {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val stream = env
>       .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
>       .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
>       .print
>
>     println("Hi TestKafka")
>     env.execute("Test Kafka")
>   }
> }
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>         "<messagesPerSec> <wordsPerMessage>")
>       System.exit(1)
>     }
>
>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
>     // Zookeeper connection properties
>     val props = new HashMap[String, Object]()
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>
>     val producer = new KafkaProducer[String, String](props)
>
>     // Send some messages
>     while(true) {
>       (1 to messagesPerSec.toInt).foreach { messageNum =>
>         val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
>           .mkString(" ")
>
>         val message = new ProducerRecord[String, String](topic, null, str)
>         producer.send(message)
>       }
>
>       Thread.sleep(1000)
>     }
>   }
> }
>
> There is a compilation error:
>
> [error] .................TestKafka.scala:15: type mismatch;
> [error]  found   : org.apache.flink.streaming.util.serialization.SimpleStringSchema
> [error]  required: org.apache.flink.streaming.util.serialization.SerializationSchema[String,Array[Byte]]
> [error]       .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
>
> I changed SimpleStringSchema to SerializationSchema, which still doesn't
> work.
>
> I am trying to move from Spark to Flink, but Flink has far fewer samples
> than Spark. It would be very helpful if there were an example of
> KafkaWordCount in Scala similar to the one in Spark.
>
> Thanks,
>
> Wendong
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-example-in-Scala-tp2069.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.