Hi Raghu,

Thanks much for the update.
We are using Kafka 0.8.x. Can you please also add a working example (Beam + Flink + Kafka) with an Avro-generated POJO?

Kaniska

On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi <[email protected]> wrote:
> Kaniska,
>
> If your Kafka cluster is running 0.9, you can also try native KafkaIO
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
> merged recently into Beam. It supports reading from Kafka and I am working
> on Sink support (this week).
>
> See TopHashtagsExample.java
> <https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
> for a complete example (it writes results back to Kafka in a ParDo() rather
> than a Sink). To use AvroCoder, your consumer test will have something like:
>
>     pipeline.apply(KafkaIO.read()
>         .withBootstrapServers(options.getBroker())
>         .withTopics(ImmutableList.of(TOPICS))
>         .withValueCoder(AvroCoder.of(User.class))
>         ...
>
> Raghu.
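Filling out Raghu's snippet into a complete read pipeline, here is a minimal sketch only: it assumes a Kafka 0.9+ cluster, an Avro-generated User class, and placeholder option getters for broker and topic (getBroker()/getTopic() are hypothetical here).

    // Sketch: KafkaIO consumer that Avro-decodes record values into the generated User POJO.
    Pipeline pipeline = Pipeline.create(options);

    PCollection<User> users = pipeline
        .apply(KafkaIO.read()
            .withBootstrapServers(options.getBroker())          // e.g. "broker1:9092" (placeholder)
            .withTopics(ImmutableList.of(options.getTopic()))   // topic(s) to subscribe to (placeholder)
            .withKeyCoder(ByteArrayCoder.of())                  // keys left as raw bytes
            .withValueCoder(AvroCoder.of(User.class))           // Avro-decode the record values
            .withoutMetadata())                                 // drop Kafka metadata, keep KV<key, value>
        .apply(Values.<User>create());                          // keep only the decoded User values

    // downstream transforms (e.g. a printing ParDo) would consume 'users' here
    pipeline.run();

Note that this path requires the 0.9 consumer API, so it does not apply to a 0.8.x cluster; the Flink connector examples below cover 0.8.x.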
> On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]> wrote:
>
>> Hi Kaniska,
>>
>> Thanks for your mail. First of all, let us state clearly the problem
>> you are trying to solve.
>>
>> Do you want to use Avro serialization or Flink serialization? If you
>> use the TypeInformationSerializationSchema you use Flink's
>> serialization stack - no Avro involved then. You have to be consistent
>> and stick with either one. Otherwise problems are bound to happen as
>> the Flink serialization doesn't understand Avro's serialization and
>> also the other way around.
>>
>> From your initial question it appears you want to read/write Avro
>> serialized data from/to Kafka. So let's see how to do this:
>>
>> ============
>> A custom class
>> ============
>>
>> Let's assume you have a class like this:
>>
>> public class MyCounts implements Serializable {
>>
>>     private String word;
>>     private long count;
>>
>>     public MyCounts() {}
>>
>>     public MyCounts(String word, long count) {
>>         this.word = word;
>>         this.count = count;
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "MyCounts{" +
>>             "word='" + word + '\'' +
>>             ", count=" + count +
>>             '}';
>>     }
>> }
>>
>> =================================
>> The Serialization / Deserialization Schema
>> =================================
>>
>> This is the schema for the Kafka Producer/Consumer to
>> serialize/deserialize data:
>>
>> public class AvroSerializationDeserializationSchema<T>
>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>
>>     private final Class<T> avroType;
>>
>>     private final AvroCoder<T> coder;
>>     private transient ByteArrayOutputStream out;
>>
>>     public AvroSerializationDeserializationSchema(Class<T> clazz) {
>>         this.avroType = clazz;
>>         this.coder = AvroCoder.of(clazz);
>>         this.out = new ByteArrayOutputStream();
>>     }
>>
>>     @Override
>>     public byte[] serialize(T element) {
>>         if (out == null) {
>>             out = new ByteArrayOutputStream();
>>         }
>>         try {
>>             out.reset();
>>             coder.encode(element, out, Coder.Context.NESTED);
>>         } catch (IOException e) {
>>             throw new RuntimeException("Avro encoding failed.", e);
>>         }
>>         return out.toByteArray();
>>     }
>>
>>     @Override
>>     public T deserialize(byte[] message) throws IOException {
>>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(T nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<T> getProducedType() {
>>         return TypeExtractor.getForClass(avroType);
>>     }
>> }
>>
>> ======================================
>> Writing some Avro serialized data to a Kafka topic
>> ======================================
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> PCollection<MyCounts> words =
>>     pipeline.apply(Create.of(
>>         new MyCounts("word", 1L),
>>         new MyCounts("another", 2L),
>>         new MyCounts("yet another", 3L)));
>>
>> FlinkKafkaProducer08<MyCounts> kafkaSink =
>>     new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
>>         new AvroSerializationDeserializationSchema<>(MyCounts.class), props);
>>
>> words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
>>
>> pipeline.run();
>>
>> Let's execute that.
>>
>> ====================================
>> Reading Avro serialized data from a Kafka topic
>> ====================================
>>
>> Now time to read back data from Kafka:
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new FlinkKafkaConsumer08<>(
>>     options.getKafkaTopic(),
>>     new AvroSerializationDeserializationSchema<>(MyCounts.class), props);
>>
>> PCollection<MyCounts> words = pipeline
>>     .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
>>     .apply(ParDo.of(new PrintFn()));
>>
>> pipeline.run();
>>
>> ===================
>> Executing the examples
>> ===================
>>
>> It prints:
>>
>> MyCounts{word='word', count=1}
>> MyCounts{word='another', count=2}
>> MyCounts{word='yet another', count=3}
>>
>> Let me know if that helps you! I've omitted the Kafka props and
>> options for brevity.
>>
>> I hope that we will soon have native Kafka IO for both reading and
>> writing to Kafka available in Beam.
>>
>> Cheers,
>> Max
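For completeness, a minimal sketch of the props that Max left out (host names, port, and group id are placeholders) could look like this:

    // Sketch of the omitted Kafka properties; all values are placeholders.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // Kafka brokers (used by producer and consumer)
    props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 consumer (FlinkKafkaConsumer08)
    props.setProperty("group.id", "beam-avro-test");          // consumer group id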
>> On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal <[email protected]> wrote:
>> > Sorry for cluttering the post with some code.
>> > I have attached a couple of java files and one Avro-generated pojo.
>> >
>> > I am facing some issues while reading / writing data using Flink's
>> > DeSer / Ser schema.
>> >
>> > A) <<Producer>> BeamKafkaFlinkAvroProducerTest
>> >
>> >>> If I use KafkaProducer directly (i.e. call produceSimpleData()),
>> >>> things are working fine (just for testing).
>> >
>> >>> Using FlinkKafkaProducer as UnboundedSource (this is what I should do):
>> >
>> > produceAvroData2() { ...
>> >
>> > 1) First, if I use
>> >
>> >     AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
>> >
>> > i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter,
>> > I face the following error:
>> >
>> > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
>> > org.apache.avro.generic.IndexedRecord
>> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>> >     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>> >     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>> >
>> > 2) Next, if I use TypeInformationSerializationSchema (irrespective of
>> > AvroCoder in the Pipeline), things apparently work fine, as the Kafka
>> > test consumer tool prints the message:
>> >
>> > java.lang.String{"uname": "Joe", "id": 6}
>> >
>> > B) <<Consumer>> BeamKafkaFlinkAvroConsumerTest
>> >
>> >>> I understand we should either use TypeInformationSerializationSchema in
>> >>> both consumer and producer, OR use AvroDeserializationSchema and
>> >>> AvroSerializationSchema in Consumer and Producer respectively.
>> >
>> > But, irrespective of using AvroDeserializationSchema or
>> > TypeInformationSerializationSchema, I get the following exception:
>> >
>> > Exception in thread "main" java.lang.NullPointerException: null value in
>> > entry: V=null
>> >     at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
>> >     at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
>> >     at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
>> >     at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
>> >     at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
>> >
>> > Any clues what's wrong here? Maybe I am missing some simple step.
>> > I am trying to make a simple Avro schema work here, and eventually we need
>> > to deal with a very complex Avro schema.
>> >
>> > Much appreciate your help.
>> >
>> > Thanks
>> > Kaniska
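For what it's worth, the ClassCastException in A)(1) suggests the datum writer was handed a plain java.lang.String rather than the Avro-generated record. A SerializationSchema whose type parameter is bounded to SpecificRecord turns that mismatch into a compile-time error; below is a sketch only (the class and names are hypothetical, not the attached code):

    // Sketch: SerializationSchema backed by Avro's SpecificDatumWriter.
    // Bounding T to SpecificRecord means only Avro-generated records can be passed in.
    public class AvroSpecificSerializationSchema<T extends SpecificRecord>
            implements SerializationSchema<T> {

        private final Class<T> type;
        private transient SpecificDatumWriter<T> writer; // not serializable, created lazily

        public AvroSpecificSerializationSchema(Class<T> type) {
            this.type = type;
        }

        @Override
        public byte[] serialize(T element) {
            try {
                if (writer == null) {
                    writer = new SpecificDatumWriter<>(type);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
                writer.write(element, encoder); // the bound on T guarantees an Avro-generated record here
                encoder.flush();
                return out.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException("Avro serialization failed.", e);
            }
        }
    }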
>> > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:
>> >> Hi Kaniska,
>> >>
>> >> To read data from Kafka, you need to supply a DeserializationSchema.
>> >> Here is one you could use:
>> >> https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>> >>
>> >> Similarly, to write data into Kafka using the Producer, you will need
>> >> a SerializationSchema. You need to serialize your data into bytes
>> >> using your Avro schema. Actually, you could use the AvroCoder which is
>> >> supplied in Beam for this. Or you implement your own analogue to the
>> >> DeserializationSchema above.
>> >>
>> >> - Max
>> >>
>> >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal <[email protected]> wrote:
>> >> > Hi,
>> >> >
>> >> > I followed the example in AvroITCase.java - which reads/writes Avro data
>> >> > from/to the filesystem.
>> >> >> I don't want to update AvroIO to embed a Kafka consumer / producer.
>> >> >
>> >> > Also looked into https://issues.apache.org/jira/browse/FLINK-2597
>> >> >> But I couldn't instantiate TypeInformationSerializationSchema in the
>> >> >> following FlinkKafkaConsumer / Producer - as I am not sure how to get
>> >> >> access to an ExecutionConfig.
>> >> >
>> >> > Essentially, I need help to change the following code in the Beam Pipeline
>> >> >> to read Avro data from Kafka and deserialize it into my custom object.
>> >> >
>> >> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
>> >> >     FlinkKafkaConsumer08<String> kafkaConsumer = new
>> >> >         FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
>> >> >
>> >> >     return UnboundedFlinkSource.of(kafkaConsumer);
>> >> > }
>> >> >
>> >> >> and how to write Avro data into Kafka?
>> >> >
>> >> > public static void produceData() {
>> >> >     FlinkKafkaProducer08<String> kafkaSink =
>> >> >         new FlinkKafkaProducer08<>(TOPIC, ? , props);
>> >> >
>> >> >     Pipeline pipeline = Pipeline.create(options);
>> >> >     pipeline
>> >> >         .apply(Create.of(
>> >> >             new User("Joe", 3, "red"),
>> >> >             new User("Mary", 4, "blue"),
>> >> >             new User("Mark", 1, "green"),
>> >> >             new User("Julia", 5, "purple"))
>> >> >             .withCoder(AvroCoder.of(User.class)))
>> >> >
>> >> >         .apply(transformationToWriteToKafkaSink());
>> >> >
>> >> >     pipeline.run();
>> >> > }
>> >> >
>> >> > Thanks
>> >> > Kaniska
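On the ExecutionConfig question above: if Flink's own serialization (rather than Avro) is acceptable, a TypeInformationSerializationSchema can be built without access to a running job, since a freshly constructed ExecutionConfig is enough. A minimal sketch, assuming the User POJO and the options/props from the thread:

    // Sketch: Flink (non-Avro) serialization, so it must be used on both producer and consumer side.
    TypeInformationSerializationSchema<User> schema =
        new TypeInformationSerializationSchema<>(
            TypeExtractor.getForClass(User.class), // Flink type information for the POJO
            new ExecutionConfig());                // a standalone ExecutionConfig is sufficient here

    FlinkKafkaConsumer08<User> kafkaConsumer =
        new FlinkKafkaConsumer08<>(options.getKafkaTopic(), schema, props);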
