Sorry for cluttering the post with some code.
I have attached a couple of Java files and one Avro-generated POJO.

I am facing some issues while reading / writing data using Flink's
DeserializationSchema / SerializationSchema.


*A) << Producer >> BeamKafkaFlinkAvroProducerTest*

>> If I use KafkaProducer directly (i.e. call produceSimpleData()), things
work fine (just for testing).

>> Using FlinkKafkaProducer as the unbounded sink (this is what I should do):

produceAvroData2() { ...

1) First, if I use

  AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter,
I get the following error >>

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
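
For reference, here is a minimal sketch of the kind of
AvroSerializationSchema I mean (the actual code is in the attachments; this
sketch assumes Test is the Avro-generated SpecificRecord, and that a plain
Flink SerializationSchema is what FlinkKafkaProducer08 expects):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class AvroSerializationSchema<T extends SpecificRecord>
        implements SerializationSchema<T> {

    private final Class<T> avroType;
    private transient SpecificDatumWriter<T> writer;

    public AvroSerializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public byte[] serialize(T element) {
        if (writer == null) {
            writer = new SpecificDatumWriter<>(avroType);
        }
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            // This is the call that blows up with the ClassCastException
            // above when the incoming element is a String rather than an
            // IndexedRecord.
            writer.write(element, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Avro serialization failed", e);
        }
    }
}

Given the stack trace, it looks like the writer is being handed a String
instead of a Test record somewhere upstream, which is what I am trying to
track down.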


2) Next, if I use *TypeInformationSerializationSchema* (irrespective of the
AvroCoder in the Pipeline), things apparently work fine, as the Kafka
console consumer prints the message >>
java.lang.String{"uname": "Joe", "id": 6}
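
As an aside, since I had asked below how to get hold of an ExecutionConfig:
as far as I can tell, a freshly constructed one is enough when building the
schema outside a running job. A sketch of how I construct it (assuming Test
as the element type):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

// A fresh ExecutionConfig (rather than one taken from a running job)
// appears to be sufficient; the schema only uses it to build a serializer.
private static TypeInformationSerializationSchema<Test> schemaForTest() {
    TypeInformation<Test> typeInfo = TypeExtractor.getForClass(Test.class);
    return new TypeInformationSerializationSchema<>(typeInfo, new ExecutionConfig());
}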


*B) << Consumer >> BeamKafkaFlinkAvroConsumerTest*

>> My understanding is that we should either use
TypeInformationSerializationSchema in both consumer and producer, OR use
AvroDeserializationSchema and AvroSerializationSchema in the consumer and
producer respectively.
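
For the consumer side, here is a minimal sketch of what I mean by
AvroDeserializationSchema, modeled on Stephan's gist linked below (again
assuming Test is the Avro-generated SpecificRecord; the actual code is in
the attachments):

import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema<T extends SpecificRecord>
        implements DeserializationSchema<T> {

    private final Class<T> avroType;
    private transient SpecificDatumReader<T> reader;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        if (reader == null) {
            reader = new SpecificDatumReader<>(avroType);
        }
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Avro deserialization failed", e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}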

But irrespective of whether I use AvroDeserializationSchema or
TypeInformationSerializationSchema, I get the following exception >>

Exception in thread "main" java.lang.NullPointerException: null value in
entry: V=null
    at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
    at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
    at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
    at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
    at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
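
From the stack trace, CoderRegistry.getDefaultOutputCoder apparently fails
to infer a default coder for the source's output. Would setting the coder
explicitly on the PCollection be the right fix? Something like the
following sketch (hypothetical: it assumes consumeMessages() from my
earlier mail below is changed to return an UnboundedSource<Test,
CheckpointMark> rather than <String, ...>):

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.values.PCollection;

// Attach the AvroCoder explicitly so CoderRegistry never has to infer it.
PCollection<Test> records = pipeline
        .apply(Read.from(consumeMessages()))
        .setCoder(AvroCoder.of(Test.class));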


Any clues what's wrong here? Maybe I am missing some simple step.
I am trying to make a simple Avro schema work here; eventually we need to
deal with much more complex Avro schemas.

Your help is much appreciated.

Thanks
Kaniska

On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:

> Hi Kaniska,
>
> To read data from Kafka, you need to supply a DeserializationSchema.
> Here is one you could use:
> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>
> Similarly, to write data into Kafka using the Producer, you will need
> a SerializationSchema. You need to serialize your data into bytes
> using your Avro schema. Actually, you could use the AvroCoder which is
> supplied in Beam for this. Or you implement your own analogue to the
> DeserializationSchema above.
>
> - Max
>
>
> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
> <[email protected]> wrote:
> > Hi,
> >
> > I followed the example in AvroITCase.java - which reads/writes Avro data
> > from/to the filesystem.
> >> I don't want to update AvroIO to embed a Kafka consumer / producer
> >
> > Also looked into - https://issues.apache.org/jira/browse/FLINK-2597
> >> But I couldn't instantiate TypeInformationSerializationSchema in the
> >> following FlinkKafkaConsumer / Producer - as I am not sure how to get
> >> access to ExecutionConfig
> >
> > Essentially, I need help changing the following code in the Beam Pipeline
> >> to read Avro data from Kafka and deserialize it into my custom object.
> >
> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
> >         FlinkKafkaConsumer08<String> kafkaConsumer = new
> > FlinkKafkaConsumer08<>(options.getKafkaTopic(),
> >                 ? , props);
> >
> >         return UnboundedFlinkSource.of(kafkaConsumer);
> >     }
> >
> >> and how to write Avro data into Kafka?
> > public static void produceData(){
> >         FlinkKafkaProducer08<String> kafkaSink =
> >                 new FlinkKafkaProducer08<>(TOPIC, ? , props);
> >
> >         Pipeline pipeline = Pipeline.create(options);
> >         pipeline
> >         .apply(Create.of(
> >                 new User("Joe", 3, "red"),
> >                 new User("Mary", 4, "blue"),
> >                 new User("Mark", 1, "green"),
> >                 new User("Julia", 5, "purple"))
> >             .withCoder(AvroCoder.of(User.class)))
> >
> >         .apply(transformationToWriteToKafkaSink());
> >
> >         pipeline.run();
> >
> >     }
> >
> > Thanks
> > Kaniska
>

Attachment: Test.java
Description: Binary data

Attachment: BeamKafkaFlinkAvroConsumerTest.java
Description: Binary data

Attachment: BeamKafkaFlinkAvroProducerTest.java
Description: Binary data
