Repository: samza Updated Branches: refs/heads/master 0b0b2958e -> abe39676e
SAMZA-1157: Serialization/deserialization throwables should not be suppressed â¦ppressed if user don't configure to drop those errors Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Reviewers: Navina Ramesh <nav...@apache.org> Closes #128 from nickpan47/fix-serde-error-suppressed Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/abe39676 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/abe39676 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/abe39676 Branch: refs/heads/master Commit: abe39676ef6ab6fd0e87b1d489a3cca7d91852bc Parents: 0b0b295 Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Authored: Tue Apr 18 17:15:32 2017 -0700 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Tue Apr 18 17:15:32 2017 -0700 ---------------------------------------------------------------------- .../src/main/scala/org/apache/samza/system/SystemConsumers.scala | 4 ++-- .../src/main/scala/org/apache/samza/system/SystemProducers.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/abe39676/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index f1acb15..6f0b53a 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -331,11 +331,11 @@ class SystemConsumers ( val deserializedEnvelope = try { Some(serdeManager.fromBytes(rawEnvelope)) } catch { - case e: Exception if !dropDeserializationError => + case e: Throwable if !dropDeserializationError => throw new SystemConsumersException( "Cannot deserialize an incoming message for %s" .format(systemStreamPartition.getSystemStream.toString), e) - case ex: Exception => + case ex: Throwable => debug("Cannot deserialize an incoming message for %s. Dropping the error message." .format(systemStreamPartition.getSystemStream.toString), ex) metrics.deserializationError.inc http://git-wip-us.apache.org/repos/asf/samza/blob/abe39676/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala index a4b3ffb..e8ce961 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala @@ -73,7 +73,7 @@ class SystemProducers( val bytesEnvelope = try { Some(serdeManager.toBytes(envelope)) } catch { - case e: Exception if !dropSerializationError => throw new SamzaException("can not serialize the message", e) + case e: Throwable if !dropSerializationError => throw new SamzaException("can not serialize the message", e) case ex: Throwable => { debug("Serialization fails: %s . Drop the error message" format ex) metrics.serializationError.inc