Repository: samza
Updated Branches:
  refs/heads/master 3dcd2e9ec -> 17e65d1cb


SAMZA-1003: Restore lazy init for kafka system producer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/17e65d1c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/17e65d1c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/17e65d1c

Branch: refs/heads/master
Commit: 17e65d1cbdda1ad436f47c15fa4c86332e229a93
Parents: 3dcd2e9
Author: Xinyu Liu <[email protected]>
Authored: Thu Aug 18 16:32:13 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu Aug 18 16:32:13 2016 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/KafkaSystemProducer.scala    | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/17e65d1c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 5a16580..5ff6d3c 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -70,12 +70,6 @@ class KafkaSystemProducer(systemName: String,
   val sources: ConcurrentHashMap[String, SourceData] = new 
ConcurrentHashMap[String, SourceData]
 
   def start(): Unit = {
-    producerLock.synchronized {
-      if (producer == null) {
-        info("Creating a new producer for system %s." format systemName)
-        producer = getProducer()
-      }
-    }
   }
 
   def stop() {
@@ -122,6 +116,16 @@ class KafkaSystemProducer(systemName: String,
       throw exception
     }
 
+    // lazy initialization of the producer
+    if (producer == null) {
+      producerLock.synchronized {
+        if (producer == null) {
+          info("Creating a new producer for system %s." format systemName)
+          producer = getProducer()
+        }
+      }
+    }
+
     val currentProducer = producer
     if (currentProducer == null) {
       throw new SamzaException("Kafka system producer is not available.")

Reply via email to