Repository: samza Updated Branches: refs/heads/master a94e1ed89 -> db2d93e34
SAMZA-643: Improve error messages fro null stream in OutgoingMessageEnvelop Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/db2d93e3 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/db2d93e3 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/db2d93e3 Branch: refs/heads/master Commit: db2d93e3424713fb0dcbd4349f05995b30e1d864 Parents: a94e1ed Author: Gustavo Anatoly <gustavoanat...@gmail.com> Authored: Fri May 15 11:29:50 2015 -0700 Committer: Yan Fang <yanfang...@gmail.com> Committed: Fri May 15 11:29:50 2015 -0700 ---------------------------------------------------------------------- .../system/kafka/KafkaSystemProducer.scala | 4 ++++ .../system/kafka/TestKafkaSystemProducer.scala | 25 ++++++++++++++++++++ 2 files changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/db2d93e3/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index 19bc37d..39c54aa 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -46,6 +46,7 @@ class KafkaSystemProducer(systemName: String, val latestFuture: javaMap[String, Future[RecordMetadata]] = new util.HashMap[String, Future[RecordMetadata]]() val sendFailed: AtomicBoolean = new AtomicBoolean(false) var exceptionThrown: AtomicReference[Exception] = new AtomicReference[Exception]() + val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file."; def start() { } @@ -74,6 +75,9 @@ class KafkaSystemProducer(systemName: String, // Java-based Kafka producer API requires an "Integer" type partitionKey and does not allow custom overriding of Partitioners // Any kind of custom partitioning has to be done on the client-side val topicName = envelope.getSystemStream.getStream + if (topicName == null || topicName == "") { + throw new IllegalArgumentException(StreamNameNullOrEmptyErrorMsg) + } val partitions: java.util.List[PartitionInfo] = producer.partitionsFor(topicName) val partitionKey = if(envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null val record = new ProducerRecord(envelope.getSystemStream.getStream, http://git-wip-us.apache.org/repos/asf/samza/blob/db2d93e3/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala index ef5a55b..39426d8 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala @@ -33,6 +33,7 @@ import org.apache.samza.SamzaException class TestKafkaSystemProducer { val someMessage = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "test".getBytes) + val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file."; @Test def testKafkaProducer { @@ -194,4 +195,28 @@ class TestKafkaSystemProducer { systemProducer.stop() assertEquals(4, mockProducer.getMsgsSent) } + + @Test + def testSystemStreamNameNullOrEmpty { + val omeStreamNameNull = new OutgoingMessageEnvelope(new SystemStream("test", null), "a".getBytes) + val omeStreamNameEmpty = new OutgoingMessageEnvelope(new SystemStream("test", ""), "a".getBytes) + val mockProducer = new MockKafkaProducer(1, "testMock", 1) + val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => mockProducer, + metrics = new KafkaSystemProducerMetrics) + + val thrownNull = intercept[IllegalArgumentException] { + producer.register("test1") + producer.start() + producer.send("testSrc1", omeStreamNameNull) + assertEquals(0, mockProducer.getMsgsSent) + } + val thrownEmpty = intercept[IllegalArgumentException] { + producer.register("test2") + producer.start() + producer.send("testSrc2", omeStreamNameEmpty) + assertEquals(0, mockProducer.getMsgsSent) + } + assertTrue(thrownNull.getMessage() == StreamNameNullOrEmptyErrorMsg) + assertTrue(thrownEmpty.getMessage() == StreamNameNullOrEmptyErrorMsg) + } }