Hi Raghu,

When the KafkaIO sink code is merged, can you please add a couple of examples to demonstrate that:
1) KafkaIO works fine with FlinkPipelineRunner.

   - The reason I am asking: earlier, when I tried to port the KafkaIO code (adding custom Sink code copied from KafkaWriter) from the spark-runner into the Beam SDK, I ended up registering 'KafkaIO.Write.Bound.class' inside FlinkStreamingTransformTranslators (I don't want to update the Flink-Runner core API).
   - It was a dirty hack and somehow worked (I didn't test Avro at the time), so I quickly adopted Max's suggestion of the FlinkUnboundedSource & Sink approach, which is very clean and worked for simple non-Avro data.
   - Is Max's contrib 'FlinkUnboundedSink' going to be merged as well?

2) KafkaIO can serialize an Avro-generated POJO (e.g. the sample attached here) into a byte array, and the corresponding Sink can deserialize it effortlessly.

   - This is a very important use case for the network industry, where thousands of sensor-generated machine-data records are converted into Avro and sent to Kafka.
   - So I am trying to make it work seamlessly inside a Beam-Flink pipeline (as explained in the other reply to the thread, based on Max's feedback).

** I am not yet able to Ser/DeSer the attached Avro, even after following Max's suggestions and even after returning new AvroTypeInfo(avroType) in AvroSerializationDeserializationSchema#getProducedType(). **
** I have posted the error messages in the other response. **

Thanks
Kaniska

On Wed, Apr 27, 2016 at 9:18 PM, Raghu Angadi <[email protected]> wrote:

Oh, it does not work with 0.8.x, as it uses the new consumer API in 0.9.x.

On Wed, Apr 27, 2016 at 4:35 PM, kaniska Mandal <[email protected]> wrote:

Hi Raghu,

Thanks much for the update.

We are using Kafka 0.8.x.

Can you please also add a working example (Beam + Flink + Kafka) with an Avro-generated POJO?

Kaniska

On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi <[email protected]> wrote:

Kaniska,

If your Kafka cluster is running 0.9, you can also try the native KafkaIO
<https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
merged recently into Beam. It supports reading from Kafka, and I am working on Sink support (this week).

See TopHashtagsExample.java
<https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
for a complete example (it writes results back to Kafka in a ParDo() rather than a Sink). To use AvroCoder, your consumer test will have something like:

pipeline.apply(KafkaIO.read()
    .withBootstrapServers(options.getBroker())
    .withTopics(ImmutableList.of(TOPICS))
    .withValueCoder(AvroCoder.of(User.class))
    ...

Raghu.
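(Note: the ParDo-based write-back that Raghu mentions above can look roughly like the sketch below. This is not code from TopHashtagsExample; the DoFn style follows the pre-annotation Beam API of that time, and WriteToKafkaFn, the topic, and the String payload type are illustrative assumptions.)

import java.util.Map;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Writes each element of a PCollection<String> to a Kafka topic from inside a ParDo.
// Usage: results.apply(ParDo.of(new WriteToKafkaFn("output_topic", producerConfig)));
public class WriteToKafkaFn extends DoFn<String, Void> {

    private final String topic;
    // Pass a serializable map (e.g. a HashMap) containing at least "bootstrap.servers"
    // and the key/value serializer classes (e.g. StringSerializer).
    private final Map<String, Object> producerConfig;

    private transient KafkaProducer<String, String> producer;

    public WriteToKafkaFn(String topic, Map<String, Object> producerConfig) {
        this.topic = topic;
        this.producerConfig = producerConfig;
    }

    @Override
    public void startBundle(Context c) {
        // The producer is created on the worker, hence the transient field.
        producer = new KafkaProducer<>(producerConfig);
    }

    @Override
    public void processElement(ProcessContext c) {
        producer.send(new ProducerRecord<String, String>(topic, c.element()));
    }

    @Override
    public void finishBundle(Context c) {
        producer.flush();
        producer.close();
    }
}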
On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]> wrote:

Hi Kaniska,

Thanks for your mail. First of all, let us state clearly the problem you are trying to solve.

Do you want to use Avro serialization or Flink serialization? If you use the TypeInformationSerializationSchema, you use Flink's serialization stack - no Avro involved then. You have to be consistent and stick with one or the other; otherwise problems are bound to happen, because Flink's serialization doesn't understand Avro's serialization, and vice versa.

From your initial question it appears you want to read/write Avro-serialized data from/to Kafka. So let's see how to do this:

============
A custom class
============

Let's assume you have a class like this:

public class MyCounts implements Serializable {

    private String word;
    private long count;

    public MyCounts() {}

    public MyCounts(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return "MyCounts{" +
            "word='" + word + '\'' +
            ", count=" + count +
            '}';
    }
}

=================================
The Serialization / Deserialization Schema
=================================

This is the schema the Kafka Producer/Consumer uses to serialize/deserialize data:

public class AvroSerializationDeserializationSchema<T>
        implements SerializationSchema<T>, DeserializationSchema<T> {

    private final Class<T> avroType;

    private final AvroCoder<T> coder;
    private transient ByteArrayOutputStream out;

    public AvroSerializationDeserializationSchema(Class<T> clazz) {
        this.avroType = clazz;
        this.coder = AvroCoder.of(clazz);
        this.out = new ByteArrayOutputStream();
    }

    @Override
    public byte[] serialize(T element) {
        if (out == null) {
            out = new ByteArrayOutputStream();
        }
        try {
            out.reset();
            coder.encode(element, out, Coder.Context.NESTED);
        } catch (IOException e) {
            throw new RuntimeException("Avro encoding failed.", e);
        }
        return out.toByteArray();
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}

======================================
Writing some Avro serialized data to a Kafka topic
======================================

Pipeline pipeline = Pipeline.create(options);

PCollection<MyCounts> words =
    pipeline.apply(Create.of(
        new MyCounts("word", 1L),
        new MyCounts("another", 2L),
        new MyCounts("yet another", 3L)));

FlinkKafkaProducer08<MyCounts> kafkaSink =
    new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
        new AvroSerializationDeserializationSchema<>(MyCounts.class), props);

words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));

pipeline.run();

Let's execute that.
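(Note: the `props` object in the snippet above and in the read example below is a java.util.Properties instance; Max omits its contents for brevity. A minimal sketch of what it typically holds for the Kafka 0.8 connector; the host/port values and group id are placeholders, and the exact required keys depend on the connector version.)

Properties props = new Properties();
// Broker list, used by both FlinkKafkaProducer08 and FlinkKafkaConsumer08.
props.setProperty("bootstrap.servers", "localhost:9092");
// The 0.8 consumer additionally needs ZooKeeper and a consumer group id.
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "beam-avro-example");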
====================================
Reading Avro serialized data from a Kafka topic
====================================

Now it's time to read the data back from Kafka:

Pipeline pipeline = Pipeline.create(options);

FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new FlinkKafkaConsumer08<>(
    options.getKafkaTopic(),
    new AvroSerializationDeserializationSchema<>(MyCounts.class),
    props);

PCollection<MyCounts> words = pipeline
    .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
    .apply(ParDo.of(new PrintFn()));

pipeline.run();

===================
Executing the examples
===================

It prints:

MyCounts{word='word', count=1}
MyCounts{word='another', count=2}
MyCounts{word='yet another', count=3}

Let me know if that helps you! I've omitted the Kafka props and options for brevity.

I hope that we will soon have native Kafka IO for both reading and writing to Kafka available in Beam.

Cheers,
Max

On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal <[email protected]> wrote:

Sorry for cluttering the post with some code. I have attached a couple of Java files and one Avro-generated POJO.

I am facing some issues while reading/writing data using Flink's DeSer/Ser schema.

A) << Producer >> BeamKafkaFlinkAvroProducerTest

- If I use KafkaProducer directly (i.e. call produceSimpleData()), things work fine (just for testing).
- Using FlinkKafkaProducer as UnboundedSource (this is what I should do):

produceAvroData2() { ...

1) First, if I use

AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);

i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter, I face the following error:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)

2) Next, if I use TypeInformationSerializationSchema (irrespective of AvroCoder in the pipeline), things apparently work fine, as the Kafka test consumer tool prints the message:

java.lang.String{"uname": "Joe", "id": 6}
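(Note: since serializing the attached Test class is the sticking point, one quick way to narrow the problem down is to check that Beam's AvroCoder alone round-trips a Test instance, with no Kafka or Flink involved. A rough sketch; the constructor arguments are guesses based on the {"uname": "Joe", "id": 6} output above and should be adjusted to the actual generated class.)

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;

public class AvroCoderRoundTripCheck {

    public static void main(String[] args) throws Exception {
        AvroCoder<Test> coder = AvroCoder.of(Test.class);

        // Field names/types guessed from the message printed above; adjust as needed.
        Test original = new Test("Joe", 6);

        // Encode exactly the way AvroSerializationDeserializationSchema#serialize does.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode(original, out, Coder.Context.NESTED);

        // Decode and print; this should reproduce the original record.
        Test decoded = coder.decode(
            new ByteArrayInputStream(out.toByteArray()), Coder.Context.NESTED);
        System.out.println(decoded);
    }
}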
B) << Consumer >> BeamKafkaFlinkAvroConsumerTest

- I understand we should either use TypeInformationSerializationSchema in both consumer and producer, OR use AvroDeserializationSchema and AvroSerializationSchema in the consumer and producer respectively!!

But, irrespective of using AvroDeserializationSchema or TypeInformationSerializationSchema, I get the following exception:

Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
    at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
    at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
    at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
    at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
    at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)

Any clues what's wrong here? Maybe I am missing some simple step. I am trying to make a simple Avro schema work here, and eventually we need to deal with very complex Avro schemas.

Much appreciate your help.

Thanks
Kaniska

On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:

Hi Kaniska,

To read data from Kafka, you need to supply a DeserializationSchema. Here is one you could use:
https://gist.github.com/StephanEwen/d515e10dd1c609f70bed

Similarly, to write data into Kafka using the Producer, you will need a SerializationSchema. You need to serialize your data into bytes using your Avro schema. Actually, you could use the AvroCoder which is supplied in Beam for this. Or you could implement your own analogue to the DeserializationSchema above.

- Max

On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal <[email protected]> wrote:

Hi,

I followed the example in AvroITCase.java, which reads/writes Avro data from/to the filesystem.

- I don't want to update AvroIO to embed a Kafka consumer/producer.

I also looked into https://issues.apache.org/jira/browse/FLINK-2597

- But I couldn't instantiate TypeInformationSerializationSchema in the following FlinkKafkaConsumer / Producer, as I am not sure how to get access to an ExecutionConfig.

Essentially, I need help to change the following code in the Beam pipeline to read Avro data from Kafka and deserialize it into my custom object:

public static UnboundedSource<String, CheckpointMark> consumeMessages() {
    FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
        options.getKafkaTopic(), ? , props);

    return UnboundedFlinkSource.of(kafkaConsumer);
}

And how do I write Avro data into Kafka?
public static void produceData() {
    FlinkKafkaProducer08<String> kafkaSink =
        new FlinkKafkaProducer08<>(TOPIC, ? , props);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Create.of(
            new User("Joe", 3, "red"),
            new User("Mary", 4, "blue"),
            new User("Mark", 1, "green"),
            new User("Julia", 5, "purple"))
            .withCoder(AvroCoder.of(User.class)))

        .apply(transformationToWriteToKafkaSink());

    pipeline.run();
}

Thanks
Kaniska
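(Note: tying the two snippets above back to Max's later reply: the `?` placeholders are where a DeserializationSchema / SerializationSchema goes. A rough sketch of the two options discussed in the thread, keeping the original method shape; Test is the attached Avro-generated class, and options/props/generics are assumed to match the original snippets.)

// Option 1: Avro on the wire, via the AvroSerializationDeserializationSchema shown earlier.
public static UnboundedSource<Test, CheckpointMark> consumeMessages() {
    FlinkKafkaConsumer08<Test> kafkaConsumer = new FlinkKafkaConsumer08<>(
        options.getKafkaTopic(),
        new AvroSerializationDeserializationSchema<>(Test.class),
        props);

    return UnboundedFlinkSource.of(kafkaConsumer);
}

// Option 2: Flink's own serialization stack (no Avro on the wire). A plain
// 'new ExecutionConfig()' is enough to construct the schema outside a running job.
public static UnboundedSource<Test, CheckpointMark> consumeMessagesWithFlinkSerialization() {
    TypeInformationSerializationSchema<Test> schema =
        new TypeInformationSerializationSchema<>(
            TypeExtractor.getForClass(Test.class), new ExecutionConfig());

    FlinkKafkaConsumer08<Test> kafkaConsumer = new FlinkKafkaConsumer08<>(
        options.getKafkaTopic(), schema, props);

    return UnboundedFlinkSource.of(kafkaConsumer);
}

// The producer side mirrors this: pass the same kind of schema instance as the
// second argument of FlinkKafkaProducer08 (but stick with one option end to end).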
[Attachment: Test.java]