On Wed, Apr 27, 2016 at 11:09 PM, kaniska Mandal <[email protected]> wrote:
> Hi Raghu,
>
> When the KafkaIO sink code is merged, can you please add a couple of examples to demonstrate that:
>
> 1) KafkaIO works fine with FlinkPipelineRunner

KafkaIO source should work with FlinkPipelineRunner. See this thread from March
<http://mail-archives.apache.org/mod_mbox/beam-user/201603.mbox/%[email protected]%3E>
where one of the users got his app working with KafkaIO and FlinkRunner (Max helped with the issues there too). That said, if you could try it in your environment, we are more than happy to help with any problems you run into.

I am going to send a pull request soon with Sink support. It would be great if you could try it.

I am not familiar enough with Flink serialization to comment on (2). But using AvroCoder with Test.getSchema() should work with KafkaIO in Beam.

Raghu.

> The reason I am asking this: earlier, when I tried to port the KafkaIO code (with a custom Sink copied from KafkaWriter) from the spark-runner into the Beam SDK, I ended up registering 'KafkaIO.Write.Bound.class' inside FlinkStreamingTransformTranslators (I don't want to update the Flink-Runner core API).
>
> It was a dirty hack and somehow worked (I didn't test Avro at that time), so I quickly adopted Max's suggestion of the FlinkUnboundedSource & Sink approach - which is very clean and worked for simple non-Avro data.
>
> Is Max's contrib 'FlinkUnboundedSink' going to be merged as well?
>
> 2) KafkaIO can serialize an Avro-generated POJO (e.g. the sample attached here) into a byte array, and the corresponding Sink can then deserialize it effortlessly.
>
> This is a very important use case for the network industry, where thousands of sensor-generated machine-data records are converted into Avro and sent to Kafka, so I am trying to make it work seamlessly inside a Beam-Flink pipeline (as explained in the other reply to this thread, based on Max's feedback).
>
> ** I am not yet able to serialize/deserialize the attached Avro class, even after following Max's suggestions and even after returning new AvroTypeInfo(avroType) in AvroSerializationDeserializationSchema#getProducedType().
>
> ** I have posted the error messages in the other response.
>
> Thanks
> Kaniska

On Wed, Apr 27, 2016 at 9:18 PM, Raghu Angadi <[email protected]> wrote:

Oh, it does not work with 0.8.x, as it uses the new consumer API in 0.9.x.

On Wed, Apr 27, 2016 at 4:35 PM, kaniska Mandal <[email protected]> wrote:

Hi Raghu,

Thanks much for the update.

We are using Kafka 0.8.x.

Can you please also add a working example (Beam + Flink + Kafka) with an Avro-generated POJO?

Kaniska

On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi <[email protected]> wrote:

Kaniska,

If your Kafka cluster is running 0.9, you can also try the native KafkaIO
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
merged recently into Beam. It supports reading from Kafka, and I am working on Sink support (this week).

See TopHashtagsExample.java
<https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
for a complete example (it writes results back to Kafka in a ParDo() rather than a Sink).
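For illustration only - this is not the code from TopHashtagsExample, the topic name and broker address are made up, and DoFn method signatures differ between SDK versions (newer SDKs use the annotation-based style) - such a ParDo-based writer just encodes each element with AvroCoder and hands the bytes to a plain KafkaProducer:

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.util.Properties;

class WriteToKafkaFn extends DoFn<User, Void> {
  private final AvroCoder<User> coder = AvroCoder.of(User.class);
  private transient KafkaProducer<byte[], byte[]> producer;

  @Override
  public void startBundle(Context c) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");                  // assumed broker address
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());
    producer = new KafkaProducer<>(props);
  }

  @Override
  public void processElement(ProcessContext c) throws Exception {
    // Encode the element with AvroCoder and send the raw bytes to Kafka.
    byte[] bytes = CoderUtils.encodeToByteArray(coder, c.element());
    producer.send(new ProducerRecord<byte[], byte[]>("my-topic", bytes));  // assumed topic name
  }

  @Override
  public void finishBundle(Context c) {
    producer.close();
  }
}

// usage: users.apply(ParDo.of(new WriteToKafkaFn()));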
To use AvroCoder, your consumer test will have something like:

pipeline.apply(KafkaIO.read()
    .withBootstrapServers(options.getBroker())
    .withTopics(ImmutableList.of(TOPICS))
    .withValueCoder(AvroCoder.of(User.class))
    ...

Raghu.

On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]> wrote:

Hi Kaniska,

Thanks for your mail. First of all, let us state clearly the problem you are trying to solve.

Do you want to use Avro serialization or Flink serialization? If you use the TypeInformationSerializationSchema, you use Flink's serialization stack - no Avro involved then. You have to be consistent and stick with one or the other. Otherwise problems are bound to happen, as Flink's serialization doesn't understand Avro's serialization, and also the other way around.

From your initial question it appears you want to read/write Avro-serialized data from/to Kafka. So let's see how to do this:

============
A custom class
============

Let's assume you have a class like this:

public class MyCounts implements Serializable {

  private String word;
  private long count;

  public MyCounts() {}

  public MyCounts(String word, long count) {
    this.word = word;
    this.count = count;
  }

  @Override
  public String toString() {
    return "MyCounts{" +
        "word='" + word + '\'' +
        ", count=" + count +
        '}';
  }
}

=================================
The Serialization / Deserialization Schema
=================================

This is the schema for the Kafka Producer/Consumer to serialize/deserialize data:

public class AvroSerializationDeserializationSchema<T>
    implements SerializationSchema<T>, DeserializationSchema<T> {

  private final Class<T> avroType;

  private final AvroCoder<T> coder;
  private transient ByteArrayOutputStream out;

  public AvroSerializationDeserializationSchema(Class<T> clazz) {
    this.avroType = clazz;
    this.coder = AvroCoder.of(clazz);
    this.out = new ByteArrayOutputStream();
  }

  @Override
  public byte[] serialize(T element) {
    if (out == null) {
      out = new ByteArrayOutputStream();
    }
    try {
      out.reset();
      coder.encode(element, out, Coder.Context.NESTED);
    } catch (IOException e) {
      throw new RuntimeException("Avro encoding failed.", e);
    }
    return out.toByteArray();
  }

  @Override
  public T deserialize(byte[] message) throws IOException {
    return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
  }

  @Override
  public boolean isEndOfStream(T nextElement) {
    return false;
  }

  @Override
  public TypeInformation<T> getProducedType() {
    return TypeExtractor.getForClass(avroType);
  }
}
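Before wiring the schema into the Kafka connectors, a quick stand-alone round trip is an easy sanity check. A minimal sketch using the class above (the sample values are placeholders):

// Stand-alone round trip through the schema defined above (no Kafka/Flink needed).
AvroSerializationDeserializationSchema<MyCounts> schema =
    new AvroSerializationDeserializationSchema<>(MyCounts.class);

byte[] bytes = schema.serialize(new MyCounts("word", 1L));
MyCounts copy = schema.deserialize(bytes);   // deserialize(...) throws IOException

System.out.println(copy);   // MyCounts{word='word', count=1}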
======================================
Writing some Avro serialized data to a Kafka topic
======================================

Pipeline pipeline = Pipeline.create(options);

PCollection<MyCounts> words =
    pipeline.apply(Create.of(
        new MyCounts("word", 1L),
        new MyCounts("another", 2L),
        new MyCounts("yet another", 3L)));

FlinkKafkaProducer08<MyCounts> kafkaSink =
    new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
        new AvroSerializationDeserializationSchema<>(MyCounts.class), props);

words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));

pipeline.run();

Let's execute that.

====================================
Reading Avro serialized data from a Kafka topic
====================================

Now it's time to read the data back from Kafka:

Pipeline pipeline = Pipeline.create(options);

FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new FlinkKafkaConsumer08<>(
    options.getKafkaTopic(),
    new AvroSerializationDeserializationSchema<>(MyCounts.class), props);

PCollection<MyCounts> words = pipeline
    .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
    .apply(ParDo.of(new PrintFn()));

pipeline.run();

===================
Executing the examples
===================

It prints:

MyCounts{word='word', count=1}
MyCounts{word='another', count=2}
MyCounts{word='yet another', count=3}

Let me know if that helps you! I've omitted the Kafka props and options for brevity.
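For reference, a minimal props for the 0.8 connectors could look roughly like this; the host names, port numbers, and group id below are placeholders for your own cluster:

// Assumed connection settings - adjust for your cluster.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // Kafka 0.8 brokers
props.setProperty("zookeeper.connect", "localhost:2181");   // needed by FlinkKafkaConsumer08
props.setProperty("group.id", "beam-avro-example");         // any consumer group id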
I hope that we will soon have native Kafka IO for both reading and writing to Kafka available in Beam.

Cheers,
Max

On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal <[email protected]> wrote:

Sorry for cluttering the post with some code. I have attached a couple of Java files and one Avro-generated POJO.

I am facing some issues while reading / writing data using Flink's DeSer / Ser schema.

A) << Producer >> BeamKafkaFlinkAvroProducerTest

>> If I use KafkaProducer directly (i.e. call produceSimpleData()), things are working fine (just for testing).

>> Using FlinkKafkaProducer as UnboundedSource (this is what I should do), in produceAvroData2():

1) First, if I use AvroSerializationSchema schema = new AvroSerializationSchema(Test.class), i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter, I face the following error:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
  at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
  at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
  at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
  at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)

2) Next, if I use TypeInformationSerializationSchema (irrespective of AvroCoder in the pipeline), things apparently work fine, as the Kafka test consumer tool prints the message:

java.lang.String{"uname": "Joe", "id": 6}

B) << Consumer >> BeamKafkaFlinkAvroConsumerTest

>> I understand we should either use TypeInformationSerializationSchema in both consumer and producer, OR use AvroDeserializationSchema and AvroSerializationSchema in the consumer and producer respectively.

But, irrespective of using AvroDeserializationSchema or TypeInformationSerializationSchema, I get the following exception:

Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
  at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
  at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
  at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
  at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
  at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)

Any clues what's wrong here? Maybe I am missing some simple step. I am trying to make a simple Avro schema work here, and eventually we need to deal with very complex Avro schemas.

Much appreciate your help.

Thanks
Kaniska
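One possible reading of that CoderRegistry NullPointerException - this is a guess, not a confirmed diagnosis - is that Beam cannot infer an output coder for the PCollection read from the Flink source. A sketch of setting the coder explicitly, reusing the MyCounts / UnboundedFlinkSource setup from Max's example above (MyCounts stands in for the attached Test class):

// Guesswork, not a confirmed fix: pin the coder on the PCollection coming out of
// the Flink source so Beam does not have to infer it.
PCollection<MyCounts> input = pipeline
    .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
    .setCoder(AvroCoder.of(MyCounts.class));

input.apply(ParDo.of(new PrintFn()));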
On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:

Hi Kaniska,

To read data from Kafka, you need to supply a DeserializationSchema. Here is one you could use:
https://gist.github.com/StephanEwen/d515e10dd1c609f70bed

Similarly, to write data into Kafka using the Producer, you will need a SerializationSchema. You need to serialize your data into bytes using your Avro schema. Actually, you could use the AvroCoder which is supplied in Beam for this. Or you implement your own analogue to the DeserializationSchema above.

- Max

On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal <[email protected]> wrote:

Hi,

I followed the example in AvroITCase.java - which reads/writes Avro data from/to the filesystem.
>> I don't want to update AvroIO to embed a Kafka consumer / producer.

Also looked into https://issues.apache.org/jira/browse/FLINK-2597
>> But I couldn't instantiate TypeInformationSerializationSchema in the following FlinkKafkaConsumer / Producer, as I am not sure how to get access to the ExecutionConfig.

Essentially, I need help to change the following code in a Beam pipeline
>> to read Avro data from Kafka and deserialize it into my custom object:

public static UnboundedSource<String, CheckpointMark> consumeMessages() {
  FlinkKafkaConsumer08<String> kafkaConsumer = new
      FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);

  return UnboundedFlinkSource.of(kafkaConsumer);
}

>> and how to write Avro data into Kafka?

public static void produceData() {
  FlinkKafkaProducer08<String> kafkaSink =
      new FlinkKafkaProducer08<>(TOPIC, ? , props);

  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(Create.of(
          new User("Joe", 3, "red"),
          new User("Mary", 4, "blue"),
          new User("Mark", 1, "green"),
          new User("Julia", 5, "purple"))
          .withCoder(AvroCoder.of(User.class)))

      .apply(transformationToWriteToKafkaSink());

  pipeline.run();
}

Thanks
Kaniska
