Hi Max,

Thanks again for pointing out that I missed the AvroCoder inside
SerDeserSchema.

Could you please try the attached Avro-generated POJO, Test.java?

  >> just to identify whether there is an issue with SerDeserSchema or
whether I am missing a dependency

  >> note that I am still working off your branch of flink-runner (I
checked it out 5 days ago) to resolve FlinkUnboundedSink (btw, has it been
merged back to main? If not, when is it scheduled to be merged?)

*Here are the current results:*

(A) Data Producer:

I made the required changes in the producer code per your suggestion.

But when I now try to produce data for the attached Avro POJO, I hit the
following exception -

Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$WriteSinkStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:215)
    at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$WriteSinkStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:212)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    ... 9 more
Caused by: java.lang.NullPointerException: in com.xyz.schemas.Test null of com.xyz.schemas.Test
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:265)
    at com.xyz.topology.netflow.beam.AvroSerializationDeserializationSchema.serialize(AvroSerializationDeserializationSchema.java:41)
    at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:41)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:252)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
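
For reference, the producer is now wired roughly as below (a sketch, not
the full file; TOPIC and props are my local placeholders):

// Sketch of my current producer wiring (per your suggestion):
FlinkKafkaProducer08<Test> kafkaSink =
        new FlinkKafkaProducer08<>(TOPIC,
                new AvroSerializationDeserializationSchema(Test.class), props);

pipeline
        .apply(Create.of(new Test("Joe", 6)))
        .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));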

** Note that when I revert to my earlier setup, with the AvroCoder in the
pipeline and TypeInformationSerializationSchema in the Kafka sink:

pipeline.apply(Create.of(new Test("Joe", 6))
    *.withCoder(AvroCoder.of(Test.class))*)
    .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));

** the exception doesn't occur and the message is produced properly.

(B) Data Consumer:

I made the changes per your suggestion.
==================================
AvroSerializationDeserializationSchema schema =
    new AvroSerializationDeserializationSchema(Test.class);

FlinkKafkaConsumer08<Test> kafkaConsumer =
    new FlinkKafkaConsumer08<>(TOPIC, schema, props);

PCollection<Test> users = pipeline.apply(
    Read.named("StreamingWordCount")
        .from(UnboundedFlinkSource.of(kafkaConsumer)));

PCollection<Long> counts = users.apply(ParDo.of(new PrintFn()));

PipelineResult result = pipeline.run();
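
(For reference, PrintFn on my side is essentially the trivial logging DoFn
below - a sketch, the real class lives in my test code:)

// Sketch of my PrintFn - logs each record and emits a dummy count:
static class PrintFn extends DoFn<Test, Long> {
    @Override
    public void processElement(ProcessContext c) {
        System.out.println(c.element());
        c.output(1L);
    }
}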

** Then I executed the producer with my original code, so that the messages
are generated properly; here kafkaSink uses TypeInformationSerializationSchema:

pipeline.apply(Create.of(new Test("Joe", 6))
    *.withCoder(AvroCoder.of(Test.class))*)
    .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
**

I still get the following error on the consumer side:

Exception in thread "main" java.lang.NullPointerException: null value in entry: InputT=null
    at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
    at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
    at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
    at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
    at org.apache.beam.sdk.coders.CoderRegistry.getTypeToCoderBindings(CoderRegistry.java:809)
    at org.apache.beam.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:204)
    at org.apache.beam.sdk.transforms.ParDo$Bound.getDefaultOutputCoder(ParDo.java:792)
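
One guess from my side (not something you suggested, so please correct me):
should I pin the coder on the read explicitly instead of letting the
CoderRegistry infer it? Along these lines, assuming PCollection.setCoder is
the right hook:

// Hypothetical workaround - pin the coder explicitly so the CoderRegistry
// doesn't have to infer it (the inference is where the NPE is thrown):
PCollection<Test> users = pipeline.apply(
        Read.named("StreamingWordCount")
                .from(UnboundedFlinkSource.of(kafkaConsumer)));
users.setCoder(AvroCoder.of(Test.class));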


On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]> wrote:

> Hi Kaniska,
>
> Thanks for your mail. First of all, let us state clearly the problem
> you are trying to solve.
>
> Do you want to use Avro serialization or Flink serialization? If you
> use the TypeInformationSerializationSchema, you use Flink's
> serialization stack - no Avro is involved then. You have to be
> consistent and stick with one of the two. Otherwise problems are bound
> to happen, as Flink's serialization doesn't understand Avro's format
> and vice versa.
>
> From your initial question it appears you want to read/write Avro
> serialized data from/to Kafka. So let's see how to do this:
>
> ============
> A custom class
> ============
>
> Let's assume you have a class like this:
>
>   public class MyCounts implements Serializable {
>
>     private String word;
>     private long count;
>
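>     // Avro's reflect-based decoding needs an accessible no-arg constructor.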
>     public MyCounts() {}
>
>     public MyCounts(String word, long count) {
>       this.word = word;
>       this.count = count;
>     }
>
>     @Override
>     public String toString() {
>       return "MyCounts{" +
>           "word='" + word + '\'' +
>           ", count=" + count +
>           '}';
>     }
>   }
>
>
> =================================
> The Serialization / Deserialization Schema
> =================================
>
> This is the schema for the Kafka Producer/Consumer to
> serialize/deserialize data:
>
>   import java.io.ByteArrayInputStream;
>   import java.io.ByteArrayOutputStream;
>   import java.io.IOException;
>
>   import org.apache.beam.sdk.coders.AvroCoder;
>   import org.apache.beam.sdk.coders.Coder;
>   import org.apache.flink.api.common.typeinfo.TypeInformation;
>   import org.apache.flink.api.java.typeutils.TypeExtractor;
>   import org.apache.flink.streaming.util.serialization.DeserializationSchema;
>   import org.apache.flink.streaming.util.serialization.SerializationSchema;
>
>   public class AvroSerializationDeserializationSchema<T>
>       implements SerializationSchema<T>, DeserializationSchema<T> {
>
>     private final Class<T> avroType;
>
>     private final AvroCoder<T> coder;
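>     // transient: this schema object is Java-serialized and shipped to the
>     // Flink workers, and ByteArrayOutputStream is not Serializable;
>     // serialize() lazily re-creates the stream after deserialization.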
>     private transient ByteArrayOutputStream out;
>
>     public AvroSerializationDeserializationSchema(Class<T> clazz) {
>       this.avroType = clazz;
>       this.coder = AvroCoder.of(clazz);
>       this.out = new ByteArrayOutputStream();
>     }
>
>     @Override
>     public byte[] serialize(T element) {
>       if (out == null) {
>         out = new ByteArrayOutputStream();
>       }
>       try {
>         out.reset();
>         coder.encode(element, out, Coder.Context.NESTED);
>       } catch (IOException e) {
>         throw new RuntimeException("Avro encoding failed.", e);
>       }
>       return out.toByteArray();
>     }
>
>     @Override
>     public T deserialize(byte[] message) throws IOException {
>       return coder.decode(new ByteArrayInputStream(message),
> Coder.Context.NESTED);
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>       return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>       return TypeExtractor.getForClass(avroType);
>     }
>   }
>
> ======================================
> Writing some Avro serialized data to a Kafka topic
> ======================================
>
>     Pipeline pipeline = Pipeline.create(options);
>
>     PCollection<MyCounts> words =
>         pipeline.apply(Create.of(
>             new MyCounts("word", 1L),
>             new MyCounts("another", 2L),
>             new MyCounts("yet another", 3L)));
>
>     FlinkKafkaProducer08<MyCounts> kafkaSink =
>         new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
>             new
> AvroSerializationDeserializationSchema<>(MyCounts.class), props);
>
>     words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
>
>     pipeline.run();
>
> Let's execute that.
>
> ====================================
> Reading Avro serialized data from a Kafka topic
> ====================================
>
> Now time to read back data from Kafka:
>
>     Pipeline pipeline = Pipeline.create(options);
>
>     FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new
> FlinkKafkaConsumer08<>(
>         options.getKafkaTopic(),
>         new AvroSerializationDeserializationSchema<>(MyCounts.class),
> props);
>
>     PCollection<MyCounts> words = pipeline
>         .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
>         .apply(ParDo.of(new PrintFn()));
>
>     pipeline.run();
>
> ===================
> Executing the examples
> ===================
>
> It prints:
>
>     MyCounts{word='word', count=1}
>     MyCounts{word='another', count=2}
>     MyCounts{word='yet another', count=3}
>
>
> Let me know if that helps you! I've omitted the Kafka props and
> options for brevity.
>
> I hope that we will soon have native Kafka IO for both reading and
> writing to Kafka available in Beam.
>
> Cheers,
> Max
>
> On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal
> <[email protected]> wrote:
> > Sorry for cluttering the post with some code.
> > I have attached a couple of Java files and one Avro-generated POJO.
> >
> > I am facing some issues while reading/writing data using Flink's
> > deserialization/serialization schemas.
> >
> >
> > A) << producer >> BeamKafkaFlinkAvroProducerTest
> >
> >>> if I use KafkaProducer directly (i.e. call produceSimpleData()),
> >>> things work fine (just for testing)
> >
> >>> Using FlinkKafkaProducer via UnboundedFlinkSink (this is what I should do)
> >
> > produceAvroData2() { ...
> >
> > 1) First, if I use >> AvroSerializationSchema schema = new
> > AvroSerializationSchema(Test.class);
> >
> > i.e. essentially using Avro's
> > org.apache.avro.specific.SpecificDatumWriter,
> > I face the following error >>
> >
> > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
> >     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
> >     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
> >
> >
> > 2) Next, if I use TypeInformationSerializationSchema (irrespective of
> > the AvroCoder in the pipeline), things apparently work fine,
> >
> > as the Kafka console consumer tool prints the message >>
> > java.lang.String{"uname": "Joe", "id": 6}
> >
> >
> > B) <<Consumer>> ,  BeamKafkaFlinkAvroConsumerTest
> >
> >>> I understand we should either use TypeInformationSerializationSchema in
> >>> both consumer and producer, OR
> >
> > use AvroDeserializationSchema and AvroSerializationSchema in the
> > consumer and producer respectively!
> >
> > But irrespective of using AvroDeserializationSchema or
> > TypeInformationSerializationSchema, I get the following exception >>
> >
> > Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
> >     at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
> >     at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
> >     at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
> >     at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
> >     at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
> >
> >
> > Any clues as to what's wrong here? Maybe I am missing some simple step.
> > I am trying to make a simple Avro schema work here, and eventually we
> > need to deal with very complex Avro schemas.
> >
> > Much appreciate your help.
> >
> > Thanks
> > Kaniska
> >
> > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]>
> wrote:
> >>
> >> Hi Kaniska,
> >>
> >> To read data from Kafka, you need to supply a DeserializationSchema.
> >> Here is one you could use:
> >> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
> >>
> >> Similarly, to write data into Kafka using the Producer, you will need
> >> a SerializationSchema. You need to serialize your data into bytes
> >> using your Avro schema. Actually, you could use the AvroCoder which is
> >> supplied in Beam for this, or implement your own, analogous to the
> >> DeserializationSchema above.
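> >>
> >> A minimal sketch of that idea (illustrative class name; an
> >> AvroCoder-backed SerializationSchema with minimal error handling):
> >>
> >>   public class AvroCoderSerializationSchema<T>
> >>       implements SerializationSchema<T> {
> >>
> >>     private final AvroCoder<T> coder;
> >>
> >>     public AvroCoderSerializationSchema(Class<T> clazz) {
> >>       this.coder = AvroCoder.of(clazz);
> >>     }
> >>
> >>     @Override
> >>     public byte[] serialize(T element) {
> >>       // Encode the element into Avro binary via Beam's AvroCoder.
> >>       ByteArrayOutputStream out = new ByteArrayOutputStream();
> >>       try {
> >>         coder.encode(element, out, Coder.Context.NESTED);
> >>       } catch (IOException e) {
> >>         throw new RuntimeException("Avro encoding failed.", e);
> >>       }
> >>       return out.toByteArray();
> >>     }
> >>   }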
> >>
> >> - Max
> >>
> >>
> >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
> >> <[email protected]> wrote:
> >> > Hi,
> >> >
> >> > I followed the example in AvroITCase.java - which reads/writes Avro
> >> > data from/to the filesystem.
> >> >> I don't want to update AvroIO to embed Kafka consumer / producer
> >> >
> >> > Also looked into - https://issues.apache.org/jira/browse/FLINK-2597
> >> >> But I couldn't instantiate TypeInformationSerializationSchema in the
> >> >> following
> >> >> FlinkKafkaConsumer / Producer - as I am not sure how to get access to
> >> >> the ExecutionConfig
> >> >
> >> > Essentially, I need help changing the following code in the Beam pipeline
> >> >> to read Avro data from Kafka and deserialize it into my custom object.
> >> >
> >> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
> >> >     FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
> >> >             options.getKafkaTopic(), ?, props);
> >> >
> >> >     return UnboundedFlinkSource.of(kafkaConsumer);
> >> > }
> >> >
> >> >>  and how to write Avro data into Kafka?
> >> > public static void produceData(){
> >> >         FlinkKafkaProducer08<String> kafkaSink =
> >> >                 new FlinkKafkaProducer08<>(TOPIC, ? , props);
> >> >
> >> >         Pipeline pipeline = Pipeline.create(options);
> >> >         pipeline
> >> >         .apply(Create.of(
> >> >                 new User("Joe", 3, "red"),
> >> >                 new User("Mary", 4, "blue"),
> >> >                 new User("Mark", 1, "green"),
> >> >                 new User("Julia", 5, "purple"))
> >> >             .withCoder(AvroCoder.of(User.class)))
> >> >
> >> >         .apply(transformationToWriteToKafkaSink());
> >> >
> >> >         pipeline.run();
> >> >
> >> >     }
> >> >
> >> > Thanks
> >> > Kaniska
> >
> >
>

Attachment: Test.java