Hi!

To read data from Kafka, you need a DeserializationSchema. You could create
one that wraps the AvroInputFormat, but an AvroDeserializationSchema would
essentially be the AvroInputFormat's logic adapted to the
DeserializationSchema interface.

In your Avro DeserializationSchema, you can probably create the Avro
readers internally with an Avro schema (I believe).
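As a rough, untested sketch of that idea (the class name is made up, and it assumes you ship the *.avsc schema content as a String so the schema can travel with the serialized function):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Sketch only: a DeserializationSchema that decodes Avro binary records
// into GenericRecords using a known schema. The schema is kept as a String
// field because Schema and the readers are not serializable; they are
// re-created lazily on the task managers.
public class GenericAvroDeserializationSchema
        implements DeserializationSchema<GenericRecord> {

    private static final long serialVersionUID = 1L;

    private final String schemaString;           // content of the *.avsc file
    private transient GenericDatumReader<GenericRecord> reader;
    private transient BinaryDecoder decoder;

    public GenericAvroDeserializationSchema(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public GenericRecord deserialize(byte[] message) {
        if (reader == null) {
            Schema schema = new Schema.Parser().parse(schemaString);
            reader = new GenericDatumReader<>(schema);
        }
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Failed to decode Avro record", e);
        }
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}
```

Note the use of GenericDatumReader rather than SpecificDatumReader: a SpecificDatumReader only works for generated classes extending SpecificRecordBase, which is what produces the "Not a Specific class" error below when it is handed GenericData.Record.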

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:53 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:

> Hi Stephan,
>
>
>
> I went through one of the old mail thread
> http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E
>
>
>
> Here it is mentioned that "When reading from Kafka you are expected to
> define a DeserializationSchema. There is no out of the box (de)serializer for
> Flink with Kafka, but it should be not very hard to add."
>
>
>
> I have some questions:
>
>
>
> 1.       As per FLINK-3691 you are adding GenericDatumReader, so I suppose
> I need to use it instead of DatumReader in my DeserializationSchema, which
> is required to read data from Kafka?
>
>
>
> 2.       What is the recommended way to read AVRO binary data from Kafka if I
> have the AVRO schema file [*.avsc] with me? Is there a better, more
> efficient approach?
>
>
>
> 3.       Can AvroInputFormat be used to read Kafka data, or is a
> DeserializationSchema a must for reading data from Kafka? Also,
> AvroInputFormat doesn't have any Javadoc with it.
>
>
>
>
>
>
>
> Thanks & Regards,
>
> Zeeshan Alam
>
>
>
>
>
>
>
> From: Stephan Ewen [mailto:se...@apache.org]
> Sent: Tuesday, August 02, 2016 7:52 PM
> To: user@flink.apache.org
> Subject: Re: What is the recommended way to read AVRO data from Kafka
> using flink.
>
>
>
> Hi!
>
>
>
> I think this is a known limitation of Flink 1.0, and it is fixed in Flink
> 1.1.
>
>
>
> Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691
>
>
>
> Here is the mail thread:
>
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E
>
>
>
> You could try and use the latest release candidate to get the fix:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html
>
>
>
> The release process is also underway, so the fix should be out in a stable
> release soon.
>
>
>
> Greetings,
>
> Stephan
>
>
>
>
>
> On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <zeeshan.a...@fmr.com>
> wrote:
>
> Hi,
>
>
>
> I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data
> from Kafka. I am having the AVRO schema file with me which was used to
> write the data to Kafka. Here
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
> you have mentioned that using the GenericData.Record type is possible with
> Flink, but not recommended: since the record contains the full schema, it is
> very data-intensive and thus probably slow to use. So what is the
> recommended way to read AVRO data from Kafka using Flink?
>
>
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
>     properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
>     properties.setProperty("group.id", "Zeeshantest");
>
>     AvroDeserializationSchema<GenericData.Record> avroSchema =
>             new AvroDeserializationSchema<>(GenericData.Record.class);
>     FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
>             new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
>
>     DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>
>     env.execute("Flink AVRO KAFKA Test");
> }
>
>
>
> This is the AvroDeserializationSchema that I am using.
>
>
>
>
>
> public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private static final long serialVersionUID = 4330538776656642778L;
>
>     private final Class<T> avroType;
>     private transient DatumReader<T> reader;
>     private transient BinaryDecoder decoder;
>
>     public AvroDeserializationSchema(Class<T> avroType) {
>         this.avroType = avroType;
>     }
>
>     @Override
>     public T deserialize(byte[] message) {
>         ensureInitialized();
>         try {
>             decoder = DecoderFactory.get().binaryDecoder(message, decoder);
>             return reader.read(null, decoder);
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avroType);
>     }
>
>     private void ensureInitialized() {
>         if (reader == null) {
>             if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
>                 reader = new SpecificDatumReader<T>(avroType);
>             } else {
>                 reader = new ReflectDatumReader<T>(avroType);
>             }
>         }
>     }
> }
>
>
>
> On running this I am getting java.lang.Exception: Not a Specific class:
> class org.apache.avro.generic.GenericData$Record.
>
>
>
> Thanks & Regards
>
> Zeeshan Alam
>
>
>
>
>
>
>