Sorry for cluttering the post with code. I have attached a couple of Java files and one Avro-generated POJO.
I am facing some issues while reading / writing data using Flink's
DeserializationSchema / SerializationSchema.
*A) << Producer >> BeamKafkaFlinkAvroProducerTest*
>> If I use KafkaProducer directly (i.e. call produceSimpleData()),
things work fine (just for testing).
>> Using FlinkKafkaProducer as the Beam sink (this is what I should do),
in produceAvroData2() { ...
1) First, if I use
    AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter,
I face the following error (a possible fix is sketched after this section):
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
2) Next, if I use TypeInformationSerializationSchema (irrespective of the
AvroCoder in the pipeline), things apparently work fine, as the Kafka
console-consumer tool prints the message:
    java.lang.String{"uname": "Joe", "id": 6}
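
For what it's worth, the java.lang.String{...} output above makes me suspect
the elements reaching the writer are plain Strings rather than my generated
Test records, which would also explain the ClassCastException in 1). Here is
a rough, untested sketch of the kind of serialization schema I have in mind,
built on Beam's AvroCoder (class and package names are my assumptions for
the Beam / Flink versions I am on):

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class AvroCoderSerializationSchema<T> implements SerializationSchema<T> {

    private final Class<T> type;
    private transient AvroCoder<T> coder;  // rebuilt lazily after being shipped to a worker

    public AvroCoderSerializationSchema(Class<T> type) {
        this.type = type;
    }

    @Override
    public byte[] serialize(T element) {
        if (coder == null) {
            coder = AvroCoder.of(type);  // handles Avro-generated classes like Test
        }
        try {
            return CoderUtils.encodeToByteArray(coder, element);
        } catch (Exception e) {
            throw new RuntimeException("Failed to Avro-encode " + element, e);
        }
    }
}

The producer would then be built as
new FlinkKafkaProducer08<>(TOPIC, new AvroCoderSerializationSchema<>(Test.class), props),
and the upstream PCollection must actually contain Test objects, not Strings.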
*B) << Consumer >> BeamKafkaFlinkAvroConsumerTest*
>> I understand we should either use TypeInformationSerializationSchema in
both the consumer and the producer, OR use AvroDeserializationSchema and
AvroSerializationSchema in the consumer and producer respectively.
But irrespective of whether I use AvroDeserializationSchema or
TypeInformationSerializationSchema, I get the following exception:
Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
    at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
    at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
    at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
    at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
    at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
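
In case it is relevant: my reading of the trace is that getDefaultOutputCoder
ends up with a null coder for the output type, so one thing I plan to try is
pinning the coder on the PCollection explicitly right after the read, so that
nothing has to be inferred. A minimal, untested fragment of what I mean
(kafkaConsumer and options are from my attached test classes):

Pipeline pipeline = Pipeline.create(options);

PCollection<Test> records =
    pipeline
        .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
        .setCoder(AvroCoder.of(Test.class));  // set explicitly instead of relying on inference

pipeline.run();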
Any clues what's wrong here? Maybe I am missing some simple step.
I am trying to make a simple Avro schema work here; eventually we need to
deal with very complex Avro schemas.
Your help is much appreciated.
Thanks
Kaniska
On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:
> Hi Kaniska,
>
> To read data from Kafka, you need to supply a DeserializationSchema.
> Here is one you could use:
> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>
> Similarly, to write data into Kafka using the Producer, you will need
> a SerializationSchema. You need to serialize your data into bytes
> using your Avro schema. Actually, you could use the AvroCoder which is
> supplied in Beam for this. Or you can implement your own, analogous to
> the DeserializationSchema above.
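>
> For illustration, such an analogue built on AvroCoder might look roughly
> like this (an untested sketch, not from the gist; adjust to your Flink
> version's DeserializationSchema interface):
>
> import org.apache.beam.sdk.coders.AvroCoder;
> import org.apache.beam.sdk.util.CoderUtils;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> import org.apache.flink.streaming.util.serialization.DeserializationSchema;
>
> public class AvroCoderDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private final Class<T> type;
>     private transient AvroCoder<T> coder;  // created lazily on the worker
>
>     public AvroCoderDeserializationSchema(Class<T> type) {
>         this.type = type;
>     }
>
>     @Override
>     public T deserialize(byte[] message) {
>         if (coder == null) {
>             coder = AvroCoder.of(type);
>         }
>         try {
>             return CoderUtils.decodeFromByteArray(coder, message);
>         } catch (Exception e) {
>             throw new RuntimeException("Failed to Avro-decode message", e);
>         }
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;  // the Kafka stream is unbounded
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(type);
>     }
> }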
>
> - Max
>
>
> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
> <[email protected]> wrote:
> > Hi,
> >
> > I followed the example in AvroITCase.java, which reads/writes Avro data
> > from/to the filesystem.
> >> I don't want to update AvroIO to embed Kafka consumer / producer
> >
> > Also looked into - https://issues.apache.org/jira/browse/FLINK-2597
> >> But I couldn't instantiate TypeInformationSerializationSchema in the
> >> following FlinkKafkaConsumer / Producer, as I am not sure how to get
> >> access to the ExecutionConfig
> >
> > Essentially, I need help changing the following code in the Beam pipeline
> >> to read Avro data from Kafka and deserialize it into my custom object.
> >
> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
> >     FlinkKafkaConsumer08<String> kafkaConsumer =
> >         new FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
> >
> >     return UnboundedFlinkSource.of(kafkaConsumer);
> > }
> >
> >> and how to write Avro data into Kafka?
> > public static void produceData() {
> >     FlinkKafkaProducer08<String> kafkaSink =
> >         new FlinkKafkaProducer08<>(TOPIC, ? , props);
> >
> >     Pipeline pipeline = Pipeline.create(options);
> >     pipeline
> >         .apply(Create.of(
> >                 new User("Joe", 3, "red"),
> >                 new User("Mary", 4, "blue"),
> >                 new User("Mark", 1, "green"),
> >                 new User("Julia", 5, "purple"))
> >             .withCoder(AvroCoder.of(User.class)))
> >         .apply(transformationToWriteToKafkaSink());
> >
> >     pipeline.run();
> > }
> >
> > Thanks
> > Kaniska
>
Attachments: Test.java, BeamKafkaFlinkAvroConsumerTest.java, BeamKafkaFlinkAvroProducerTest.java
