Hi Devs, I just started using Flink and would like to ass kafka as Sink. I went through the documentation but so far I've not succeeded in writing to Kafka from Flink....
I' building application in Scala.... Here is my code snippet case class *Demo*(city: String, country: String, zipcode: Int) The map stage returns an instance of Demo type val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "127.0.0.1:9092") properties.setProperty("zookeeper.connect", "127.0.0.1:2181") properties.setProperty("group.id", "test_topic") val mapToDemo: String => Demo = {//Implementation} val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new SimpleStringSchema, properties)) stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092", "test_topic", new SimpleStringSchema())) Can anyone explain me what am I doing wrong in adding Kafka as Sink ? -- Thanks, Deepak Jha