Is it possible to publish messages to Kafka serialized with Confluent's KafkaAvroSerializer? I'm using Flink 1.9.1 and have seen that some development is going on in a newer version of flink-avro (1.11.0), but I'm stuck with this version.
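I would like to use the newly introduced KafkaSerializationSchema to serialize messages for the Confluent schema registry and Kafka. I currently have a class that converts a type T to plain Avro binary, but I want to use the Confluent serialization instead.

```
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerialization.class);

    private final String topic;

    public KafkaMessageSerialization(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        // Use the schema carried by the generated record itself.
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        try {
            // Plain Avro binary encoding: no schema-registry header is written.
            writer.write(event, binEncoder);
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("serialization error", e);
            throw new RuntimeException(e);
        }
        // Record with a value only (no key), sent to the configured topic.
        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}
```

The usage is quite convenient:

.addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE))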
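For context on why the class above is not interchangeable with KafkaAvroSerializer: as far as I understand the documented Confluent wire format, each message value starts with a 5-byte header (one magic byte `0`, then the schema ID as a 4-byte big-endian integer), followed by the Avro binary payload. The sketch below only illustrates that framing with the JDK. `ConfluentWireFormat`, `frame` and `schemaIdOf` are hypothetical names, and the schema ID `42` is a placeholder; in a real setup the ID comes from the schema registry, which this sketch does not contact.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration only: frames an already-encoded Avro payload the way the
// Confluent wire format is documented (magic byte, schema ID, payload).
final class ConfluentWireFormat {

    static final byte MAGIC_BYTE = 0x0;

    // schemaId is assumed to have been obtained from the schema registry.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buffer = ByteBuffer.allocate(1 + Integer.BYTES + avroPayload.length);
        buffer.put(MAGIC_BYTE);
        buffer.putInt(schemaId); // ByteBuffer writes big-endian by default
        buffer.put(avroPayload);
        return buffer.array();
    }

    // Reads the schema ID back out of a framed message.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Placeholder payload standing in for outputStream.toByteArray().
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(Arrays.toString(framed)); // prints [0, 0, 0, 0, 42, 1, 2, 3]
        System.out.println(schemaIdOf(framed));      // prints 42
    }
}
```

A consumer using the Confluent deserializer expects this header, so bytes produced by the plain `BinaryEncoder` approach would not be readable by it as they are.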