Hi Raj,

You have an invalid JSON message in your input. When Samza tried to deserialize the input from your source, it encountered a deserialization error.

If you are okay with dropping messages when there are serde errors, please take a look at:

task.drop.deserialization.errors
task.drop.serialization.errors

Also, you probably want to fix the problem at the source you're consuming from (that is, find out why it was generating invalid messages).

https://samza.apache.org/learn/documentation/0.10/jobs/configuration-table.html

Thanks,
Jagdish

On Tue, Nov 22, 2016 at 3:38 AM, Raj raj <rajlistu...@gmail.com> wrote:

> Hi,
>
> I am using json serde to receive json strings from kafka and
> automatically convert it to LinkedHashMap. I am using the following
> code fragment to achieve this:
>
> LinkedHashMap<String, Object> rawEvent = (LinkedHashMap<String, Object>) envelope.getMessage();
>
> I am now getting an exception when samsa encounters a bad json string
> from kafka.
>
> 16/11/22 11:33:14 ERROR container.SamzaContainer: Caught exception in process loop.
> org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:303)
>     at org.apache.samza.system.SystemConsumers.tryUpdate(SystemConsumers.scala:270)
> .
> .
> .
> .
> Caused by: org.codehaus.jackson.JsonParseException: Invalid UTF-8 middle byte 0x6e
>  at [Source: [B@7ce97ee5; line: 1, column: 811]
>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>     at org.codehaus.jackson.impl.Utf8StreamParser._reportInvalidOther(Utf8StreamParser.java:2244)
> .
> .
> .
> .
>
> Since the exceptions seems to be raised in scala code, how best to
> catch such exceptions and deal with it?
>
> Thanks and Regards,
>
> Raj
>

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
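
For reference, here is a minimal sketch of how the two settings named above might look in the job's properties file. Enabling both is an assumption for illustration; you may only need the deserialization one, since the exception in the trace is raised while reading input. Both default to false, which is why the container currently fails on the bad message.

```properties
# Skip input messages that cannot be deserialized instead of failing the container.
task.drop.deserialization.errors=true

# Skip outgoing messages that cannot be serialized (assumed here; enable only if needed).
task.drop.serialization.errors=true
```

Note that dropped messages are skipped by the container, so the task's process() method never sees them. If you need to inspect or count the bad records, fixing or filtering them at the source is the safer route.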