Hi, did you get an error message when executing the job? I don't think you can serialize the "Demo" type with the "SimpleStringSchema", since that schema only handles String elements.
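One possible workaround, as a rough sketch: turn each Demo into a String before the sink, so that the producer is typed as String and SimpleStringSchema applies. The `DemoCodec` object and the comma-separated format below are hypothetical choices for illustration, not something from your code or from Flink. The Flink wiring is left as a comment and is untested.

```scala
import java.nio.charset.StandardCharsets

// Case class from the original message.
case class Demo(city: String, country: String, zipcode: Int)

// Hypothetical helper: the name and the comma-separated format are assumptions.
object DemoCodec {
  // Render a Demo as a single line of text.
  def toLine(d: Demo): String = s"${d.city},${d.country},${d.zipcode}"

  // UTF-8 bytes of the same line, in case a byte-oriented schema is needed.
  def toBytes(d: Demo): Array[Byte] = toLine(d).getBytes(StandardCharsets.UTF_8)
}

// Assumed wiring, reusing the calls from the original snippet:
//
//   stream.map(mapToDemo)
//     .map(d => DemoCodec.toLine(d))
//     .addSink(new FlinkKafkaProducer[String]("127.0.0.1:9092",
//       "test_topic", new SimpleStringSchema()))
```

The alternative is to keep the sink typed as Demo and write your own SerializationSchema for it; the exact shape of that interface depends on the Flink version you are using, so please check the Javadoc for your release.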
On Fri, Jan 22, 2016 at 8:13 PM, Deepak Jha <dkjhan...@gmail.com> wrote:

> Hi Devs,
>
> I just started using Flink and would like to add Kafka as a sink. I went
> through the documentation, but so far I have not succeeded in writing to
> Kafka from Flink.
>
> I'm building the application in Scala. Here is my code snippet:
>
> case class Demo(city: String, country: String, zipcode: Int)
>
> The map stage returns an instance of the Demo type.
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
> properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
> properties.setProperty("group.id", "test_topic")
>
> val mapToDemo: String => Demo = {//Implementation}
>
> val stream = env.addSource(new FlinkKafkaConsumer082[String]("test",
>   new SimpleStringSchema, properties))
>
> stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092",
>   "test_topic", new SimpleStringSchema()))
>
> Can anyone explain what I am doing wrong in adding Kafka as a sink?
>
> --
> Thanks,
> Deepak Jha