Using the kafka-avro-console-consumer, I am able to get the full messages from 
Kafka Connect with AvroConverter, but Flink gives no output except the schema 
fields.

When I use the producer with DefaultEncoder, the kafka-avro-console-consumer 
somehow throws exceptions, but the Flink consumer works. However, my goal is to 
have Flink consume the Avro data produced by Kafka Connect.
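
One possible explanation, assuming the Connect worker uses Confluent's 
AvroConverter with a Schema Registry: that converter writes each message as a 
magic byte (0), a 4-byte big-endian schema ID, and then the Avro binary 
payload. A plain Avro binary decoder does not expect those first 5 bytes, while 
DefaultEncoder sends the raw bytes without them. Below is a minimal sketch of 
stripping the header before decoding. The class and method names are 
hypothetical and are not taken from the gist.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: splits a Schema Registry framed message into its parts.
final class ConfluentFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Returns the schema ID stored in bytes 1..4 (big-endian int). */
    public static int schemaId(byte[] message) {
        checkHeader(message);
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns the Avro binary payload that follows the 5-byte header. */
    public static byte[] avroPayload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    // Rejects messages that are too short or do not start with the magic byte.
    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_SIZE
                || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException(
                    "Message is not in Schema Registry wire format");
        }
    }
}
```

Inside a deserialization schema, the bytes returned by avroPayload(message) 
would then be handed to the Avro decoder instead of the full message. This 
would also be consistent with the kafka-avro-console-consumer failing on data 
sent with DefaultEncoder, since those messages carry no such header.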

> On Nov 2, 2016, at 7:36 PM, Will Du <will...@gmail.com> wrote:
> 
> 
> On Nov 2, 2016, at 7:31 PM, Will Du <will...@gmail.com 
> <mailto:will...@gmail.com>> wrote:
> 
> Hi folks,
> I am trying to consume Avro data from Kafka in Flink. The data is produced by 
> Kafka Connect using AvroConverter. I have created an 
> AvroDeserializationSchema.java 
> <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used by 
> the Flink consumer. Then I use the following code to read it.
> 
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "localhost:9092");
>     properties.setProperty("zookeeper.connect", "localhost:2181");
>     Schema schema = new Parser().parse("{" + "\"name\": \"test\", "
>             + "\"type\": \"record\", "
>             + "\"fields\": "
>             + " [ "
>             + "  { \"name\": \"name\", \"type\": \"string\" },"
>             + "  { \"name\": \"symbol\", \"type\": \"string\" },"
>             + "  { \"name\": \"exchange\", \"type\": \"string\"}"
>             + "] "
>             + "}");
> 
>     AvroDeserializationSchema avroSchema =
>             new AvroDeserializationSchema<>(schema);
>     FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
>             new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
>     DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
> 
> Once I run the code, I only get the field names with empty values, as follows.
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> 
> Could anyone help me find out why I cannot decode the data?
> 
> After further troubleshooting, I found that if I use the Kafka producer here 
> <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c> to send 
> the Avro data, specifically with kafka.serializer.DefaultEncoder, the above 
> code gets the correct result. Does anybody know how to either set 
> DefaultEncoder in Kafka Connect or set it when writing a customized Kafka 
> Connect connector? Or, alternatively, how should I modify 
> AvroDeserializationSchema.java instead?
> 
> Thanks, I’ll post this to the Flink user group as well.
> Will
