Updated Branches: refs/heads/trunk c9028ad8c -> 26a02c32d
KAFKA-1055 BrokerTopicStats should distinguish between messages received and messages actually appended (i.e., not dropped for various reasons). Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/26a02c32 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/26a02c32 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/26a02c32 Branch: refs/heads/trunk Commit: 26a02c32dda0222c6ab1ad897992558c1c1eab76 Parents: c9028ad Author: Joel Koshy <[email protected]> Authored: Fri Jan 24 18:00:01 2014 -0800 Committer: Joel Koshy <[email protected]> Committed: Fri Jan 24 18:00:01 2014 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/KafkaApis.scala | 5 ++++- core/src/main/scala/kafka/server/KafkaRequestHandler.scala | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/26a02c32/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 29abc46..bd7940b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -240,6 +240,7 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data trace("Append [%s] to local log ".format(partitionAndData.toString)) partitionAndData.map {case (topicAndPartition, messages) => + // update stats for incoming bytes rate BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) @@ -254,7 +255,9 @@ class KafkaApis(val requestChannel: RequestChannel, } val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) - // update stats + // update stats for successfully appended messages + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).logBytesAppendRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.logBytesAppendRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) http://git-wip-us.apache.org/repos/asf/kafka/blob/26a02c32/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index d0f05cb..871212b 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -76,6 +76,7 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup { val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS) val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS) val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS) + val logBytesAppendRate = newMeter(name + "LogBytesAppendedPerSec", "bytes", TimeUnit.SECONDS) val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS) val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS) }
