Oh, it does not work with Kafka 0.8.x, as it uses the new consumer API introduced in 0.9.x.

On Wed, Apr 27, 2016 at 4:35 PM, kaniska Mandal <[email protected]> wrote:
> Hi Raghu,
>
> Thanks much for the update.
>
> We are using Kafka 0.8.x.
>
> Can you please also add a working example (Beam + Flink + Kafka) with an Avro-generated POJO?
>
> Kaniska
>
> On Wed, Apr 27, 2016 at 12:14 PM, Raghu Angadi <[email protected]> wrote:
>
>> Kaniska,
>>
>> If your Kafka cluster is running 0.9, you can also try the native KafkaIO
>> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L88>
>> merged recently into Beam. It supports reading from Kafka, and I am working
>> on sink support (this week).
>>
>> See TopHashtagsExample.java
>> <https://github.com/apache/incubator-beam/pull/142/commits/f5c809d5d3551c4fcb64bc7bcde0c64f8dd76e0a#diff-796ac0dad9e90975cfea2e2b05a90d69R121>
>> for a complete example (it writes results back to Kafka in a ParDo() rather
>> than a Sink). To use AvroCoder, your consumer test will have something like:
>>
>>     pipeline.apply(KafkaIO.read()
>>         .withBootstrapServers(options.getBroker())
>>         .withTopics(ImmutableList.of(TOPICS))
>>         .withValueCoder(AvroCoder.of(User.class))
>>         ...
>>
>> Raghu.
>>
>> On Wed, Apr 27, 2016 at 10:05 AM, Maximilian Michels <[email protected]> wrote:
>>
>>> Hi Kaniska,
>>>
>>> Thanks for your mail. First of all, let us state clearly the problem you are trying to solve.
>>>
>>> Do you want to use Avro serialization or Flink serialization? If you use the TypeInformationSerializationSchema, you use Flink's serialization stack - no Avro involved then. You have to be consistent and stick with either one; otherwise problems are bound to happen, as the Flink serialization doesn't understand Avro's serialization and vice versa.
>>>
>>> From your initial question it appears you want to read/write Avro-serialized data from/to Kafka.
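A minimal sketch of how that Flink-native schema would be instantiated, in case the pure-Flink route is ever preferred (MyPojo is a placeholder for whatever class sits on the topic; no running environment is needed, a freshly constructed ExecutionConfig is enough):

    TypeInformationSerializationSchema<MyPojo> flinkSchema =
        new TypeInformationSerializationSchema<>(
            TypeExtractor.getForClass(MyPojo.class), // Flink TypeInformation for the POJO
            new ExecutionConfig());                  // fresh config; no StreamExecutionEnvironment needed

This also touches the ExecutionConfig question from the first mail further down the thread: the schema can be built before, and independently of, the pipeline.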
>>> So let's see how to do this:
>>>
>>> ============
>>> A custom class
>>> ============
>>>
>>> Let's assume you have a class like this:
>>>
>>> public class MyCounts implements Serializable {
>>>
>>>     private String word;
>>>     private long count;
>>>
>>>     public MyCounts() {}
>>>
>>>     public MyCounts(String word, long count) {
>>>         this.word = word;
>>>         this.count = count;
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>         return "MyCounts{" +
>>>                 "word='" + word + '\'' +
>>>                 ", count=" + count +
>>>                 '}';
>>>     }
>>> }
>>>
>>> =================================
>>> The Serialization / Deserialization Schema
>>> =================================
>>>
>>> This is the schema for the Kafka Producer/Consumer to serialize/deserialize data:
>>>
>>> public class AvroSerializationDeserializationSchema<T>
>>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>>
>>>     private final Class<T> avroType;
>>>
>>>     private final AvroCoder<T> coder;
>>>     private transient ByteArrayOutputStream out;
>>>
>>>     public AvroSerializationDeserializationSchema(Class<T> clazz) {
>>>         this.avroType = clazz;
>>>         this.coder = AvroCoder.of(clazz);
>>>         this.out = new ByteArrayOutputStream();
>>>     }
>>>
>>>     @Override
>>>     public byte[] serialize(T element) {
>>>         if (out == null) {
>>>             out = new ByteArrayOutputStream();
>>>         }
>>>         try {
>>>             out.reset();
>>>             coder.encode(element, out, Coder.Context.NESTED);
>>>         } catch (IOException e) {
>>>             throw new RuntimeException("Avro encoding failed.", e);
>>>         }
>>>         return out.toByteArray();
>>>     }
>>>
>>>     @Override
>>>     public T deserialize(byte[] message) throws IOException {
>>>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>>>     }
>>>
>>>     @Override
>>>     public boolean isEndOfStream(T nextElement) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<T> getProducedType() {
>>>         return TypeExtractor.getForClass(avroType);
>>>     }
>>> }
>>>
>>> ======================================
>>> Writing some Avro serialized data to a Kafka topic
>>> ======================================
>>>
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>> PCollection<MyCounts> words =
>>>     pipeline.apply(Create.of(
>>>         new MyCounts("word", 1L),
>>>         new MyCounts("another", 2L),
>>>         new MyCounts("yet another", 3L)));
>>>
>>> FlinkKafkaProducer08<MyCounts> kafkaSink =
>>>     new FlinkKafkaProducer08<>(options.getKafkaOutputTopic(),
>>>         new AvroSerializationDeserializationSchema<>(MyCounts.class), props);
>>>
>>> words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
>>>
>>> pipeline.run();
>>>
>>> Let's execute that.
>>>
>>> ====================================
>>> Reading Avro serialized data from a Kafka topic
>>> ====================================
>>>
>>> Now time to read back data from Kafka:
>>>
>>> Pipeline pipeline = Pipeline.create(options);
>>>
>>> FlinkKafkaConsumer08<MyCounts> kafkaConsumer = new FlinkKafkaConsumer08<>(
>>>     options.getKafkaTopic(),
>>>     new AvroSerializationDeserializationSchema<>(MyCounts.class), props);
>>>
>>> PCollection<MyCounts> words = pipeline
>>>     .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
>>>     .apply(ParDo.of(new PrintFn()));
>>>
>>> pipeline.run();
>>>
>>> ===================
>>> Executing the examples
>>> ===================
>>>
>>> It prints:
>>>
>>> MyCounts{word='word', count=1}
>>> MyCounts{word='another', count=2}
>>> MyCounts{word='yet another', count=3}
>>>
>>> Let me know if that helps you! I've omitted the Kafka props and options for brevity.
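A minimal sketch of the omitted props, assuming a local Kafka 0.8.x broker and ZooKeeper (values are placeholders for whatever your setup uses; the 0.8 consumer needs zookeeper.connect and group.id, and both producer and consumer need bootstrap.servers):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // Kafka broker list
    props.setProperty("zookeeper.connect", "localhost:2181"); // required by FlinkKafkaConsumer08
    props.setProperty("group.id", "beam-avro-example");       // consumer group id (any name)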
>>> I hope that we will soon have native Kafka IO for both reading and writing to Kafka available in Beam.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Apr 27, 2016 at 4:21 AM, kaniska Mandal <[email protected]> wrote:
>>> > Sorry for cluttering the post with some code. I have attached a couple of Java files and one Avro-generated POJO.
>>> >
>>> > I am facing some issues while reading / writing data using Flink's deserialization / serialization schemas.
>>> >
>>> > A) << producer >> BeamKafkaFlinkAvroProducerTest
>>> >
>>> > >> If I use KafkaProducer directly (i.e. call produceSimpleData()), things are working fine (just for testing).
>>> >
>>> > >> Using FlinkKafkaProducer as UnboundedSource (this is what I should do):
>>> >
>>> > produceAvroData2() { ...
>>> >
>>> > 1) First, if I use
>>> >
>>> >     AvroSerializationSchema schema = new AvroSerializationSchema(Test.class);
>>> >
>>> > i.e. essentially using Avro's org.apache.avro.specific.SpecificDatumWriter, I face the following error:
>>> >
>>> > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.IndexedRecord
>>> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>>> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>>> >     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>>> >     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>>> >
>>> > 2) Next, if I use TypeInformationSerializationSchema (irrespective of AvroCoder in the Pipeline), things apparently work fine, as the Kafka test consumer tool prints the message:
>>> >
>>> >     java.lang.String{"uname": "Joe", "id": 6}
>>> >
>>> > B) << consumer >> BeamKafkaFlinkAvroConsumerTest
>>> >
>>> > >> I understand we should either use TypeInformationSerializationSchema in both consumer and producer, OR use AvroDeserializationSchema and AvroSerializationSchema in the consumer and producer respectively!
>>> >
>>> > But, irrespective of using AvroDeserializationSchema or TypeInformationSerializationSchema, I get the following exception:
>>> >
>>> > Exception in thread "main" java.lang.NullPointerException: null value in entry: V=null
>>> >     at com.google.common.collect.CollectPreconditions.checkEntryNotNull(CollectPreconditions.java:33)
>>> >     at com.google.common.collect.SingletonImmutableBiMap.<init>(SingletonImmutableBiMap.java:39)
>>> >     at com.google.common.collect.ImmutableBiMap.of(ImmutableBiMap.java:49)
>>> >     at com.google.common.collect.ImmutableMap.of(ImmutableMap.java:70)
>>> >     at org.apache.beam.sdk.coders.CoderRegistry.getDefaultOutputCoder(CoderRegistry.java:221)
>>> >
>>> > Any clues what's wrong here? Maybe I am missing some simple step. I am trying to make a simple Avro schema work here, and eventually we need to deal with a very complex Avro schema.
>>> >
>>> > Much appreciate your help.
>>> >
>>> > Thanks,
>>> > Kaniska
>>> >
>>> > On Tue, Apr 26, 2016 at 9:05 AM, Maximilian Michels <[email protected]> wrote:
>>> >>
>>> >> Hi Kaniska,
>>> >>
>>> >> To read data from Kafka, you need to supply a DeserializationSchema.
>>> >> Here is one you could use: https://gist.github.com/StephanEwen/d515e10dd1c609f70bed
>>> >>
>>> >> Similarly, to write data into Kafka using the producer, you will need a SerializationSchema. You need to serialize your data into bytes using your Avro schema. Actually, you could use the AvroCoder which is supplied in Beam for this. Or you implement your own analogue of the DeserializationSchema above.
>>> >>
>>> >> - Max
>>> >>
>>> >> On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal <[email protected]> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > I followed the example in AvroITCase.java, which reads/writes Avro data from/to the filesystem.
>>> >> >
>>> >> > >> I don't want to update AvroIO to embed a Kafka consumer / producer.
>>> >> >
>>> >> > I also looked into https://issues.apache.org/jira/browse/FLINK-2597.
>>> >> >
>>> >> > >> But I couldn't instantiate TypeInformationSerializationSchema in the following FlinkKafkaConsumer / Producer, as I am not sure how to get access to the ExecutionConfig.
>>> >> >
>>> >> > Essentially, I need help to change the following code in the Beam pipeline to read Avro data from Kafka and deserialize it into my custom object:
>>> >> >
>>> >> > public static UnboundedSource<String, CheckpointMark> consumeMessages() {
>>> >> >     FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(options.getKafkaTopic(), ? , props);
>>> >> >
>>> >> >     return UnboundedFlinkSource.of(kafkaConsumer);
>>> >> > }
>>> >> >
>>> >> > >> and how to write Avro data into Kafka?
>>> >> >
>>> >> > public static void produceData() {
>>> >> >     FlinkKafkaProducer08<String> kafkaSink =
>>> >> >         new FlinkKafkaProducer08<>(TOPIC, ? , props);
>>> >> >
>>> >> >     Pipeline pipeline = Pipeline.create(options);
>>> >> >     pipeline
>>> >> >         .apply(Create.of(
>>> >> >             new User("Joe", 3, "red"),
>>> >> >             new User("Mary", 4, "blue"),
>>> >> >             new User("Mark", 1, "green"),
>>> >> >             new User("Julia", 5, "purple"))
>>> >> >             .withCoder(AvroCoder.of(User.class)))
>>> >> >         .apply(transformationToWriteToKafkaSink());
>>> >> >
>>> >> >     pipeline.run();
>>> >> > }
>>> >> >
>>> >> > Thanks,
>>> >> > Kaniska
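For completeness, a rough sketch of how the two "?" placeholders above end up being filled once a schema like the AvroSerializationDeserializationSchema from Max's later walkthrough is available (illustrative only; User is the Avro-generated POJO from the snippet above, and the generic parameters change from String to User accordingly):

    // Consumer side: the schema turns the Kafka bytes back into User objects.
    FlinkKafkaConsumer08<User> kafkaConsumer = new FlinkKafkaConsumer08<>(
        options.getKafkaTopic(),
        new AvroSerializationDeserializationSchema<>(User.class),
        props);

    // Producer side: the same schema serializes User objects into bytes for Kafka.
    FlinkKafkaProducer08<User> kafkaSink = new FlinkKafkaProducer08<>(
        TOPIC,
        new AvroSerializationDeserializationSchema<>(User.class),
        props);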
