Have you tried replacing

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

With

import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.connectors.kafka.api.KafkaSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
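
For reference, here is a minimal sketch of the source side of the pipeline
with those imports. This is only a sketch against the 0.9-era API from your
program (same KafkaSource constructor, placeholder "test" topic), not
something I have run against your setup. Note also that print is itself a
sink, so it should be called on the DataStream rather than chained after
addSink; I've left the sink out here and come back to the addSink type
mismatch below:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.api.KafkaSource
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object TestKafka {
      def main(args: Array[String]) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Read strings from the "test" topic via localhost:2181,
        // exactly as in your original addSource call.
        val stream = env
          .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

        // print is a sink in its own right; attach it to the stream directly.
        stream.print

        env.execute("Test Kafka")
      }
    }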


I agree with you that the examples in the user guide might underestimate
the importance of imports; the import statements should be included in the
examples. In particular, org.apache.flink.streaming.api.scala._ brings in
the implicit TypeInformation instances the Scala API needs, so leaving it
out can produce confusing errors.
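
As for the compile error on addSink: the compiler is asking for a
SerializationSchema[String, Array[Byte]], and SerializationSchema itself is
an interface, so swapping it in directly won't work either. One option,
assuming the two-parameter SerializationSchema interface your error message
shows (I have not verified this against your exact Flink version), is a
small custom schema:

    import org.apache.flink.streaming.util.serialization.SerializationSchema

    // Turns each String record into UTF-8 bytes for the Kafka sink.
    class StringToBytesSchema extends SerializationSchema[String, Array[Byte]] {
      override def serialize(element: String): Array[Byte] =
        element.getBytes("UTF-8")
    }

Then the sink becomes:

    stream.addSink(new KafkaSink[String]("localhost:2181", "test", new StringToBytesSchema))

(If I remember correctly, KafkaSink expects the broker list rather than the
ZooKeeper address, so it is worth double-checking that argument as well.)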

Anwar.

On Thu, Jul 16, 2015 at 2:45 AM, Wendong <wendong....@gmail.com> wrote:

> Hello,
>
> Does anyone have a simple example of Flink Kafka written in Scala?
>
> I've been struggling to make my test program work. Below is my program,
> which has an error in addSink (the KafkaWordCountProducer part is copied
> from a Spark sample program):
>
> import java.util.HashMap
> import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
>
> import org.apache.flink.streaming.api.environment._
> import org.apache.flink.streaming.connectors.kafka
> import org.apache.flink.streaming.connectors.kafka.api._
> import org.apache.flink.streaming.util.serialization._
>
> object TestKafka {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val stream = env
>       .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
>       .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
>       .print
>
>     println("Hi TestKafka")
>     env.execute("Test Kafka")
>   }
> }
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>         "<messagesPerSec> <wordsPerMessage>")
>       System.exit(1)
>     }
>
>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
>     // Zookeeper connection properties
>     val props = new HashMap[String, Object]()
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>
>     val producer = new KafkaProducer[String, String](props)
>
>     // Send some messages
>     while(true) {
>       (1 to messagesPerSec.toInt).foreach { messageNum =>
>         val str = (1 to wordsPerMessage.toInt)
>           .map(x => scala.util.Random.nextInt(10).toString)
>           .mkString(" ")
>
>         val message = new ProducerRecord[String, String](topic, null, str)
>         producer.send(message)
>       }
>
>       Thread.sleep(1000)
>     }
>   }
> }
>
> There is compilation error:
>
> [error] .................TestKafka.scala:15: type mismatch;
> [error]  found   : org.apache.flink.streaming.util.serialization.SimpleStringSchema
> [error]  required: org.apache.flink.streaming.util.serialization.SerializationSchema[String,Array[Byte]]
> [error]       .addSink(new KafkaSink[String]("localhost:2181", "test", new
> SimpleStringSchema))
>
> I changed SimpleStringSchema to SerializationSchema, which still doesn't
> work.
>
> I am trying to transition from Spark to Flink, but there are far fewer
> examples for Flink than for Spark. It would be very helpful to have a
> KafkaWordCount example in Scala similar to the one in Spark.
>
> Thanks,
>
> Wendong
>
