Hey Shekar, The default behavior for Samza when a serialization exception occurs is to fail the container, which will eventually trigger a failure of the job. Samza is very conservative about things--by default it will try to never drop messages. In a case where deserialization fails, you have two options: fail, or drop (or some version of drop, like log bad msgs to a different topic). Samza fails by default.
To drop, you can set this configuration: task.drop.deserialization.errors See http://samza.incubator.apache.org/learn/documentation/0.8/jobs/configuratio n-table.html for details. Cheers, Chris On 12/16/14 12:44 PM, "Shekar Tippur" <[email protected]> wrote: >Hello, > >I am facing issue when I pass a malformed json. > > >I was testing on how the system would behave if I sent a malformed json. I >seem to get a jsonparse exception and the samza process terminates. Any >idea on what could be going on and how to mitigate this? > >Exception in thread "ThreadJob" org.codehaus.jackson.JsonParseException: >Unexpected character ('D' (code 68)): expected a valid value (number, >String, array, object, 'true', 'false' or 'null') > > at [Source: [B@ef49347; line: 1, column: 16] > > at >org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291) > > at >org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMin >imalBase.java:385) > > at >org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(Json >ParserMinimalBase.java:306) > > at >org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8Stre >amParser.java:1582) > > at >org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java >:386) > > at >org.codehaus.jackson.map.deser.UntypedObjectDeserializer.mapObject(Untyped >ObjectDeserializer.java:173) > > at >org.codehaus.jackson.map.deser.UntypedObjectDeserializer.deserialize(Untyp >edObjectDeserializer.java:76) > > at >org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2 >402) > > at >org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1674) > > at >org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33) > > at >org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115 >) > > at >org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$S >ystemConsumers$$poll$5.apply(SystemConsumers.scala:245) > > at >org.apache.samza.system.SystemConsumers$$anonfun$org$apache$samza$system$S >ystemConsumers$$poll$5.apply(SystemConsumers.scala:242) > > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > > at >scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at org.apache.samza.system.SystemConsumers.org >$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:242) > > at >org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(Syst >emConsumers.scala:180) > > at >org.apache.samza.system.SystemConsumers$$anon$1$$anonfun$call$2.apply(Syst >emConsumers.scala:180) > > at >scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scal >a:244) > > at >scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scal >a:244) > > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > > at >scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174) > > at >scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > > at >scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala >:47) > > at scala.collection.SetLike$class.map(SetLike.scala:93) > > at scala.collection.AbstractSet.map(Set.scala:47) > > at >org.apache.samza.system.SystemConsumers$$anon$1.call(SystemConsumers.scala >:180) > > at >org.apache.samza.util.DoublingBackOff.maybeCall(DoublingBackOff.scala:44) > > at >org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:208) > > at org.apache.samza.container.RunLoop.process(RunLoop.scala:73) > > at org.apache.samza.container.RunLoop.run(RunLoop.scala:57) > > at >org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504) > > at >org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
