In the documentation we have an example on how to implement deserialization
from bytes to Jackson ObjectNode objects - JSONKeyValueDeserializationSchema
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

However, there is no example on the other direction: Taking an
ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
it to string

You can write a simple schema like so


public class JSONKafkaSerializationSchema implements
KafkaSerializationSchema<JsonNode> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ProducerRecord<byte[], byte[]> serialize(JsonNode element,
@Nullable Long timestamp) {
        String topic = getTargetTopic(element);

        byte[] value;

        try {
            value = objectMapper.writeValueAsBytes(element);
            return new ProducerRecord<>(topic, value);
        } catch (JsonProcessingException e) {
            return null;
        }
    }

    private String getTargetTopic(JsonNode jsonNode) {
        return jsonNode.get("topic").asText();
    }
}

But this raises a question - What to do when a serialization fails?
if the input class is a simple POJO then Jackson should always succeed in
converting to bytes but that's not 100% guaranteed.
In case of failures, can we return null and the record will be discarded?
Null values are discarded in the case of the deserialization schema, from
the documentation - "Returns: The deserialized message as an object (null
if the message cannot be deserialized)."
If this is not possible, what is the proper way to serialize Jackson objets
into bytes in flink? Its possible to convert everything to String before
the kafka producer but then any logic to determine the topic we need to
send to will need to deserialize the string again

Reply via email to