RE: What is the recommended way to read AVRO data from Kafka using flink.

2016-08-04 Thread Alam, Zeeshan
Hi Stephan,

My AvroDeserializationSchema worked fine with a different Kafka topic; it seems the previous 
topic contained heterogeneous data, with both AVRO- and JSON-formatted records. Thanks for 
your time ☺.
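
For anyone hitting the same symptom, a minimal standalone probe (independent of Flink) can 
confirm whether a sampled message really is binary Avro for the expected schema. The class name 
and file paths below are placeholders, and the message bytes are assumed to have been dumped 
from the topic by some external tool:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

public class AvroMessageProbe {

    public static void main(String[] args) throws Exception {
        // Writer schema, read from the *.avsc file that was used to produce the data.
        Schema schema = new Schema.Parser().parse(
                new String(Files.readAllBytes(Paths.get("/path/to/record.avsc")), StandardCharsets.UTF_8));

        // One raw message value sampled from the Kafka topic (path is a placeholder).
        byte[] message = Files.readAllBytes(Paths.get("/path/to/sample-message.bin"));

        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        try {
            GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(message, null));
            System.out.println("Decoded Avro record: " + record);
        } catch (Exception e) {
            // Non-Avro payloads (e.g. JSON text) usually fail here or decode into nonsense values.
            System.out.println("Not binary Avro for this schema: " + e);
        }
    }
}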

Thanks & Regards
Zeeshan Alam

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Thursday, August 04, 2016 6:00 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using 
flink.

Hi!

To read data from Kafka, you need a DeserializationSchema. You could create one 
that wraps the AvroInputFormat, but an AvroDeserializationSchema would simply 
be an adjustment of the AvroInputFormat to the interface of the 
DeserializationSchema.

In your Avro DeserializationSchema, you can probably create the Avro readers 
internally with an Avro schema (I believe).
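
A minimal sketch of such an adjustment, producing Avro GenericRecord values and assuming the 
Flink 1.0/1.1 DeserializationSchema interface used elsewhere in this thread, could look as 
follows. The class name is illustrative (it is not a class shipped with Flink), the schema is 
carried as the plain *.avsc text so the Avro Schema object itself never needs to be 
Java-serialized, and the reader is created lazily on the workers:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class GenericRecordAvroDeserializationSchema implements DeserializationSchema<GenericRecord> {

    private static final long serialVersionUID = 1L;

    // Content of the *.avsc file; kept as a String so the Schema object itself is never serialized.
    private final String schemaString;

    private transient GenericDatumReader<GenericRecord> reader;
    private transient BinaryDecoder decoder;

    public GenericRecordAvroDeserializationSchema(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public GenericRecord deserialize(byte[] message) {
        if (reader == null) {
            // Parse the schema and build the generic reader on first use (per parallel task).
            Schema schema = new Schema.Parser().parse(schemaString);
            reader = new GenericDatumReader<>(schema);
        }
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize Avro record", e);
        }
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}

Such a schema could then be handed to the Kafka consumer in place of the 
GenericData.Record-based construction quoted further down in this thread.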

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:53 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:
Hi Stephan,

I went through one of the old mail threads:
http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E


Here it is mentioned that when reading from Kafka you are expected to define a 
DeserializationSchema; there is no out-of-the-box (de)serializer for Flink with Kafka, but it 
should not be very hard to add one.



I have some questions:



1.  As per FLINK-3691 you are adding a GenericDatumReader, so I suppose I need to use it 
instead of DatumReader in the DeserializationSchema that is required to read data from Kafka?

2.  What is the recommended way to read AVRO binary data from Kafka if I have the AVRO schema 
file [*.avsc ] with me? Is there a better, more efficient approach? (One possible wiring is 
sketched after these questions.)

3.  Can AvroInputFormat be used to read Kafka data, or is a DeserializationSchema a must for 
reading from Kafka? Also, AvroInputFormat doesn't have any Javadoc.
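
One possible wiring for question 2, reusing the GenericRecord-based deserialization schema 
sketched earlier in this thread (that class name is illustrative, not part of Flink); the 
schema file path is a placeholder, while the broker, ZooKeeper, topic and group settings are 
copied from the program quoted further down:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

public class AvroFromKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
        properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
        properties.setProperty("group.id", "Zeeshantest");

        // Ship the *.avsc content as a plain string; it is parsed lazily inside the schema.
        String schemaString = new String(
                Files.readAllBytes(Paths.get("/path/to/record.avsc")), StandardCharsets.UTF_8);

        FlinkKafkaConsumer08<GenericRecord> kafkaConsumer = new FlinkKafkaConsumer08<>(
                "myavrotopic", new GenericRecordAvroDeserializationSchema(schemaString), properties);

        DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
        messageStream.rebalance().print();
        env.execute("Flink AVRO KAFKA Test");
    }
}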





Thanks & Regards,
Zeeshan Alam




From: Stephan Ewen [mailto:se...@apache.org]
Sent: Tuesday, August 02, 2016 7:52 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using 
flink.

Hi!

I think this is a known limitation for Flink 1.0 and it is fixed in Flink 1.1

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try and use the latest release candidate to get the fix:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release process is also under way, so a stable release should be out soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:
Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I have the AVRO 
schema file that was used to write the data into Kafka. Here 
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
it is mentioned that using the GenericData.Record type is possible with Flink, but not 
recommended, since the record contains the full schema: it's very data intensive and thus 
probably slow to use. So what is the recommended way to read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
    properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
    properties.setProperty("group.id", "Zeeshantest");
    AvroDeserializationSchema<GenericData.Record> avroSchema =
            new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
            new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.


public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

   private static final long serialVersionUID = 4330538776656642778L;

   private final Class<T> avroType;
   private transient DatumReader<T> reader;
   private transient BinaryDecoder decoder;

   public AvroDeserializationSchema(Class<T> avroType) {
      this.avroType = avroType;
   }

   @Override
   p

Re: What is the recommended way to read AVRO data from Kafka using flink.

2016-08-04 Thread Stephan Ewen
Hi!

To read data from Kafka, you need a DeserializationSchema. You could create
one that wraps the AvroInputFormat, but an AvroDeserializationSchema would
simply be an adjustment of the AvroInputFormat to the interface of the
DeserializationSchema.

In your Avro DeserializationSchema, you can probably create the Avro
readers internally with an Avro schema (I believe).

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:53 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:

> Hi Stephan,
>
>
>
> I went through one of the old mail thread
> http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E
>
>
>
> Here it is mentioned that  *When reading from Kafka you are expected to 
> define a DeserializationSchema. There is no out of the box (de)serializer for 
> Flink with Kafka, but it should be not very hard to add.*
>
>
>
> I have some questions:
>
>
>
> 1.   As per FLINK-3691  you are adding *GenericDatumReader*, so I suppose 
> I need to use it instead of DatumReader in my  *DeserializationSchema *which 
> is required to read data from Kafka?
>
>
>
> 2.  What is the recommended way to read AVRO binary data from Kafka if I  
> have the AVRO schema file [*.avsc ] with me? Is there a better more efficient 
> approach?
>
>
>
> 3.   Can *AvroInputFormat* be used to read Kafka data or 
> *DeserializationSchema* is a must to read data from Kafka, also 
> *AvroInputFormat* doesn’t have any javaDoc with it.
>
>
>
>
>
>
>
> Thanks & Regards,
>
> Zeeshan Alam
>
>
>
>
>
>
>
> *From:* Stephan Ewen [mailto:se...@apache.org]
> *Sent:* Tuesday, August 02, 2016 7:52 PM
> *To:* user@flink.apache.org
> *Subject:* Re: What is the recommended way to read AVRO data from Kafka
> using flink.
>
>
>
> Hi!
>
>
>
> I think this is a known limitation for Flink 1.0 and it is fixed in Flink
> 1.1
>
>
>
> Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691
>
>
>
> Here is the mail thread:
>
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E
>
>
>
> You could try and use the latest release candidate to get the fix:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html
>
>
>
> The release is also happening, so should be out in a stable release soon.
>
>
>
> Greetings,
>
> Stephan
>
>
>
>
>
> On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <zeeshan.a...@fmr.com>
> wrote:
>
> Hi,
>
>
>
> I am using *Flink 1.0.3* and *FlinkKafkaConsumer08* to read AVRO data
> from flink. I am having the* AVRO schema file* with me which was used to
> write data in Kafka. Here
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
> you have mentioned that using the GenericData.Record type is possible with
> Flink, but not recommended. Since the record contains the full schema, its
> very data intensive and thus probably slow to use. So what is the
> recommended way to read AVRO data from Kafka using flink.
>
>
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
>     properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
>     properties.setProperty("group.id", "Zeeshantest");
>     AvroDeserializationSchema<GenericData.Record> avroSchema =
>             new AvroDeserializationSchema<>(GenericData.Record.class);
>     FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
>             new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
>     DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
>
>
>
> This is the *AvroDeserializationSchema* that I am using.
>
>
>
>
>
> public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private static final long serialVersionUID = 4330538776656642778L;
>
>     private final Class<T> avroType;
>
>   

RE: What is the recommended way to read AVRO data from Kafka using flink.

2016-08-02 Thread Alam, Zeeshan
Hi Stephan,

I went through one of the old mail threads:
http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E


Here it is mentioned that when reading from Kafka you are expected to define a 
DeserializationSchema; there is no out-of-the-box (de)serializer for Flink with Kafka, but it 
should not be very hard to add one.



I have some questions:



1.  As per FLINK-3691 you are adding a GenericDatumReader, so I suppose I need to use it 
instead of DatumReader in the DeserializationSchema that is required to read data from Kafka?

2.  What is the recommended way to read AVRO binary data from Kafka if I have the AVRO schema 
file [*.avsc ] with me? Is there a better, more efficient approach?

3.  Can AvroInputFormat be used to read Kafka data, or is a DeserializationSchema a must for 
reading from Kafka? Also, AvroInputFormat doesn't have any Javadoc.





Thanks & Regards,
Zeeshan Alam




From: Stephan Ewen [mailto:se...@apache.org]
Sent: Tuesday, August 02, 2016 7:52 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using 
flink.

Hi!

I think this is a known limitation for Flink 1.0 and it is fixed in Flink 1.1

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try and use the latest release candidate to get the fix:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release process is also under way, so a stable release should be out soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:
Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I have the AVRO 
schema file that was used to write the data into Kafka. Here 
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
it is mentioned that using the GenericData.Record type is possible with Flink, but not 
recommended, since the record contains the full schema: it's very data intensive and thus 
probably slow to use. So what is the recommended way to read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
    properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
    properties.setProperty("group.id", "Zeeshantest");
    AvroDeserializationSchema<GenericData.Record> avroSchema =
            new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
            new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.


public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

On running this I am getting java.lang.Exception: Not a Specific class: class 
org.apache.avro.generic.GenericData$Record.
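
That exception is most likely Avro refusing to derive a schema from the class itself: unlike a 
generated SpecificRecord, GenericData.Record carries no compiled-in SCHEMA$, so neither the 
SpecificDatumReader nor the ReflectDatumReader branch above can be built from the Class alone. 
A hedged sketch of the relevant change is to pass the *.avsc content in as an extra 
schemaString constructor argument and give ensureInitialized() a generic-record branch; only 
the changed pieces are shown, and imports for org.apache.avro.Schema and 
org.apache.avro.generic.GenericDatumReader are assumed:

    // New field, set from an additional constructor argument (not in the original class):
    private final String schemaString;

    public AvroDeserializationSchema(Class<T> avroType, String schemaString) {
        this.avroType = avroType;
        this.schemaString = schemaString;
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.generic.GenericRecord.class.isAssignableFrom(avroType)) {
                // Generic records need the writer schema; the Class alone is not enough.
                Schema schema = new Schema.Parser().parse(schemaString);
                reader = new GenericDatumReader<T>(schema);
            } else if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }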

Thanks & Regards
Zeeshan Alam





Re: What is the recommended way to read AVRO data from Kafka using flink.

2016-08-02 Thread Stephan Ewen
Hi!

I think this is a known limitation for Flink 1.0 and it is fixed in Flink
1.1

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try and use the latest release candidate to get the fix:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release process is also under way, so a stable release should be out soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:

> Hi,
>
>
>
> I am using *Flink 1.0.3* and *FlinkKafkaConsumer08* to read AVRO data
> from flink. I am having the* AVRO schema file* with me which was used to
> write data in Kafka. Here
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
> you have mentioned that using the GenericData.Record type is possible with
> Flink, but not recommended. Since the record contains the full schema, its
> very data intensive and thus probably slow to use. So what is the
> recommended way to read AVRO data from Kafka using flink.
>
>
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
>     properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
>     properties.setProperty("group.id", "Zeeshantest");
>     AvroDeserializationSchema<GenericData.Record> avroSchema =
>             new AvroDeserializationSchema<>(GenericData.Record.class);
>     FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
>             new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
>     DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
>
>
>
> This is the *AvroDeserializationSchema* that I am using.
>
>
>
>
>
> public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private static final long serialVersionUID = 4330538776656642778L;
>
>     private final Class<T> avroType;
>     private transient DatumReader<T> reader;
>     private transient BinaryDecoder decoder;
>
>     public AvroDeserializationSchema(Class<T> avroType) {
>         this.avroType = avroType;
>     }
>
>     @Override
>     public T deserialize(byte[] message) {
>         ensureInitialized();
>         try {
>             decoder = DecoderFactory.get().binaryDecoder(message, decoder);
>             return reader.read(null, decoder);
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avroType);
>     }
>
>     private void ensureInitialized() {
>         if (reader == null) {
>             if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
>                 reader = new SpecificDatumReader<T>(avroType);
>             } else {
>                 reader = new ReflectDatumReader<T>(avroType);
>             }
>         }
>     }
> }
>
>
>
> On running this I am getting *java.lang.Exception*: Not a Specific class:
> class org.apache.avro.generic.GenericData$Record.
>
>
>
> *Thanks & Regards*
>
> *Zeeshan Alam *
>


What is the recommended way to read AVRO data from Kafka using flink.

2016-08-02 Thread Alam, Zeeshan
Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I have the AVRO 
schema file that was used to write the data into Kafka. Here 
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
it is mentioned that using the GenericData.Record type is possible with Flink, but not 
recommended, since the record contains the full schema: it's very data intensive and thus 
probably slow to use. So what is the recommended way to read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3x:9092,dojox:9092,dojox:9092");
    properties.setProperty("zookeeper.connect", "dojo3x:2181,dojox:2181,dojox:2181");
    properties.setProperty("group.id", "Zeeshantest");
    AvroDeserializationSchema<GenericData.Record> avroSchema =
            new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
            new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.


public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

On running this I am getting java.lang.Exception: Not a Specific class: class 
org.apache.avro.generic.GenericData$Record.

Thanks & Regards
Zeeshan Alam