kafka-989; Race condition shutting down high-level consumer results in spinning background thread; patched by Phil Hargett; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d6ad3d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d6ad3d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d6ad3d7

Branch: refs/heads/trunk
Commit: 1d6ad3d7d441bc7174a445a4e5e49319ee1003eb
Parents: 7fbbb66
Author: Phil Hargett <[email protected]>
Authored: Mon Aug 5 18:59:34 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon Aug 5 18:59:34 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 94 +++++++++++---------
 1 file changed, 50 insertions(+), 44 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d6ad3d7/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a2ea5a9..0ca2850 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -153,32 +153,34 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   def shutdown() {
-    val canShutdown = isShuttingDown.compareAndSet(false, true);
-    if (canShutdown) {
-      info("ZKConsumerConnector shutting down")
+    rebalanceLock synchronized {
+      val canShutdown = isShuttingDown.compareAndSet(false, true);
+      if (canShutdown) {
+        info("ZKConsumerConnector shutting down")
 
-      if (wildcardTopicWatcher != null)
-        wildcardTopicWatcher.shutdown()
-      try {
-        if (config.autoCommitEnable)
-          scheduler.shutdownNow()
-        fetcher match {
-          case Some(f) => f.stopConnections
-          case None =>
-        }
-        sendShutdownToAllQueues()
-        if (config.autoCommitEnable)
-          commitOffsets()
-        if (zkClient != null) {
-          zkClient.close()
-          zkClient = null
+        if (wildcardTopicWatcher != null)
+          wildcardTopicWatcher.shutdown()
+        try {
+          if (config.autoCommitEnable)
+            scheduler.shutdownNow()
+          fetcher match {
+            case Some(f) => f.stopConnections
+            case None =>
+          }
+          sendShutdownToAllQueues()
+          if (config.autoCommitEnable)
+            commitOffsets()
+          if (zkClient != null) {
+            zkClient.close()
+            zkClient = null
+          }
+        } catch {
+          case e =>
+            fatal("error during consumer connector shutdown", e)
         }
-      } catch {
-        case e =>
-          fatal("error during consumer connector shutdown", e)
+        info("ZKConsumerConnector shut down completed")
       }
-      info("ZKConsumerConnector shut down completed")
-    }
+    }
   }
 
   def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
@@ -369,31 +371,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   def syncedRebalance() {
     rebalanceLock synchronized {
-      for (i <- 0 until config.rebalanceMaxRetries) {
-        info("begin rebalancing consumer " + consumerIdString + " try #" + i)
-        var done = false
-        val cluster = getCluster(zkClient)
-        try {
-          done = rebalance(cluster)
-        } catch {
-          case e =>
-            /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-              * For example, a ZK node can disappear between the time we get all children and the time we try to get
-              * the value of a child. Just let this go since another rebalance will be triggered.
-              **/
-            info("exception during rebalance ", e)
-        }
-        info("end rebalancing consumer " + consumerIdString + " try #" + i)
-        if (done) {
-          return
-        } else {
+      if(isShuttingDown.get()) {
+        return
+      } else {
+        for (i <- 0 until config.rebalanceMaxRetries) {
+          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+          var done = false
+          val cluster = getCluster(zkClient)
+          try {
+            done = rebalance(cluster)
+          } catch {
+            case e =>
+              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+                * For example, a ZK node can disappear between the time we get all children and the time we try to get
+                * the value of a child. Just let this go since another rebalance will be triggered.
+                **/
+              info("exception during rebalance ", e)
+          }
+          info("end rebalancing consumer " + consumerIdString + " try #" + i)
+          if (done) {
+            return
+          } else {
             /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
              * clear the cache */
             info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+          }
+          // stop all fetchers and clear all the queues to avoid data duplication
+          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+          Thread.sleep(config.rebalanceBackoffMs)
         }
-        // stop all fetchers and clear all the queues to avoid data duplication
-        closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
-        Thread.sleep(config.rebalanceBackoffMs)
       }
     }
