Hi Raghu,

Thanks very much for the update.

We are using Kafka 0.8.x.

Could you please also add a working example (Beam + Flink + Kafka) with an
Avro-generated POJO?

Kaniska

On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi <[email protected]> wrote:

> Kaniska,
>
> If your Kafka cluster is running 0.9, you can also try the native KafkaIO
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
> merged recently into Beam. It supports reading from Kafka and I am working
> on Sink support (this week).
>
> See TopHashtagsExample.java
> <https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
> for a complete example (it writes results back to Kafka in ParDo() rather
> than a Sink). To use AvroCoder, your consumer test will have something like
>
>    pipeline.apply(KafkaIO.read()
>             .withBootstrapServers(options.getBroker())
>             .withTopics(ImmutableList.of(TOPICS))
>             .withValueCoder(AvroCoder.of(User.class))
>            ...
>
> Raghu.
>
>
>
> On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]>
> wrote:
>
>> Hi Kaniska,
>>
>> Thanks for your mail. First of all, let us state clearly the problem
>> you are trying to solve.
>>
>> Do you want to use Avro serialization or Flink serialization? If you
>> use the TypeInformationSerializationSchema, you are using Flink's
>> serialization stack, with no Avro involved. You have to be consistent
>> and stick with one or the other. Otherwise problems are bound to happen,
>> because Flink's serialization doesn't understand Avro's format, and
>> vice versa.
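
The consistency point above can be illustrated with nothing but the JDK. The sketch below is only an illustration: the two formats (plain DataOutputStream fields and Java object serialization) are stand-ins chosen here and are neither Avro's nor Flink's actual wire format, and the class and method names are hypothetical. It shows that a decoder only works on bytes produced by the matching encoder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

class MatchingFormatsDemo {

  /** Stand-in format A: a length-prefixed string followed by a long. */
  static byte[] encodeA(String word, long count) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        out.writeUTF(word);
        out.writeLong(count);
      }
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException("Encoding with format A failed.", e);
    }
  }

  /** Stand-in format B: Java object serialization of the same two values. */
  static byte[] encodeB(String word, long count) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(word);
        out.writeLong(count);
      }
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException("Encoding with format B failed.", e);
    }
  }

  /** Decoder for format A only; reports a failure instead of throwing. */
  static String decodeA(byte[] message) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message))) {
      String word = in.readUTF();
      long count = in.readLong();
      return word + "=" + count;
    } catch (IOException e) {
      return "decode failed: " + e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    // Matching encoder and decoder: the record survives the round trip.
    System.out.println(decodeA(encodeA("word", 1L)));
    // Mismatched encoder and decoder: the bytes cannot be interpreted.
    System.out.println(decodeA(encodeB("word", 1L)));
  }
}
```

In this sketch the mismatch happens to fail loudly. A mismatch can also decode into wrong values without any exception, which is another reason to use one format on both the producer and the consumer side.
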
>>
>> From your initial question it appears you want to read/write Avro
>> serialized data from/to Kafka. So let's see how to do this:
>>
>> ============
>> A custom class
>> ============
>>
>> Let's assume you have a class like this:
>>
>>   public class MyCounts implements Serializable {
>>
>>     private String word;
>>     private long count;
>>
>>     public MyCounts() {}
>>
>>     public MyCounts(String word, long count) {
>>       this.word = word;
>>       this.count = count;
>>     }
>>
>>     @Override
>>     public String toString() {
>>       return "MyCounts{" +
>>           "word='" + word + '\'' +
>>           ", count=" + count +
>>           '}';
>>     }
>>   }
>>
>>
>> =================================
>> The Serialization / Deserialization Schema
>> =================================
>>
>> This is the schema for the Kafka Producer/Consumer to
>> serialize/deserialize data:
>>
>>   public class AvroSerializationDeserializationSchema<T>
>>       implements SerializationSchema<T>, DeserializationSchema<T> {
>>
>>     private final Class<T> avroType;
>>
>>     private final AvroCoder<T> coder;
>>     private transient ByteArrayOutputStream out;
>>
>>     public AvroSerializationDeserializationSchema(Class<T> clazz) {
>>       this.avroType = clazz;
>>       this.coder = AvroCoder.of(clazz);
>>       this.out = new ByteArrayOutputStream();
>>     }
>>
>>     @Override
>>     public byte[] serialize(T element) {
>>       if (out == null) {
>>         out = new ByteArrayOutputStream();
>>       }
>>       try {
>>         out.reset();
>>         coder.encode(element, out, Coder.Context.NESTED);
>>       } catch (IOException e) {
>>         throw new RuntimeException("Avro encoding failed.", e);
>>       }
>>       return out.toByteArray();
>>     }
>>
>>     @Override
>>     public T deserialize(byte[] message) throws IOException {
>>       return coder.decode(
>>           new ByteArrayInputStream(message), Coder.Context.NESTED);
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(T nextElement) {
>>       return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<T> getProducedType() {
>>       return TypeExtractor.getForClass(avroType);
>>     }
>>   }
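
A note on the transient buffer in the schema above: the null check in serialize() is presumably there because the schema object is copied to the workers with Java serialization, which skips transient fields and does not rerun the constructor of a Serializable class. The JDK-only sketch below shows that effect. BufferingSchema and roundTrip are hypothetical names used for illustration and are not part of Beam or Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class TransientBufferDemo {

  /** Hypothetical stand-in for a schema that keeps a reusable output buffer. */
  static class BufferingSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // ByteArrayOutputStream is not Serializable, so the field is transient.
    private transient ByteArrayOutputStream out = new ByteArrayOutputStream();

    boolean hasBuffer() {
      return out != null;
    }

    byte[] serialize(String element) {
      // Rebuild the buffer lazily; it is null on a deserialized copy.
      if (out == null) {
        out = new ByteArrayOutputStream();
      }
      out.reset();
      byte[] bytes = element.getBytes(StandardCharsets.UTF_8);
      out.write(bytes, 0, bytes.length);
      return out.toByteArray();
    }
  }

  /** Copies the schema through Java serialization, as if shipping it to another JVM. */
  static BufferingSchema roundTrip(BufferingSchema schema) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
        oos.writeObject(schema);
      }
      try (ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (BufferingSchema) ois.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Java serialization round trip failed.", e);
    }
  }

  public static void main(String[] args) {
    BufferingSchema copy = roundTrip(new BufferingSchema());
    System.out.println("buffer present after copy: " + copy.hasBuffer()); // false
    System.out.println("bytes written: " + copy.serialize("word").length); // 4
  }
}
```

Without the lazy re-initialization, the first serialize() call on the copied object would fail with a NullPointerException.
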
>>
>> ======================================
>> Writing some Avro serialized data to a Kafka topic
>> ======================================
>>
>>     Pipeline pipeline = Pipeline.create(options);
>>
>>     PCollection<MyCounts> words =
>>         pipeline.apply(Create.of(
>>             new MyCounts("word", 1L),
>>             new MyCounts("another", 2L),
>>             new MyCounts("yet another", 3L)));
>>
>>     FlinkKafkaProducer08<MyCounts> kafkaSink =
>>         new FlinkKafkaProducer08<>(
>>             options.getKafkaOutputTopic(),
>>             new AvroSerializationDeserializationSchema<>(MyCounts.class),
>>             props);
>>
>>     words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
>>
>>     pipeline.run();
>>
>> Let's execute that.
>>
>> ====================================
>> Reading Avro serialized data from a Kafka topic
>> ====================================
>>
>> Now it's time to read the data back from Kafka:
>>
>>     Pipeline pipeline = Pipeline.create(options);
>>
>>     FlinkKafkaConsumer08<MyCounts> kafkaConsumer =
>>         new FlinkKafkaConsumer08<>(
>>             options.getKafkaTopic(),
>>             new AvroSerializationDeserializationSchema<>(MyCounts.class),
>>             props);
>>
>>     PCollection<MyCounts> words = pipeline
>>         .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
>>         .apply(ParDo.of(new PrintFn()));
>>
>>     pipeline.run();
>>
>> ===================
>> Executing the examples
>> ===================
>>
>> It prints:
>>
>>     MyCounts{word='word', count=1}
>>     MyCounts{word='another', count=2}
>>     MyCounts{word='yet another', count=3}
>>
>>
>> Let me know if that helps you! I've omitted the Kafka props and
>> options for brevity.
>>
>> I hope that we will soon have native Kafka IO for both reading and
>> writing to Kafka available in Beam.
>>
>> Cheers,
>> Max
>>
>> On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal
>> <[email protected]> wrote:
>> > Sorry for cluttering the post with some code.
>> > I have attached a couple of Java files and one Avro-generated POJO.
>> >
>> > I am facing some issues while reading / writing data using Flink's
>> > DeSer / Ser schema.
>> >
>> >
>> > A) << producer >> BeamKafkaFlinkAvroProducerTest
>> >
>> >>> If I use KafkaProducer directly (i.e. call produceSimpleData()),
>> >>> things work fine (just for testing).
>> >
>> >>> Using FlinkKafkaProducer as UnboundedSource (this is what I should
>> >>> do):
>> >
>> > produceAvroData2() { ...
>> >
>> > 1) First, if I use
>> >
>> >     AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
>> >
>> > (i.e. essentially Avro's org.apache.avro.specific.SpecificDatumWriter),
>> > I get the following error:
>> >
>> > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
>> >   at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>> >   at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>> >   at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>> >   at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>> >
>> >
>> > 2) Next, if I use TypeInformationSerializationSchema (irrespective of
>> > AvroCoder in the Pipeline), things apparently work fine,
>> >
>> > as the Kafka test consumer tool prints the message:
>> > java.lang.String{"uname": "Joe", "id": 6}
>> >
>> >
>> > B) <<Consumer>> ,  BeamKafkaFlinkAvroConsumerTest
>> >
>> >>> I understand we should either use TypeInformationSerializationSchema
>> >>> in both consumer and producer, or use AvroDeserializationSchema and
>> >>> AvroSerializationSchema in the consumer and producer respectively.
>> >
>> > But regardless of whether I use AvroDeserializationSchema or
>> > TypeInformationSerializationSchema, I get the following exception:
>> >
>> > Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
>> >   at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
>> >   at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
>> >   at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
>> >   at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
>> >   at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
>> >
>> >
>> > Any clues as to what's wrong here? I may be missing some simple step.
>> > I am trying to make a simple Avro schema work here; eventually we need
>> > to deal with very complex Avro schemas.
>> >
>> > I much appreciate your help.
>> >
>> > Thanks
>> > Kaniska
>> >
>> > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:
>> >>
>> >> Hi Kaniska,
>> >>
>> >> To read data from Kafka, you need to supply a DeserializationSchema.
>> >> Here is one you could use:
>> >> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>> >>
>> >> Similarly, to write data into Kafka using the Producer, you will need
>> >> a SerializationSchema. You need to serialize your data into bytes
>> >> using your Avro schema. Actually, you could use the AvroCoder which is
>> >> supplied in Beam for this. Or you implement your own analogue to the
>> >> DeserializationSchema above.
>> >>
>> >> - Max
>> >>
>> >>
>> >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
>> >> <[email protected]> wrote:
>> >> > Hi,
>> >> >
>> >> > I followed the example in AvroITCase.java, which reads/writes Avro
>> >> > data from/to the filesystem.
>> >> >> I don't want to update AvroIO to embed Kafka consumer / producer
>> >> >
>> >> > I also looked into https://issues.apache.org/jira/browse/FLINK-2597
>> >> >> But I couldn't instantiate TypeInformationSerializationSchema in the
>> >> >> following FlinkKafkaConsumer / Producer, as I am not sure how to get
>> >> >> access to ExecutionConfig.
>> >> >
>> >> > Essentially, I need help changing the following code in the Beam Pipeline
>> >> >> to read Avro data from Kafka and deserialize it into my custom object.
>> >> >
>> >> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
>> >> >         FlinkKafkaConsumer08<String> kafkaConsumer =
>> >> >                 new FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
>> >> >
>> >> >         return UnboundedFlinkSource.of(kafkaConsumer);
>> >> >     }
>> >> >
>> >> >> and how do I write Avro data into Kafka?
>> >> > public static void produceData(){
>> >> >         FlinkKafkaProducer08<String> kafkaSink =
>> >> >                 new FlinkKafkaProducer08<>(TOPIC, ? , props);
>> >> >
>> >> >         Pipeline pipeline = Pipeline.create(options);
>> >> >         pipeline
>> >> >         .apply(Create.of(
>> >> >                 new User("Joe", 3, "red"),
>> >> >                 new User("Mary", 4, "blue"),
>> >> >                 new User("Mark", 1, "green"),
>> >> >                 new User("Julia", 5, "purple"))
>> >> >             .withCoder(AvroCoder.of(User.class)))
>> >> >
>> >> >         .apply(transformationToWriteToKafkaSink());
>> >> >
>> >> >         pipeline.run();
>> >> >
>> >> >     }
>> >> >
>> >> > Thanks
>> >> > Kaniska
>> >
>> >
>>
>
>
