Hi,

I am using the JSON serde to receive JSON strings from Kafka and
automatically convert them to a LinkedHashMap. I am using the following
code fragment to achieve this:

LinkedHashMap<String, Object> rawEvent = (LinkedHashMap<String,
Object>) envelope.getMessage();

I am now getting an exception when Samza encounters a bad JSON string
from Kafka.

16/11/22 11:33:14 ERROR container.SamzaContainer: Caught exception in process loop.
org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
        at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:303)
        at org.apache.samza.system.SystemConsumers.tryUpdate(SystemConsumers.scala:270)
.
.
.
.
Caused by: org.codehaus.jackson.JsonParseException: Invalid UTF-8 middle byte 0x6e
 at [Source: [B@7ce97ee5; line: 1, column: 811]
        at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
        at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
        at org.codehaus.jackson.impl.Utf8StreamParser._reportInvalidOther(Utf8StreamParser.java:2244)
.
.
.
.

Since the exception seems to be raised in Scala code, what is the best
way to catch such exceptions and deal with them?
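
One workaround I am considering is sketched below. It assumes the stream
could be configured to hand my task the raw bytes (for example through a
byte serde) instead of going through the JSON serde, so that decoding
happens in my own Java code where I can catch the failure. StrictUtf8 and
decodeStrict are hypothetical names of mine, not part of Samza, and the
sketch only covers the UTF-8 check that fails in the trace above; the
actual JSON parsing would still need its own try/catch afterwards.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper, not a Samza class.
final class StrictUtf8 {
    private StrictUtf8() {}

    // Strictly decodes raw message bytes as UTF-8.
    // Returns Optional.empty() for malformed input instead of throwing,
    // so the caller can log and skip the bad message.
    static Optional<String> decodeStrict(byte[] raw) {
        try {
            String text = StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(raw))
                    .toString();
            return Optional.of(text);
        } catch (CharacterCodingException e) {
            // Invalid byte sequence, e.g. a lead byte followed by a
            // non-continuation byte such as 0x6e.
            return Optional.empty();
        }
    }
}
```

In process() I would then skip (and log) any message for which
decodeStrict returns an empty Optional, and parse the rest as JSON.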

Thanks and Regards,

Raj
