Hi, I am using the JSON serde to receive JSON strings from Kafka and automatically convert them to a LinkedHashMap. I am using the following code fragment to achieve this:

    LinkedHashMap<String, Object> rawEvent = (LinkedHashMap<String, Object>) envelope.getMessage();

I am now getting an exception when Samza encounters a bad JSON string from Kafka:

16/11/22 11:33:14 ERROR container.SamzaContainer: Caught exception in process loop.
org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
    at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:303)
    at org.apache.samza.system.SystemConsumers.tryUpdate(SystemConsumers.scala:270)
    . . . .
Caused by: org.codehaus.jackson.JsonParseException: Invalid UTF-8 middle byte 0x6e
 at [Source: [B@7ce97ee5; line: 1, column: 811]
    at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
    at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
    at org.codehaus.jackson.impl.Utf8StreamParser._reportInvalidOther(Utf8StreamParser.java:2244)
    . . . .

Since the exception seems to be raised in Scala code, what is the best way to catch such exceptions and deal with them?

Thanks and Regards,
Raj