Re: Flink read avro from Kafka Connect Issue

2016-11-03 Thread Dayong
Confirmed. It is the magic byte ahead of each Avro message. I am able to get the 
Flink consumer to work. Thank you, Dave :)

Thanks,
Dayong
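
[Editor's note] For anyone hitting this later, a minimal sketch of that prefix, assuming Kafka Connect's AvroConverter writes the Confluent wire format (a 0x0 magic byte followed by a 4-byte big-endian schema-registry ID before the Avro body); the class name is illustrative:

```java
import java.nio.ByteBuffer;

// Sketch of the assumed Confluent wire format: each message carries a
// 5-byte header (1 magic byte 0x0 + 4-byte big-endian schema-registry ID)
// that must be skipped before handing the payload to a plain Avro decoder.
public class WireFormat {
    public static final int HEADER_SIZE = 5;

    // Reads the schema-registry ID from the header; rejects unknown framing.
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt(); // ByteBuffer is big-endian by default
    }

    // Returns the Avro-encoded body with the 5-byte header stripped.
    public static byte[] avroPayload(byte[] message) {
        byte[] payload = new byte[message.length - HEADER_SIZE];
        System.arraycopy(message, HEADER_SIZE, payload, 0, payload.length);
        return payload;
    }
}
```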

> On Nov 3, 2016, at 8:01 AM, Dayong  wrote:
> 
> Not quite sure, will try to find out today.
> 
> Thanks,
> Dayong
> 
>> On Nov 2, 2016, at 9:59 PM, "Tauzell, Dave"  
>> wrote:
>> 
>> Is Kafka Connect adding some bytes to the beginning of the Avro message with 
>> the schema registry ID?
>> 
>> Dave
>> 
>> This e-mail and any files transmitted with it are confidential, may contain 
>> sensitive information, and are intended solely for the use of the individual 
>> or entity to whom they are addressed. If you have received this e-mail in 
>> error, please notify the sender by reply e-mail immediately and destroy all 
>> copies of the e-mail and any attachments.


Re: Flink read avro from Kafka Connect Issue

2016-11-03 Thread Dayong
Not quite sure, will try to find out today.

Thanks,
Dayong

> On Nov 2, 2016, at 9:59 PM, "Tauzell, Dave"  
> wrote:
> 
> Is Kafka Connect adding some bytes to the beginning of the Avro message with 
> the schema registry ID?
> 
> Dave
> 


Re: Flink read avro from Kafka Connect Issue

2016-11-02 Thread Tauzell, Dave
Is Kafka Connect adding some bytes to the beginning of the Avro message with the 
schema registry ID?

Dave



Re: Flink read avro from Kafka Connect Issue

2016-11-02 Thread Will Du
By using the kafka-avro-console-consumer I am able to get the full message from 
Kafka Connect with AvroConverter, but I get no output except the schema from Flink.

By using the producer with DefaultEncoder, the kafka-avro-console-consumer 
throws exceptions, whereas the Flink consumer works. However, my target is to 
have Flink consume Avro data produced by Kafka Connect.
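
[Editor's note] A quick way to tell which framing a topic carries (an illustrative helper, assuming the Confluent convention that registry-framed messages start with a zero magic byte, while raw Avro from a DefaultEncoder-style producer usually does not):

```java
// Heuristic: Confluent-framed Avro messages start with magic byte 0x0 and
// are at least 6 bytes long (1 magic + 4-byte schema ID + non-empty body).
// Raw Avro bytes from a DefaultEncoder-style producer have no such prefix.
public class FramingCheck {
    public static boolean looksConfluentFramed(byte[] message) {
        return message.length > 5 && message[0] == 0x0;
    }
}
```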




Flink read avro from Kafka Connect Issue

2016-11-02 Thread Will Du


Hi folks,
I am trying to consume Avro data from Kafka in Flink. The data is produced by 
Kafka Connect using AvroConverter. I have created an AvroDeserializationSchema.java 
used by the Flink consumer. Then, I use the following code to read it.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");

    Schema schema = new Parser().parse("{"
            + "\"name\": \"test\", "
            + "\"type\": \"record\", "
            + "\"fields\": ["
            + "  { \"name\": \"name\", \"type\": \"string\" },"
            + "  { \"name\": \"symbol\", \"type\": \"string\" },"
            + "  { \"name\": \"exchange\", \"type\": \"string\" }"
            + "]}");

    AvroDeserializationSchema<GenericRecord> avroSchema =
            new AvroDeserializationSchema<>(schema);
    FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
            new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);

    DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

Once I run the code, I get only records with empty field values, as follows:
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}

Could anyone help me find out why I cannot decode the records?

With further troubleshooting, I found that if I use the Kafka producer here to 
send the Avro data, especially using kafka.serializer.DefaultEncoder, the code 
above gets the correct result. Does anybody know how to either set DefaultEncoder 
in Kafka Connect, or set it when writing a customized Kafka connector? Or, the 
other way around, how should I modify the AvroDeserializationSchema.java instead?
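
[Editor's note] One possible direction for the AvroDeserializationSchema change, assuming Connect's AvroConverter prefixes each message with the 5-byte Confluent wire-format header (0x0 magic byte + 4-byte schema-registry ID); the class and field names below are illustrative, not the poster's actual code:

```java
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Hypothetical deserialize() variant that skips the assumed 5-byte
// Confluent prefix (0x0 magic + 4-byte schema ID) before decoding the
// Avro body against a fixed reader schema.
public class ConnectAwareAvroDeserializer {
    private static final int WIRE_FORMAT_PREFIX = 5;
    private final Schema schema;

    public ConnectAwareAvroDeserializer(Schema schema) {
        this.schema = schema;
    }

    public GenericRecord deserialize(byte[] message) throws IOException {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        // Start decoding after the prefix; the rest is a plain
        // binary-encoded Avro record.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
                message, WIRE_FORMAT_PREFIX, message.length - WIRE_FORMAT_PREFIX, null);
        return reader.read(null, decoder);
    }
}
```

In production one would normally resolve the writer schema from the registry by the embedded ID rather than hard-coding a reader schema, but this sketch matches the fixed-schema setup in the code above.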

Thanks, I’ll post this to the Flink user group as well.
Will