SAMZA-584; fix race condition in kafka system consumer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fcb5cea3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fcb5cea3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fcb5cea3

Branch: refs/heads/samza-sql
Commit: fcb5cea3fc4767ba6239a5cdf05a5a06a7f92e62
Parents: 7a21bef
Author: Chris Riccomini <[email protected]>
Authored: Tue Mar 3 10:20:15 2015 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Tue Mar 3 10:20:15 2015 -0800

----------------------------------------------------------------------
 .../system/kafka/KafkaSystemConsumer.scala      | 27 ++++++++++----------
 1 file changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fcb5cea3/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 38117e2..de00320 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -142,7 +142,6 @@ private[kafka] class KafkaSystemConsumer(
     // This avoids trying to re-add the same topic partition repeatedly
     def refresh(tp: List[TopicAndPartition]) = {
       val head :: rest = tpToRefresh
-      val nextOffset = topicPartitionsAndOffsets.get(head).get

       // refreshBrokers can be called from abdicate and refreshDropped,
       // both of which are triggered from BrokerProxy threads. To prevent
       // accidentally creating multiple objects for the same broker, or
@@ -151,18 +150,18 @@ private[kafka] class KafkaSystemConsumer(
       this.synchronized {
         // Check if we still need this TopicAndPartition inside the
         // critical section. If we don't, then skip it.
-        if (topicPartitionsAndOffsets.contains(head)) {
-          getHostPort(topicMetadata(head.topic), head.partition) match {
-            case Some((host, port)) =>
-              val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
-              brokerProxy.addTopicPartition(head, Option(nextOffset))
-              brokerProxy.start
-              debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
-              topicPartitionsAndOffsets -= head
-            case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
-          }
-        } else {
-          debug("Ignoring refresh for %s because we already added it from another thread." format head)
+        topicPartitionsAndOffsets.get(head) match {
+          case Some(nextOffset) =>
+            getHostPort(topicMetadata(head.topic), head.partition) match {
+              case Some((host, port)) =>
+                val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
+                brokerProxy.addTopicPartition(head, Option(nextOffset))
+                brokerProxy.start
+                debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
+                topicPartitionsAndOffsets -= head
+              case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
+            }
+          case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head)
         }
       }

       rest
@@ -182,7 +181,7 @@ private[kafka] class KafkaSystemConsumer(
     }

     val sink = new MessageSink {
-      var lastDroppedRefresh = 0L
+      var lastDroppedRefresh = clock()

       def refreshDropped() {
         if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {
