Repository: samza
Updated Branches:
  refs/heads/master 3dcd2e9ec -> 17e65d1cb
SAMZA-1003: Restore lazy init for kafka system producer

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/17e65d1c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/17e65d1c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/17e65d1c

Branch: refs/heads/master
Commit: 17e65d1cbdda1ad436f47c15fa4c86332e229a93
Parents: 3dcd2e9
Author: Xinyu Liu <[email protected]>
Authored: Thu Aug 18 16:32:13 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu Aug 18 16:32:13 2016 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/KafkaSystemProducer.scala | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/17e65d1c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 5a16580..5ff6d3c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -70,12 +70,6 @@ class KafkaSystemProducer(systemName: String,
   val sources: ConcurrentHashMap[String, SourceData] = new ConcurrentHashMap[String, SourceData]
 
   def start(): Unit = {
-    producerLock.synchronized {
-      if (producer == null) {
-        info("Creating a new producer for system %s." format systemName)
-        producer = getProducer()
-      }
-    }
   }
 
   def stop() {
@@ -122,6 +116,16 @@ class KafkaSystemProducer(systemName: String,
       throw exception
     }
 
+    // lazy initialization of the producer
+    if (producer == null) {
+      producerLock.synchronized {
+        if (producer == null) {
+          info("Creating a new producer for system %s." format systemName)
+          producer = getProducer()
+        }
+      }
+    }
+
     val currentProducer = producer
     if (currentProducer == null) {
       throw new SamzaException("Kafka system producer is not available.")
