Repository: incubator-samza
Updated Branches:
  refs/heads/master 90d5a00fc -> a66a66c53
SAMZA-432; throw better exception when sending messages to an undefined system

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a66a66c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a66a66c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a66a66c5

Branch: refs/heads/master
Commit: a66a66c534c587f0092d1d87328b4d56810bd315
Parents: 90d5a00
Author: Chris Riccomini <[email protected]>
Authored: Wed Oct 15 13:13:47 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Wed Oct 15 13:13:47 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/samza/system/SystemProducers.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a66a66c5/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
index a022b2e..a4b3ffb 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -82,7 +82,9 @@ class SystemProducers(
     }
 
     if (!bytesEnvelope.isEmpty) {
-      producers(envelope.getSystemStream.getSystem).send(source, bytesEnvelope.get)
+      val system = envelope.getSystemStream.getSystem
+      val producer = producers.getOrElse(system, throw new SamzaException("Attempting to produce to unknown system: %s. Available systems: %s. Please add the system to your configuration, or update outgoing message envelope to send to a defined system." format (system, producers.keySet)))
+      producer.send(source, bytesEnvelope.get)
     }
   }
 }
