Hi Kaniska,

To read data from Kafka, you need to supply a DeserializationSchema.
Here is one you could use:
https://gist.github.com/StephanEwen/d515e10dd1c609f70bed

Similarly, to write data into Kafka using the Producer, you will need
a SerializationSchema. You need to serialize your data into bytes
using your Avro schema. Actually, you could use the AvroCoder which is
supplied in Beam for this. Or you implement your own analogue to the
DeserializationSchema above.

- Max


On Tue, Apr 26, 2016 at 8:43 AM, kaniska Mandal
<[email protected]> wrote:
> Hi,
>
> I followed the example in AvroITCase.java - which reads/writes Avro data
> from/to  filesystem.
>> I don't want to update AvroIO to embed Kafka consumer / producer
>
> Also looked into - https://issues.apache.org/jira/browse/FLINK-2597
>> But I couldn't instantiate TypeInformationSerializationSchema in following
>> FlinkKafkaConsumer / Producer - as I am not sure how do get access to
>> ExecutionConfig
>
> Essentially, need help to change the following code in Beam Pipeline
>> to read Avro data from Kafka and deserialize into my custom object.
>
> public static UnboundedSource<String, CheckpointMark> consumeMessages() {
>         FlinkKafkaConsumer08<String> kafkaConsumer = new
> FlinkKafkaConsumer08<>(options.getKafkaTopic(),
>                 ? , props);
>
>         return UnboundedFlinkSource.of(kafkaConsumer);
>     }
>
>>  and how write Avro data into kafka ?
> public static void produceData(){
>         FlinkKafkaProducer08<String> kafkaSink =
>                 new FlinkKafkaProducer08<>(TOPIC, ? , props);
>
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline
>         .apply(Create.of(
>                 new User("Joe", 3, "red"),
>                 new User("Mary", 4, "blue"),
>                 new User("Mark", 1, "green"),
>                 new User("Julia", 5, "purple"))
>             .withCoder(AvroCoder.of(User.class)))
>
>         .apply(transformationToWriteToKafkaSink());
>
>         pipeline.run();
>
>     }
>
> Thanks
> Kaniska

Reply via email to