Updated Branches:
  refs/heads/0.8 ceb55cad5 -> d217f4cc2
kafka-1017; High number of open file handles in 0.8 producer; patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d217f4cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d217f4cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d217f4cc

Branch: refs/heads/0.8
Commit: d217f4cc276eee021a0d756d3390b633e43f7115
Parents: ceb55ca
Author: Swapnil Ghike <[email protected]>
Authored: Thu Aug 22 09:52:48 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Thu Aug 22 09:52:48 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/producer/Producer.scala    | 33 +++++----
 .../producer/async/DefaultEventHandler.scala    | 77 ++++++++++----------
 2 files changed, 56 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d217f4cc/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index bb16a29..f582919 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -33,16 +33,17 @@ class Producer[K,V](val config: ProducerConfig,
   private val hasShutdown = new AtomicBoolean(false)
   private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
 
-  private val random = new Random
   private var sync: Boolean = true
   private var producerSendThread: ProducerSendThread[K,V] = null
+  private val lock = new Object()
+
   config.producerType match {
     case "sync" =>
     case "async" =>
       sync = false
       producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                        queue,
-                                                       eventHandler,
+                                                       eventHandler,
                                                        config.queueBufferingMaxMs,
                                                        config.batchNumMessages,
                                                        config.clientId)
@@ -67,12 +68,14 @@ class Producer[K,V](val config: ProducerConfig,
    * @param messages the producer data object that encapsulates the topic, key and message data
    */
   def send(messages: KeyedMessage[K,V]*) {
-    if (hasShutdown.get)
-      throw new ProducerClosedException
-    recordStats(messages)
-    sync match {
-      case true => eventHandler.handle(messages)
-      case false => asyncSend(messages)
+    lock synchronized {
+      if (hasShutdown.get)
+        throw new ProducerClosedException
+      recordStats(messages)
+      sync match {
+        case true => eventHandler.handle(messages)
+        case false => asyncSend(messages)
+      }
     }
   }
 
@@ -119,12 +122,14 @@ class Producer[K,V](val config: ProducerConfig,
    * the zookeeper client connection if one exists
    */
   def close() = {
-    val canShutdown = hasShutdown.compareAndSet(false, true)
-    if(canShutdown) {
-      info("Shutting down producer")
-      if (producerSendThread != null)
-        producerSendThread.shutdown
-      eventHandler.close
+    lock synchronized {
+      val canShutdown = hasShutdown.compareAndSet(false, true)
+      if(canShutdown) {
+        info("Shutting down producer")
+        if (producerSendThread != null)
+          producerSendThread.shutdown
+        eventHandler.close
+      }
     }
   }
 }
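
The Producer.scala half of the patch moves synchronization up into the client-facing API: send() and close() now share a single monitor, preserving thread safety once the per-batch lock in DefaultEventHandler (removed below) is gone. A minimal sketch of the pattern, assuming nothing beyond the diff; the names ToyProducer and FakeSocket are invented stand-ins, not Kafka classes:

    import java.util.concurrent.atomic.AtomicBoolean

    // Stand-in for a real broker connection / file handle.
    final class FakeSocket {
      @volatile var open = true
      def close(): Unit = open = false
    }

    final class ToyProducer {
      private val hasShutdown = new AtomicBoolean(false)
      private val lock = new Object()
      private var socket = new FakeSocket

      def send(msg: String): Unit = lock synchronized {
        if (hasShutdown.get)
          throw new IllegalStateException("producer already closed")
        if (!socket.open)
          socket = new FakeSocket // reconnect path that could otherwise race with close()
        // ... write msg to the socket ...
      }

      def close(): Unit = lock synchronized {
        if (hasShutdown.compareAndSet(false, true))
          socket.close()
      }
    }

Because both methods hold the same lock, a send() in flight when close() begins either finishes first or observes hasShutdown and fails fast, so it cannot reopen a connection after shutdown has released it.
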
http://git-wip-us.apache.org/repos/asf/kafka/blob/d217f4cc/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index f71a242..2e36d3b 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -40,8 +40,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   val correlationId = new AtomicInteger(0)
   val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
 
-  private val lock = new Object()
-
   private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
   private var lastTopicMetadataRefreshTime = 0L
   private val topicMetadataToRefresh = Set.empty[String]
@@ -51,47 +49,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
 
   def handle(events: Seq[KeyedMessage[K,V]]) {
-    lock synchronized {
-      sendPartitionPerTopicCache.clear()
-      val serializedData = serialize(events)
-      serializedData.foreach {
-        keyed =>
-          val dataSize = keyed.message.payloadSize
-          producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
-          producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
-      }
-      var outstandingProduceRequests = serializedData
-      var remainingRetries = config.messageSendMaxRetries + 1
-      val correlationIdStart = correlationId.get()
-      debug("Handling %d events".format(events.size))
-      while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
-        topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
-        if (topicMetadataRefreshInterval >= 0 &&
-            SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
-          Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
-          topicMetadataToRefresh.clear
-          lastTopicMetadataRefreshTime = SystemTime.milliseconds
-        }
-        outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
-        if (outstandingProduceRequests.size > 0) {
-          info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
-          // back off and update the topic metadata cache before attempting another send operation
-          Thread.sleep(config.retryBackoffMs)
-          // get topics of the outstanding produce requests and refresh metadata for those
-          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
-          remainingRetries -= 1
-          producerStats.resendRate.mark()
-        }
+    val serializedData = serialize(events)
+    serializedData.foreach {
+      keyed =>
+        val dataSize = keyed.message.payloadSize
+        producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
+        producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
+    }
+    var outstandingProduceRequests = serializedData
+    var remainingRetries = config.messageSendMaxRetries + 1
+    val correlationIdStart = correlationId.get()
+    debug("Handling %d events".format(events.size))
+    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
+      if (topicMetadataRefreshInterval >= 0 &&
+          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
+        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
+        sendPartitionPerTopicCache.clear()
+        topicMetadataToRefresh.clear
+        lastTopicMetadataRefreshTime = SystemTime.milliseconds
       }
-      if(outstandingProduceRequests.size > 0) {
-        producerStats.failedSendRate.mark()
-        val correlationIdEnd = correlationId.get()
-        error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
-          .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
-            correlationIdStart, correlationIdEnd-1))
-        throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
+      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
+      if (outstandingProduceRequests.size > 0) {
+        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
+        // back off and update the topic metadata cache before attempting another send operation
+        Thread.sleep(config.retryBackoffMs)
+        // get topics of the outstanding produce requests and refresh metadata for those
+        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
+        sendPartitionPerTopicCache.clear()
+        remainingRetries -= 1
+        producerStats.resendRate.mark()
       }
     }
+    if(outstandingProduceRequests.size > 0) {
+      producerStats.failedSendRate.mark()
+      val correlationIdEnd = correlationId.get()
+      error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
+        .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
+          correlationIdStart, correlationIdEnd-1))
+      throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
+    }
   }
 
   private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
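
The DefaultEventHandler half carries the handle-count fix itself. Before this patch, handle() cleared sendPartitionPerTopicCache at the top of every call, so each batch of unkeyed messages would re-pick a partition at random and a busy producer could end up holding sockets to most brokers in the cluster. With clear() moved next to the two metadata-refresh sites, a random choice now sticks for up to topic.metadata.refresh.interval.ms (or until a failed dispatch forces a refresh), which bounds the number of live connections. A hedged sketch of that cache discipline; StickyPartitionSketch, partitionFor, onMetadataRefresh and the 8-partition layout are invented for illustration:

    import scala.collection.mutable
    import scala.util.Random

    object StickyPartitionSketch {
      // Stand-in for sendPartitionPerTopicCache: topic -> partition chosen
      // for messages that carry no key.
      private val cache = mutable.Map.empty[String, Int]
      private val partitionsPerTopic = 8 // invented cluster layout

      // Reuse the cached choice while it exists; pick randomly otherwise.
      def partitionFor(topic: String): Int =
        cache.getOrElseUpdate(topic, Random.nextInt(partitionsPerTopic))

      // The cache is dropped only when topic metadata is refreshed, not on
      // every handle() call, so between refreshes all batches for a topic
      // land on one partition -- and therefore one broker socket.
      def onMetadataRefresh(): Unit = cache.clear()

      def main(args: Array[String]): Unit = {
        val p = partitionFor("clicks")
        assert(partitionFor("clicks") == p) // sticky across batches
        onMetadataRefresh()
        println(s"after refresh: ${partitionFor("clicks")}") // may change now
      }
    }

The trade-off is visible in the retry loop above: a failed dispatch refreshes metadata and clears the cache, so a stuck partition is abandoned quickly, while a healthy producer keeps its connection count low between refresh intervals.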
