SAMZA-584; fix race condition in kafka system consumer

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fcb5cea3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fcb5cea3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fcb5cea3

Branch: refs/heads/samza-sql
Commit: fcb5cea3fc4767ba6239a5cdf05a5a06a7f92e62
Parents: 7a21bef
Author: Chris Riccomini <[email protected]>
Authored: Tue Mar 3 10:20:15 2015 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Tue Mar 3 10:20:15 2015 -0800

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemConsumer.scala      | 27 ++++++++++----------
 1 file changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fcb5cea3/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 38117e2..de00320 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -142,7 +142,6 @@ private[kafka] class KafkaSystemConsumer(
         // This avoids trying to re-add the same topic partition repeatedly
         def refresh(tp: List[TopicAndPartition]) = {
           val head :: rest = tpToRefresh
-          val nextOffset = topicPartitionsAndOffsets.get(head).get
           // refreshBrokers can be called from abdicate and refreshDropped, 
           // both of which are triggered from BrokerProxy threads. To prevent 
           // accidentally creating multiple objects for the same broker, or 
@@ -151,18 +150,18 @@ private[kafka] class KafkaSystemConsumer(
           this.synchronized {
             // Check if we still need this TopicAndPartition inside the 
             // critical section. If we don't, then skip it.
-            if (topicPartitionsAndOffsets.contains(head)) {
-              getHostPort(topicMetadata(head.topic), head.partition) match {
-                case Some((host, port)) =>
-                  val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
-                  brokerProxy.addTopicPartition(head, Option(nextOffset))
-                  brokerProxy.start
-                  debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
-                  topicPartitionsAndOffsets -= head
-                case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
-              }
-            } else {
-              debug("Ignoring refresh for %s because we already added it from another thread." format head)
+            topicPartitionsAndOffsets.get(head) match {
+              case Some(nextOffset) =>
+                getHostPort(topicMetadata(head.topic), head.partition) match {
+                  case Some((host, port)) =>
+                    val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
+                    brokerProxy.addTopicPartition(head, Option(nextOffset))
+                    brokerProxy.start
+                    debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
+                    topicPartitionsAndOffsets -= head
+                  case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
+                }
+              case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head)
             }
           }
           rest
@@ -182,7 +181,7 @@ private[kafka] class KafkaSystemConsumer(
   }
 
   val sink = new MessageSink {
-    var lastDroppedRefresh = 0L
+    var lastDroppedRefresh = clock()
 
     def refreshDropped() {
      if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {

Reply via email to