Hi Olga, There is an open PR[1] that has some in-progress work on corresponding AvroSerializationSchema, you can have a look at it. The bigger issue there is that SerializationSchema does not have access to event's key so using topic pattern might be problematic. Best, Dawid
[1] https://github.com/apache/flink/pull/6259 On Mon, 22 Oct 2018 at 16:51, Kostas Kloudas <[email protected]> wrote: > Hi Olga, > > Sorry for the late reply. > I think that Gordon (cc’ed) could be able to answer your question. > > Cheers, > Kostas > > On Oct 13, 2018, at 3:10 PM, Olga Luganska <[email protected]> wrote: > > Any suggestions? > > Thank you > > Sent from my iPhone > > On Oct 9, 2018, at 9:28 PM, Olga Luganska <[email protected]> wrote: > > Hello, > > I would like to use Confluent Schema Registry in my streaming job. > I was able to make it work with the help of generic Kafka producer and > FlinkKafkaConsumer which is using > ConfluentRegistryAvroDeserializationSchema. > > FlinkKafkaConsumer011<GenericRecord> consumer = new > FlinkKafkaConsumer011<>(MY_TOPIC, > ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, SCHEMA_URI), > kafkaProperties); > > My question: is it possible to implement *producer* logic in the > FlinkKafkaProducer to serialize message and store schema id in the > Confluent Schema registry? > > I don't think this is going to work with the current interface because > creation and caching of the schema id in the Confluent Schema Registry is > done with the help of > *io.confluent.kafka.serializers.KafkaAvroSerializer.class* and all > FlinkKafkaProducer constructors have either SerializationSchema or > KeyedSerializationSchema (part of Flink's own serialization stack) as one > of the parameters. > If my assumption is wrong, could you please provide details of > implementation? > Thank you very much, > Olga > > > > > > > > > > > >
