The documentation has an example of how to deserialize bytes into Jackson ObjectNode objects, JSONKeyValueDeserializationSchema: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
However, there is no example for the other direction: taking an ObjectNode/JsonNode (or any POJO class) and using Jackson to serialize it to a string.

You can write a simple schema like so:

    // Imports assume the unshaded Jackson artifacts are on the classpath.
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import javax.annotation.Nullable;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class JSONKafkaSerializationSchema implements KafkaSerializationSchema<JsonNode> {

        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public ProducerRecord<byte[], byte[]> serialize(JsonNode element, @Nullable Long timestamp) {
            // The target topic is read from the element itself.
            String topic = getTargetTopic(element);
            try {
                byte[] value = objectMapper.writeValueAsBytes(element);
                return new ProducerRecord<>(topic, value);
            } catch (JsonProcessingException e) {
                // This is the open question: is returning null allowed here?
                return null;
            }
        }

        private String getTargetTopic(JsonNode jsonNode) {
            return jsonNode.get("topic").asText();
        }
    }

But this raises a question: what should happen when serialization fails? If the input class is a simple POJO, Jackson should always succeed in converting it to bytes, but that is not 100% guaranteed.

In case of failure, can we return null so that the record is discarded? Null values are discarded in the case of the deserialization schema; from the documentation: "Returns: The deserialized message as an object (null if the message cannot be deserialized)."

If this is not possible, what is the proper way to serialize Jackson objects into bytes in Flink? It's possible to convert everything to String before the Kafka producer, but then any logic that determines the target topic would need to deserialize the string again.
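To make the "convert before the producer" alternative concrete, here is a minimal sketch of one way it might avoid the second deserialization: serialize once upstream, but keep the topic next to the bytes. Everything in it is hypothetical and not part of Flink or Jackson: `RoutedBytes`, `ThrowingSerializer`, and `trySerialize` are names made up for illustration, and `ThrowingSerializer` only stands in for a call such as `objectMapper::writeValueAsBytes`. It uses plain Java so it runs on its own; in a real job the same logic would presumably sit in an operator ahead of the sink, and it does not answer whether the producer itself tolerates a null return.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class PreSerializedRouting {

    /** Hypothetical stand-in for a serializer call that may throw, e.g. Jackson's writeValueAsBytes. */
    @FunctionalInterface
    public interface ThrowingSerializer<T> {
        byte[] toBytes(T element) throws Exception;
    }

    /** Hypothetical carrier: the target topic together with the already-serialized value. */
    public static final class RoutedBytes {
        public final String topic;
        public final byte[] value;

        public RoutedBytes(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /**
     * Serializes the element once and pairs the bytes with the topic.
     * Returns an empty Optional when serialization throws, so the caller
     * can drop (or log) the element before it ever reaches the sink.
     */
    public static <T> Optional<RoutedBytes> trySerialize(
            T element, String topic, ThrowingSerializer<T> serializer) {
        try {
            return Optional.of(new RoutedBytes(topic, serializer.toBytes(element)));
        } catch (Exception e) {
            // Assumption: dropping is acceptable here; a real job might log or side-output instead.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Optional<RoutedBytes> ok =
                trySerialize("{\"a\":1}", "events", s -> s.getBytes(StandardCharsets.UTF_8));
        Optional<RoutedBytes> failed =
                PreSerializedRouting.<String>trySerialize("bad", "events", s -> {
                    throw new IllegalStateException("simulated serialization failure");
                });
        System.out.println("serialized: " + ok.isPresent());
        System.out.println("failed kept: " + failed.isPresent());
    }
}
```

With something like this, the serialization schema at the sink would only need to wrap `topic` and `value` in a `ProducerRecord`, which cannot fail the way Jackson can, and the topic never has to be parsed back out of a string.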