Using kafka-avro-console-consumer, I can read the full messages produced by Kafka Connect with AvroConverter, but in Flink I get no output except the schema.
With the producer that uses DefaultEncoder, kafka-avro-console-consumer somehow throws exceptions, but the Flink consumer works. My goal, however, is to have Flink consume the Avro data produced by Kafka Connect.

> On Nov 2, 2016, at 7:36 PM, Will Du <will...@gmail.com> wrote:
>
> On Nov 2, 2016, at 7:31 PM, Will Du <will...@gmail.com <mailto:will...@gmail.com>> wrote:
>
> Hi folks,
> I am trying to consume Avro data from Kafka in Flink. The data is
> produced by Kafka Connect using AvroConverter. I have created an
> AvroDeserializationSchema.java
> <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123>
> used by the Flink consumer. Then I use the following code to read it.
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "localhost:9092");
>     properties.setProperty("zookeeper.connect", "localhost:2181");
>     Schema schema = new Parser().parse("{" + "\"name\": \"test\", "
>         + "\"type\": \"record\", "
>         + "\"fields\": "
>         + " [ "
>         + " { \"name\": \"name\", \"type\": \"string\" },"
>         + " { \"name\": \"symbol\", \"type\": \"string\" },"
>         + " { \"name\": \"exchange\", \"type\": \"string\"}"
>         + "] "
>         + "}");
>
>     AvroDeserializationSchema avroSchema =
>         new AvroDeserializationSchema<>(schema);
>     FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
>         new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
>     DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
>
> Once I run the code, I get only the schema information, as follows.
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
>
> Could anyone help me find out why I cannot decode it?
>
> After further troubleshooting, I found that if I use the Kafka producer
> here <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c>
> to send the Avro data, specifically with kafka.serializer.DefaultEncoder,
> the above code gets the correct result. Does anybody know how to either
> set DefaultEncoder in Kafka Connect or set it when writing a customized
> Kafka Connect connector? Or, the other way around, how should I modify
> AvroDeserializationSchema.java instead?
>
> Thanks, I'll post this to the Flink user group as well.
> Will