Repository: samza Updated Branches: refs/heads/master 3eb15a053 -> ffa84c0b7
SAMZA-608; don't hange on serde errors in system consumers Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ffa84c0b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ffa84c0b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ffa84c0b Branch: refs/heads/master Commit: ffa84c0b724fd2251190a4402a122ac27fded560 Parents: 3eb15a0 Author: Yi Pan <[email protected]> Authored: Mon Apr 13 10:46:31 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Apr 13 10:46:31 2015 -0700 ---------------------------------------------------------------------- .../apache/samza/system/SystemConsumers.scala | 18 ++++++++++---- .../samza/system/TestSystemConsumers.scala | 25 ++++++++++++++++++-- 2 files changed, 37 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ffa84c0b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index 76fa4ad..1ec5e32 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -204,9 +204,7 @@ class SystemConsumers( metrics.choseObject.inc metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc - if (!update(systemStreamPartition)) { - emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition) - } + tryUpdate(systemStreamPartition) } if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) { @@ -257,7 +255,7 @@ class SystemConsumers( // Update the chooser if it needs a message for this SSP. if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) { - update(systemStreamPartition) + tryUpdate(systemStreamPartition) } } } @@ -266,6 +264,18 @@ class SystemConsumers( } } + private def tryUpdate(ssp: SystemStreamPartition) { + var updated = false + try { + updated = update(ssp) + } finally { + if (!updated) { + // if failed to update the chooser, add the ssp back into the emptySystemStreamPartitionBySystem map to ensure that we will poll for the next message + emptySystemStreamPartitionsBySystem.get(ssp.getSystem).add(ssp) + } + } + } + private def refresh { trace("Refreshing chooser with new messages.") http://git-wip-us.apache.org/repos/asf/samza/blob/ffa84c0b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala index 3fdc781..fbaa8ee 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala @@ -241,9 +241,10 @@ class TestSystemConsumers { // it should not throw exceptions when deserializaion fails if dropDeserializationError is set to true val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true) consumers2.register(systemStreamPartition, "0") - consumers2.start consumer(system).putBytesMessage consumer(system).putStringMessage + consumer(system).putBytesMessage + consumers2.start var notThrowException = true; try { @@ -251,9 +252,29 @@ class TestSystemConsumers { } catch { case e: Throwable => notThrowException = false } - assertTrue("it should not throw any exception", notThrowException) + + var msgEnvelope = Some(consumers2.choose) + assertTrue("Consumer did not succeed in receiving the second message after Serde exception in choose", msgEnvelope.get != null) + consumers2.stop + + // ensure that the system consumer will continue after poll() method ignored a Serde exception + consumer(system).putStringMessage + consumer(system).putBytesMessage + + notThrowException = true; + try { + consumers2.start + } catch { + case e: Throwable => notThrowException = false + } + assertTrue("SystemConsumer start should not throw any Serde exception", notThrowException) + + msgEnvelope = null + msgEnvelope = Some(consumers2.choose) + assertTrue("Consumer did not succeed in receiving the second message after Serde exception in poll", msgEnvelope.get != null) consumers2.stop + } /**
