Repository: samza
Updated Branches:
  refs/heads/master a94e1ed89 -> db2d93e34


SAMZA-643: Improve error messages fro null stream in OutgoingMessageEnvelop


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/db2d93e3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/db2d93e3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/db2d93e3

Branch: refs/heads/master
Commit: db2d93e3424713fb0dcbd4349f05995b30e1d864
Parents: a94e1ed
Author: Gustavo Anatoly <gustavoanat...@gmail.com>
Authored: Fri May 15 11:29:50 2015 -0700
Committer: Yan Fang <yanfang...@gmail.com>
Committed: Fri May 15 11:29:50 2015 -0700

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemProducer.scala      |  4 ++++
 .../system/kafka/TestKafkaSystemProducer.scala  | 25 ++++++++++++++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/db2d93e3/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 19bc37d..39c54aa 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -46,6 +46,7 @@ class KafkaSystemProducer(systemName: String,
   val latestFuture: javaMap[String, Future[RecordMetadata]] = new 
util.HashMap[String, Future[RecordMetadata]]()
   val sendFailed: AtomicBoolean = new AtomicBoolean(false)
   var exceptionThrown: AtomicReference[Exception] = new 
AtomicReference[Exception]()
+  val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the 
stream configuration file.";
 
   def start() {
   }
@@ -74,6 +75,9 @@ class KafkaSystemProducer(systemName: String,
     // Java-based Kafka producer API requires an "Integer" type partitionKey 
and does not allow custom overriding of Partitioners
     // Any kind of custom partitioning has to be done on the client-side
     val topicName = envelope.getSystemStream.getStream
+    if (topicName == null || topicName == "") {
+      throw new IllegalArgumentException(StreamNameNullOrEmptyErrorMsg)
+    }
     val partitions: java.util.List[PartitionInfo]  = 
producer.partitionsFor(topicName)
     val partitionKey = if(envelope.getPartitionKey != null) 
KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
     val record = new ProducerRecord(envelope.getSystemStream.getStream,

http://git-wip-us.apache.org/repos/asf/samza/blob/db2d93e3/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index ef5a55b..39426d8 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -33,6 +33,7 @@ import org.apache.samza.SamzaException
 class TestKafkaSystemProducer {
 
   val someMessage = new OutgoingMessageEnvelope(new SystemStream("test", 
"test"), "test".getBytes)
+  val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the 
stream configuration file.";
 
   @Test
   def testKafkaProducer {
@@ -194,4 +195,28 @@ class TestKafkaSystemProducer {
     systemProducer.stop()
     assertEquals(4, mockProducer.getMsgsSent)
   }
+
+  @Test
+  def testSystemStreamNameNullOrEmpty {
+    val omeStreamNameNull = new OutgoingMessageEnvelope(new 
SystemStream("test", null), "a".getBytes)
+    val omeStreamNameEmpty = new OutgoingMessageEnvelope(new 
SystemStream("test", ""), "a".getBytes)
+    val mockProducer = new MockKafkaProducer(1, "testMock", 1)
+    val producer = new KafkaSystemProducer(systemName = "test", getProducer = 
() => mockProducer,
+                                           metrics = new 
KafkaSystemProducerMetrics)
+
+    val thrownNull = intercept[IllegalArgumentException] {
+      producer.register("test1")
+      producer.start()
+      producer.send("testSrc1", omeStreamNameNull)
+      assertEquals(0, mockProducer.getMsgsSent)
+    }
+    val thrownEmpty = intercept[IllegalArgumentException] {
+      producer.register("test2")
+      producer.start()
+      producer.send("testSrc2", omeStreamNameEmpty)
+      assertEquals(0, mockProducer.getMsgsSent)
+    }
+    assertTrue(thrownNull.getMessage() == StreamNameNullOrEmptyErrorMsg)
+    assertTrue(thrownEmpty.getMessage() == StreamNameNullOrEmptyErrorMsg)
+  }
 }

Reply via email to