Hello Gaspar, In your case, a single topic can have messages in different format, and my guess is that they usually have different semantics (e.g. one format for data record, and another format for control message / error log / etc).
In this case, I'd suggest similar solutions as you mentioned, to either "filter" out the non-related messages or "branch" this topic into multiple streams based on their data format with a String-with-manual-Json serde first, before doing the actual processing. Kafka Streams has the corresponding filter / branch operators in the high-level DSL for these purposes. Guozhang On Mon, Jul 4, 2016 at 1:46 AM, Gaspar Muñoz <gmu...@stratio.com> wrote: > Hi there, > > my question is about Kafka Streams. I'm writting an application using > Streams. I read JSON from Kafka topic and I make some transformations. > > I'm using > > Serde<JsonNode> jsonNodeSerder = Serdes.serdeFrom(new JsonSerializer(), new > JsonDeserializer()); > KStream<String, JsonNode> kStream = builder.stream(Serdes.String(), > jsonNodeSerder, topic); > > If, unluckily, we receive a non-json message, JsonDeserializer launch a > > throw new SerializationException("Error serializing JSON message", e); > > and streams threads die so the entire application dies. In order to make > robust my application I have to skip this errors and send the error > message to another topic to be analyzed. > > Currently I've implemented MyOwnJSONDeserializer extends Kafka > JsonDeserializer and I catch and manage the exception but this don't seems > really clean to me. > > How would you manage this errors? I thought I might have used String SerDe > and "manually" convert to JsonNode, is there any better solution? > > Thanks. > -- -- Guozhang