Hi Devs,
I just started using Flink and would like to add Kafka as a sink. I went
through the documentation, but so far I have not succeeded in writing to Kafka
from Flink.

I'm building the application in Scala. Here is my code snippet:

case class Demo(city: String, country: String, zipcode: Int)

The map stage returns an instance of type Demo:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
properties.setProperty("group.id", "test_topic")

val mapToDemo: String => Demo = {//Implementation}

val stream = env.addSource(
  new FlinkKafkaConsumer082[String]("test", new SimpleStringSchema, properties))

stream.map(mapToDemo).addSink(
  new FlinkKafkaProducer[Demo]("127.0.0.1:9092", "test_topic", new SimpleStringSchema()))
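
A possible cause worth checking: SimpleStringSchema is written for String
elements, while the stream reaching the sink carries Demo. Below is a minimal
sketch, assuming that mismatch is the problem, of turning a Demo into a String
before the sink. It uses plain Scala only; demoToLine is a hypothetical helper
(not part of Flink), and the comma-separated format is just an assumption.

```scala
case class Demo(city: String, country: String, zipcode: Int)

object DemoToString {
  // Hypothetical helper (not a Flink API): renders a Demo as one
  // comma-separated line so that a String-based schema can handle it.
  def demoToLine(d: Demo): String =
    s"${d.city},${d.country},${d.zipcode}"

  def main(args: Array[String]): Unit = {
    // Sample values are made up for illustration only.
    println(demoToLine(Demo("Pune", "India", 411001)))
  }
}
```

With such a helper, the stream could be mapped once more, for example
stream.map(mapToDemo).map(DemoToString.demoToLine _), so that the sink receives
String elements and could be declared as FlinkKafkaProducer[String]. Whether
this matches the connector version in use would still need to be confirmed.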

Can anyone explain what I am doing wrong in adding Kafka as a sink?
-- 
Thanks,
Deepak Jha
