Hi,

I followed the example in AvroITCase.java, which reads/writes Avro data from/to the filesystem.
> I don't want to update AvroIO to embed a Kafka consumer/producer.

I also looked into https://issues.apache.org/jira/browse/FLINK-2597
> But I couldn't instantiate TypeInformationSerializationSchema in the following FlinkKafkaConsumer/Producer, as I am not sure how to get access to ExecutionConfig.

*Essentially, I need help changing the following code in a Beam Pipeline*

*> to read Avro data from Kafka and deserialize it into my custom object:*

    public static UnboundedSource<String, CheckpointMark> consumeMessages() {
        // '?' marks the schema argument I don't know how to supply
        FlinkKafkaConsumer08<String> kafkaConsumer =
            new FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
        return UnboundedFlinkSource.of(kafkaConsumer);
    }

*> and how do I write Avro data into Kafka?*

    public static void produceData() {
        // '?' marks the schema argument I don't know how to supply
        FlinkKafkaProducer08<String> kafkaSink =
            new FlinkKafkaProducer08<>(TOPIC, ? , props);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(Create.of(
                    new User("Joe", 3, "red"),
                    new User("Mary", 4, "blue"),
                    new User("Mark", 1, "green"),
                    new User("Julia", 5, "purple"))
                .withCoder(AvroCoder.of(User.class)))
            .apply(transformationToWriteToKafkaSink());
        pipeline.run();
    }

Thanks
Kaniska
