Hi,
I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from flink. I
am having the AVRO schema file with me which was used to write data in Kafka.
Here
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
you have mentioned that using the GenericData.Record type is possible with
Flink, but not recommended. Since the record contains the full schema, its very
data intensive and thus probably slow to use. So what is the recommended way to
read AVRO data from Kafka using flink.
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
"dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
properties.setProperty("zookeeper.connect",
"dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
properties.setProperty("group.id", "Zeeshantest");
AvroDeserializationSchema<GenericData.Record> avroSchema = new
AvroDeserializationSchema<>(GenericData.Record.class);
FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer = new
FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
DataStream<GenericData.Record> messageStream =
env.addSource(kafkaConsumer);
messageStream.rebalance().print();
env.execute("Flink AVRO KAFKA Test");
}
This is the AvroDeserializationSchema that I am using.
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
private static final long serialVersionUID = 4330538776656642778L;
private final Class<T> avroType;
private transient DatumReader<T> reader;
private transient BinaryDecoder decoder;
public AvroDeserializationSchema(Class<T> avroType) {
this.avroType = avroType;
}
@Override
public T deserialize(byte[] message) {
ensureInitialized();
try {
decoder = DecoderFactory.get().binaryDecoder(message,
decoder);
return reader.read(null, decoder);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean isEndOfStream(T nextElement) {
return false;
}
@Override
public TypeInformation<T> getProducedType() {
return TypeExtractor.getForClass(avroType);
}
private void ensureInitialized() {
if (reader == null) {
if
(org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
reader = new SpecificDatumReader<T>(avroType);
} else {
reader = new ReflectDatumReader<T>(avroType);
}
}
}
}
On running this I am getting java.lang.Exception: Not a Specific class: class
org.apache.avro.generic.GenericData$Record.
Thanks & Regards
Zeeshan Alam
[cid:[email protected]]
[cid:[email protected]] +91 80 6626 5982
[cid:[email protected]] +91 7259501608
Fidelity Internal Information<http://fnw.fmr.com/issg/Popi_def-ex.html#internal>
Techworks Monitoring
link<https://techworks.fmr.com/products/monitoring-overview>