Hi,

I followed the example in AvroITCase.java, which reads/writes Avro data
from/to the filesystem.
> I don't want to update AvroIO to embed a Kafka consumer / producer

I also looked into https://issues.apache.org/jira/browse/FLINK-2597
> But I couldn't instantiate TypeInformationSerializationSchema in the
following FlinkKafkaConsumer / Producer, as I am not sure how to get
access to the ExecutionConfig.

Essentially, I need help changing the following code in my Beam pipeline
to read Avro data from Kafka and deserialize it into my custom object.

public static UnboundedSource<String, CheckpointMark> consumeMessages() {
    FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
            options.getKafkaTopic(), ? /* which DeserializationSchema goes here? */, props);

    return UnboundedFlinkSource.of(kafkaConsumer);
}
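For context, here is my rough guess at what could fill that `?`: a custom DeserializationSchema backed by Avro's SpecificDatumReader. The class name AvroDeserializationSchema is my own invention (not from AvroITCase.java or any official example), and I'm aware the interface details may differ between Flink versions:

```java
import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// My own sketch of an Avro deserialization schema; not taken from
// any official Beam/Flink example.
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private final Class<T> avroType;

    // Avro reader/decoder are not serializable, so keep them transient
    // and create them lazily on the task managers.
    private transient SpecificDatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        if (reader == null) {
            reader = new SpecificDatumReader<>(avroType);
        }
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize Avro record", e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        // Kafka topics are unbounded, so never signal end of stream.
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}
```

With that, I imagine the consumer becomes something like
`new FlinkKafkaConsumer08<>(options.getKafkaTopic(), new AvroDeserializationSchema<>(User.class), props)`
— but I'm not sure this is the intended approach, which is why I'm asking.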

> And how do I write Avro data into Kafka?
public static void produceData() {
    // Note: the elements below are User objects, so presumably the
    // producer should be typed <User> rather than <String>?
    FlinkKafkaProducer08<User> kafkaSink =
            new FlinkKafkaProducer08<>(TOPIC, ? /* which SerializationSchema goes here? */, props);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Create.of(
                new User("Joe", 3, "red"),
                new User("Mary", 4, "blue"),
                new User("Mark", 1, "green"),
                new User("Julia", 5, "purple"))
            .withCoder(AvroCoder.of(User.class)))
        .apply(transformationToWriteToKafkaSink());

    pipeline.run();
}
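Symmetrically, my guess for the producer side would be a SerializationSchema backed by Avro's SpecificDatumWriter. Again, AvroSerializationSchema is my own sketch, not an existing class, and I know SerializationSchema's generic signature has changed between Flink versions:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

// My own sketch of an Avro serialization schema for the Kafka sink;
// not taken from any official Beam/Flink example.
public class AvroSerializationSchema<T> implements SerializationSchema<T> {

    private final Class<T> avroType;

    // Writer/encoder are not serializable; create them lazily per task.
    private transient SpecificDatumWriter<T> writer;
    private transient BinaryEncoder encoder;

    public AvroSerializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public byte[] serialize(T element) {
        if (writer == null) {
            writer = new SpecificDatumWriter<>(avroType);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            encoder = EncoderFactory.get().binaryEncoder(out, encoder);
            writer.write(element, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize Avro record", e);
        }
        return out.toByteArray();
    }
}
```

So the sink would become
`new FlinkKafkaProducer08<User>(TOPIC, new AvroSerializationSchema<>(User.class), props)`,
wrapped for Beam with UnboundedFlinkSink.of(kafkaSink) if I understand the Flink runner correctly. Does that look like the right direction?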

Thanks
Kaniska
