Hi,

generally speaking, we have to check that all runners work fine with the provided IO. I don't think it's a good idea that the runners themselves implement any IO: they should use "out of the box" IO.

I have started testing the Spark runner along these lines. I will apply the same test pipelines to the Flink runner to see how it behaves.

Regards
JB

On 04/28/2016 08:09 AM, kaniska Mandal wrote:
Hi Raghu,

When the KafkaIO sink code is merged, can you please add a couple of
examples to demonstrate that:

1) KafkaIO works fine with FlinkPipelineRunner

 > The reason I am asking: earlier, when I tried to port the KafkaIO
code (with a custom Sink copied from KafkaWriter) from the
spark-runner into the Beam SDK, I ended up registering
'KafkaIO.Write.Bound.class' inside FlinkStreamingTransformTranslators
(I don't want to update the Flink-Runner core API).

It was a dirty hack that somehow worked (I didn't test Avro at that time), so I quickly
adopted Max's suggested FlinkUnboundedSource & Sink approach, which is
very clean and worked for simple non-Avro data.

 > Is Max's contribution 'FlinkUnboundedSink' going to be merged as well?

2) KafkaIO can serialize an Avro-generated POJO (e.g. the sample attached
here) into a byte array, and the corresponding Sink can then deserialize it
effortlessly.

 > This is a very important use case for the network industry, where
thousands of sensor-generated machine data are converted into Avro and
sent to Kafka,
 > so I am trying to make it work seamlessly inside a Beam-Flink pipeline
(as explained in my other reply to the thread, based on Max's feedback).

** I am not yet able to Ser / DeSer the attached Avro class, even after
following Max's suggestions and even after
returning new AvroTypeInfo(avroType) in the method
AvroSerializationDeserializationSchema#getProducedType().

** I have posted the error messages in my other response.


Thanks

Kaniska


On Wed, Apr 27, 2016 at 9:18 PM, Raghu Angadi wrote:

    Oh, it does not work with 0.8.x, as it uses the new consumer API in 0.9.x.

    On Wed, Apr 27, 2016 at 4:35 PM, kaniska Mandal wrote:

        Hi Raghu,

        Thanks much for the update

        We are using Kafka 0.8.x.

        Can you please also add a working example (Beam + Flink + Kafka)
        with an Avro-generated POJO?

        Kaniska

        On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi wrote:

            Kaniska,

            If your Kafka cluster is running 0.9, you can also try the
            native KafkaIO
            <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
            recently merged into Beam. It supports reading from Kafka,
            and I am working on Sink support (this week).

            See TopHashtagsExample.java
            <https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
            for a complete example (it writes results back to Kafka in
            a ParDo() rather than a Sink). To use AvroCoder, your consumer
            test will have something like

                pipeline.apply(KafkaIO.read()
                         .withBootstrapServers(options.getBroker())
                         .withTopics(ImmutableList.of(TOPICS))
                         .withValueCoder(AvroCoder.of(User.class))
                        ...

            Raghu.



            On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels wrote:

                Hi Kaniska,

                Thanks for your mail. First of all, let us state clearly the
                problem you are trying to solve.

                Do you want to use Avro serialization or Flink serialization?
                If you use the TypeInformationSerializationSchema, you use
                Flink's serialization stack, with no Avro involved. You have to
                be consistent and stick with one or the other. Otherwise
                problems are bound to happen, as Flink's serialization doesn't
                understand Avro's serialization, and vice versa.
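
To make the consistency point concrete, here is a minimal sketch that uses only the JDK. It involves neither Avro nor Flink: a hand-written DataOutputStream layout stands in for one serialization stack, and Java object serialization stands in for the other. Both stand-ins, as well as the class and method names, are assumptions chosen for illustration. The sketch shows that bytes written by one stack can be read back by the same stack but are rejected by the other.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;

// Hypothetical illustration class; not part of Beam, Flink, or Avro.
class SerializerMismatchSketch {

  // "Stack A": a compact hand-written binary layout (stand-in for Avro).
  static byte[] writeWithStackA(String word, long count) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(word);
      out.writeLong(count);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Reading with the same stack that wrote the bytes succeeds.
  static String readWithStackA(byte[] message) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message))) {
      String word = in.readUTF();
      long count = in.readLong();
      return word + "=" + count;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // "Stack B": Java object serialization (stand-in for a different stack).
  // Returns true if stack B rejects the bytes as not being in its format.
  static boolean stackBRejects(byte[] message) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message))) {
      in.readObject();
      return false;
    } catch (StreamCorruptedException e) {
      // The stream header check fails: these bytes were not written by stack B.
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] message = writeWithStackA("word", 1L);
    System.out.println(readWithStackA(message)); // prints "word=1"
    System.out.println(stackBRejects(message));  // prints "true"
  }
}
```

The same reasoning applies to the Kafka case: whichever schema the producer uses to turn objects into bytes, the consumer has to use its counterpart.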

                From your initial question, it appears you want to read/write
                Avro-serialized data from/to Kafka. So let's see how to do this:

                ============
                A custom class
                ============

                Let's assume you have a class like this:

                   public class MyCounts implements Serializable {

                     private String word;
                     private long count;

                     public MyCounts() {}

                     public MyCounts(String word, long count) {
                       this.word = word;
                       this.count = count;
                     }

                     @Override
                     public String toString() {
                       return "MyCounts{" +
                           "word='" + word + '\'' +
                           ", count=" + count +
                           '}';
                     }
                   }


                =================================
                The Serialization / Deserialization Schema
                =================================

                This is the schema for the Kafka Producer/Consumer to
                serialize/deserialize data:

                   public class AvroSerializationDeserializationSchema<T>
                       implements SerializationSchema<T>, DeserializationSchema<T> {

                     private final Class<T> avroType;

                     private final AvroCoder<T> coder;
                     private transient ByteArrayOutputStream out;

                     public AvroSerializationDeserializationSchema(Class<T> clazz) {
                       this.avroType = clazz;
                       this.coder = AvroCoder.of(clazz);
                       this.out = new ByteArrayOutputStream();
                     }

                     @Override
                     public byte[] serialize(T element) {
                       if (out == null) {
                         out = new ByteArrayOutputStream();
                       }
                       try {
                         out.reset();
                         coder.encode(element, out, Coder.Context.NESTED);
                       } catch (IOException e) {
                         throw new RuntimeException("Avro encoding failed.", e);
                       }
                       return out.toByteArray();
                     }

                     @Override
                     public T deserialize(byte[] message) throws IOException {
                       return coder.decode(
                           new ByteArrayInputStream(message), Coder.Context.NESTED);
                     }

                     @Override
                     public boolean isEndOfStream(T nextElement) {
                       return false;
                     }

                     @Override
                     public TypeInformation<T> getProducedType() {
                       return TypeExtractor.getForClass(avroType);
                     }
                   }
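
One detail of the schema above that is easy to miss is the `transient` buffer combined with the `out == null` check in serialize(). Assuming the schema object is shipped to the workers via Java serialization, a transient field comes back as null on the receiving side, because neither the constructor nor the field initializer runs again. The following JDK-only sketch reproduces that behaviour. The class name and the shipToWorker helper are hypothetical, and a plain String payload stands in for the Avro-encoded element.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class mirroring the buffer handling of the schema above.
class TransientBufferSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // ByteArrayOutputStream is not Serializable, so the field must be transient.
  private transient ByteArrayOutputStream out = new ByteArrayOutputStream();

  boolean hasBuffer() {
    return out != null;
  }

  byte[] serialize(String element) {
    // Lazily re-create the buffer: it is null after Java deserialization.
    if (out == null) {
      out = new ByteArrayOutputStream();
    }
    out.reset();
    byte[] utf8 = element.getBytes(StandardCharsets.UTF_8);
    out.write(utf8, 0, utf8.length);
    return out.toByteArray();
  }

  // Simulates shipping the object to another JVM via Java serialization.
  static TransientBufferSketch shipToWorker(TransientBufferSketch schema) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
        oos.writeObject(schema);
      }
      try (ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (TransientBufferSketch) ois.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    TransientBufferSketch copy = shipToWorker(new TransientBufferSketch());
    System.out.println(copy.hasBuffer());               // prints "false"
    System.out.println(copy.serialize("hello").length); // prints "5"
    System.out.println(copy.hasBuffer());               // prints "true"
  }
}
```

Without the null check, the first serialize() call on the shipped copy would fail with a NullPointerException.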

                ======================================
                Writing some Avro serialized data to a Kafka topic
                ======================================

                     Pipeline pipeline = Pipeline.create(options);

                     PCollection<MyCounts> words =
                         pipeline.apply(Create.of(
                             new MyCounts("word", 1L),
                             new MyCounts("another", 2L),
                             new MyCounts("yet another", 3L)));

                     FlinkKafkaProducer08<MyCounts> kafkaSink =
                         new FlinkKafkaProducer08<>(
                             options.getKafkaOutputTopic(),
                             new AvroSerializationDeserializationSchema<>(MyCounts.class),
                             props);

                     words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));

                     pipeline.run();

                Let's execute that.

                ====================================
                Reading Avro serialized data from a Kafka topic
                ====================================

                Now it is time to read the data back from Kafka:

                     Pipeline pipeline = Pipeline.create(options);

                     FlinkKafkaConsumer08<MyCounts> kafkaConsumer =
                         new FlinkKafkaConsumer08<>(
                             options.getKafkaTopic(),
                             new AvroSerializationDeserializationSchema<>(MyCounts.class),
                             props);

                     PCollection<MyCounts> words = pipeline
                         .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
                         .apply(ParDo.of(new PrintFn()));

                     pipeline.run();

                ===================
                Executing the examples
                ===================

                It prints:

                     MyCounts{word='word', count=1}
                     MyCounts{word='another', count=2}
                     MyCounts{word='yet another', count=3}
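
The write-then-read flow can also be tried without a Kafka broker or a Flink cluster. The sketch below is only an approximation under stated assumptions: an in-memory queue stands in for the Kafka topic, a hand-written DataOutputStream layout stands in for AvroCoder, and RoundTripSketch with its helper methods is a hypothetical name. Only the MyCounts fields and toString() follow the class shown earlier.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration class; no Kafka, Flink, or Avro involved.
class RoundTripSketch {

  // Same fields and toString() as the MyCounts class above.
  static final class MyCounts {
    final String word;
    final long count;

    MyCounts(String word, long count) {
      this.word = word;
      this.count = count;
    }

    @Override
    public String toString() {
      return "MyCounts{" + "word='" + word + '\'' + ", count=" + count + '}';
    }
  }

  // Stand-in for the serialization side of the schema.
  static byte[] serialize(MyCounts element) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(element.word);
      out.writeLong(element.count);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Stand-in for the deserialization side; must mirror serialize() exactly.
  static MyCounts deserialize(byte[] message) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message))) {
      return new MyCounts(in.readUTF(), in.readLong());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // "Producer" writes bytes to an in-memory topic, "consumer" reads them back.
  static List<String> roundTrip(List<MyCounts> input) {
    Deque<byte[]> topic = new ArrayDeque<>();
    for (MyCounts element : input) {
      topic.add(serialize(element));
    }
    List<String> printed = new ArrayList<>();
    while (!topic.isEmpty()) {
      printed.add(deserialize(topic.poll()).toString());
    }
    return printed;
  }

  public static void main(String[] args) {
    List<MyCounts> input = Arrays.asList(
        new MyCounts("word", 1L),
        new MyCounts("another", 2L),
        new MyCounts("yet another", 3L));
    for (String line : roundTrip(input)) {
      System.out.println(line);
    }
  }
}
```

Running main prints the same three MyCounts lines as listed above.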


                Let me know if that helps you! I've omitted the Kafka props and
                options for brevity.

                I hope that we will soon have native Kafka IO for both reading
                from and writing to Kafka available in Beam.

                Cheers,
                Max

                On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal wrote:
                 > Sorry for cluttering the post with some code.
                 > I have attached a couple of Java files and one Avro-generated POJO.
                 >
                 > I am facing some issues while reading / writing data using Flink's DeSer /
                 > Ser schema.
                 >
                 >
                 > A) << producer >> BeamKafkaFlinkAvroProducerTest
                 >
                 >>> if I use KafkaProducer directly (i.e. call produceSimpleData()),
                 >>> things work fine (just for testing)
                 >
                 >>> Using FlinkKafkaProducer as UnboundedSource (this is what I should do)
                 >
                 > produceAvroData2() { ...
                 >
                 > 1) First, if I use >> AvroSerializationSchema schema = new
                 > AvroSerializationSchema(Test.class);
                 >
                 > i.e. essentially using Avro’s org.apache.avro.specific.SpecificDatumWriter,
                 > I get the following error >>
                 >
                 > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
                 > org.apache.avro.generic.IndexedRecord
                 >   at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
                 >   at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
                 >   at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
                 >   at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
                 >
                 >
                 > 2) Next, if I use TypeInformationSerializationSchema (irrespective of
                 > AvroCoder in the pipeline), things apparently work fine,
                 >
                 > as the Kafka test consumer tool prints the message >>
                 > java.lang.String{"uname": "Joe", "id": 6}
                 >
                 >
                 > B) <<Consumer>> ,  BeamKafkaFlinkAvroConsumerTest
                 >
                 >>> I understand we should either use TypeInformationSerializationSchema in
                 >>> both consumer and producer, OR
                 >
                 > use AvroDeserializationSchema and AvroSerializationSchema in the consumer
                 > and producer respectively.
                 >
                 > But irrespective of whether I use AvroDeserializationSchema or
                 > TypeInformationSerializationSchema, I get the following exception >>
                 >
                 > Exception in thread "main" java.lang.NullPointerException: null value in
                 > entry: V=null
                 >   at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
                 >   at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
                 >   at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
                 >   at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
                 >   at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
                 >
                 >
                 > Any clues as to what's wrong here? Maybe I am missing some simple step.
                 > I am trying to make a simple Avro schema work here, and eventually we need to
                 > deal with very complex Avro schemas.
                 >
                 > I would much appreciate your help.
                 >
                 > Thanks
                 > Kaniska
                 >
                 > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels wrote:
                 >>
                 >> Hi Kaniska,
                 >>
                 >> To read data from Kafka, you need to supply a DeserializationSchema.
                 >> Here is one you could use:
                 >> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
                 >>
                 >> Similarly, to write data into Kafka using the Producer, you will need
                 >> a SerializationSchema. You need to serialize your data into bytes
                 >> using your Avro schema. Actually, you could use the AvroCoder, which is
                 >> supplied in Beam, for this. Or you could implement your own analogue to the
                 >> DeserializationSchema above.
                 >>
                 >> - Max
                 >>
                 >>
                 >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal wrote:
                 >> > Hi,
                 >> >
                 >> > I followed the example in AvroITCase.java, which reads/writes Avro data
                 >> > from/to the filesystem.
                 >> >> I don't want to update AvroIO to embed a Kafka consumer / producer
                 >> >
                 >> > I also looked into https://issues.apache.org/jira/browse/FLINK-2597
                 >> >> But I couldn't instantiate TypeInformationSerializationSchema in the
                 >> >> following FlinkKafkaConsumer / Producer, as I am not sure how to get access to
                 >> >> ExecutionConfig
                 >> >
                 >> > Essentially, I need help changing the following code in a Beam pipeline
                 >> >> to read Avro data from Kafka and deserialize it into my custom object.
                 >> >
                 >> > public static UnboundedSource<String, CheckpointMark> consumeMessages()
                 >> > {
                 >> >         FlinkKafkaConsumer08<String> kafkaConsumer =
                 >> >                 new FlinkKafkaConsumer08<>(options.getKafkaTopic(),
                 >> >                 ? , props);
                 >> >
                 >> >         return UnboundedFlinkSource.of(kafkaConsumer);
                 >> >     }
                 >> >
                 >> >>  And how do I write Avro data into Kafka?
                 >> > public static void produceData(){
                 >> >         FlinkKafkaProducer08<String> kafkaSink =
                 >> >                 new FlinkKafkaProducer08<>(TOPIC, ? , props);
                 >> >
                 >> >         Pipeline pipeline = Pipeline.create(options);
                 >> >         pipeline
                 >> >         .apply(Create.of(
                 >> >                 new User("Joe", 3, "red"),
                 >> >                 new User("Mary", 4, "blue"),
                 >> >                 new User("Mark", 1, "green"),
                 >> >                 new User("Julia", 5, "purple"))
                 >> >             .withCoder(AvroCoder.of(User.class)))
                 >> >
                 >> >         .apply(transformationToWriteToKafkaSink());
                 >> >
                 >> >         pipeline.run();
                 >> >
                 >> >     }
                 >> >
                 >> > Thanks
                 >> > Kaniska
                 >
                 >






--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
