Is it possible to publish messages to Kafka serialized with Confluent's 
KafkaAvroSerializer? I'm using Flink 1.9.1 and have seen that some development 
is going on in a newer version of flink-avro (1.11.0), but I'm stuck with this version.

I would like to use the newly introduced KafkaSerializationSchema to 
serialize the messages for the Confluent schema registry and Kafka.

I currently have a class that converts a record of type T to plain Avro binary, 
but I want to use the Confluent serialization instead.

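```
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Serializes Avro-generated records to plain Avro binary (no schema-registry framing).
public class KafkaMessageSerialization<T extends SpecificRecordBase>
        implements KafkaSerializationSchema<T> {

    public static final Logger LOG =
            LoggerFactory.getLogger(KafkaMessageSerialization.class);

    private final String topic;

    public KafkaMessageSerialization(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        // Each generated record carries its own schema.
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder =
                EncoderFactory.get().binaryEncoder(outputStream, null);

        try {
            writer.write(event, binEncoder);
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("serialization error", e);
            throw new RuntimeException(e);
        }

        // Record without a key: only the topic and the serialized value are set.
        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}
```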

The usage is quite convenient: .addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new 
KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE))
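
For context on why the plain Avro bytes above are not enough: as far as I understand, the Confluent serializer does not write bare Avro binary. It prefixes each payload with a magic byte (0) and the 4-byte big-endian ID of the schema registered in the schema registry. Below is a minimal sketch of only that framing step, using plain Java. The class and method names (ConfluentFraming, frame, schemaIdOf) are hypothetical, and the schema ID is assumed to have been obtained from the registry by other means; this does not replace KafkaAvroSerializer.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the Confluent wire format:
// [magic byte 0][4-byte big-endian schema ID][Avro binary payload]
final class ConfluentFraming {

    private static final byte MAGIC_BYTE = 0x0;

    private ConfluentFraming() {
    }

    // Prepends the magic byte and schema ID to an already serialized Avro payload.
    // schemaId is assumed to come from the schema registry (not looked up here).
    static byte[] frame(int schemaId, byte[] avroPayload) {
        // ByteBuffer writes ints in big-endian order by default.
        ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buffer.put(MAGIC_BYTE);
        buffer.putInt(schemaId);
        buffer.put(avroPayload);
        return buffer.array();
    }

    // Reads the schema ID back from a framed message, validating the magic byte.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

In the serialize method above, this would correspond to wrapping outputStream.toByteArray() before building the ProducerRecord, so that consumers using the Confluent deserializer can find the schema.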
