Hi there, my question is about Kafka Streams. I'm writing an application using Streams. I read JSON from a Kafka topic and apply some transformations.
I'm using:

Serde<JsonNode> jsonNodeSerder = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
KStream<String, JsonNode> kStream = builder.stream(Serdes.String(), jsonNodeSerder, topic);

If we happen to receive a non-JSON message, JsonDeserializer throws new SerializationException("Error serializing JSON message", e), the stream threads die, and so the entire application dies.

To make my application robust, I have to skip these errors and send the failing message to another topic to be analyzed. Currently I've implemented MyOwnJSONDeserializer, which extends Kafka's JsonDeserializer and catches and handles the exception, but this doesn't seem really clean to me.

How would you handle these errors? I thought I could use the String SerDe and "manually" convert to JsonNode. Is there a better solution?

Thanks.
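To make the idea concrete, here is a minimal, Kafka-free sketch of the pattern I have in mind: wrap the parser so that a bad payload becomes a value carrying the raw bytes and the error, instead of an exception that kills the thread. The names Result, safe and parseJsonObject are hypothetical and only for illustration; parseJsonObject is a crude stand-in for the real JSON parsing, not what JsonDeserializer does. In Streams I assume the same split could then be done with filter on the result and to for the error topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class SafeDeserializeSketch {

    /** Outcome of one attempt: a parsed value, or the raw payload plus the error. */
    static final class Result<T> {
        final T value;          // set on success, null on failure
        final byte[] raw;       // original payload, kept so it can go to an error topic
        final Exception error;  // set on failure, null on success

        private Result(T value, byte[] raw, Exception error) {
            this.value = value;
            this.raw = raw;
            this.error = error;
        }

        static <T> Result<T> ok(T value, byte[] raw) {
            return new Result<>(value, raw, null);
        }

        static <T> Result<T> failed(byte[] raw, Exception error) {
            return new Result<>(null, raw, error);
        }

        boolean isOk() {
            return error == null;
        }
    }

    /** Wraps a throwing parser so that bad input yields a failed Result. */
    static <T> Function<byte[], Result<T>> safe(Function<byte[], T> parser) {
        return raw -> {
            try {
                return Result.ok(parser.apply(raw), raw);
            } catch (RuntimeException e) {
                return Result.failed(raw, e);
            }
        };
    }

    /** Hypothetical stand-in parser: accepts only text that looks like a JSON object. */
    static String parseJsonObject(byte[] raw) {
        String s = new String(raw, StandardCharsets.UTF_8).trim();
        if (!(s.startsWith("{") && s.endsWith("}"))) {
            throw new IllegalArgumentException("not a JSON object: " + s);
        }
        return s;
    }

    public static void main(String[] args) {
        Function<byte[], Result<String>> deserialize =
                safe(SafeDeserializeSketch::parseJsonObject);

        List<String> good = new ArrayList<>();        // would continue through the topology
        List<String> deadLetter = new ArrayList<>();  // would be sent to the error topic

        String[] inputs = {"{\"id\": 1}", "not json", "{\"id\": 2}"};
        for (String in : inputs) {
            Result<String> r = deserialize.apply(in.getBytes(StandardCharsets.UTF_8));
            if (r.isOk()) {
                good.add(r.value);
            } else {
                deadLetter.add(new String(r.raw, StandardCharsets.UTF_8));
            }
        }
        System.out.println("good=" + good + " deadLetter=" + deadLetter);
    }
}
```

Keeping the raw bytes in the failed result is what would let the bad message be forwarded unchanged for later analysis.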