Updated Branches:
  refs/heads/trunk c9028ad8c -> 26a02c32d

KAFKA-1055 BrokerTopicStats should distinguish between messages received and 
messages actually appended (i.e., not dropped for various reasons).


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/26a02c32
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/26a02c32
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/26a02c32

Branch: refs/heads/trunk
Commit: 26a02c32dda0222c6ab1ad897992558c1c1eab76
Parents: c9028ad
Author: Joel Koshy <[email protected]>
Authored: Fri Jan 24 18:00:01 2014 -0800
Committer: Joel Koshy <[email protected]>
Committed: Fri Jan 24 18:00:01 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala           | 5 ++++-
 core/src/main/scala/kafka/server/KafkaRequestHandler.scala | 1 +
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/26a02c32/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 29abc46..bd7940b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -240,6 +240,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data
     trace("Append [%s] to local log ".format(partitionAndData.toString))
     partitionAndData.map {case (topicAndPartition, messages) =>
+      // update stats for incoming bytes rate
       BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
       BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
 
@@ -254,7 +255,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
         val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
 
-        // update stats
+        // update stats for successfully appended messages
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).logBytesAppendRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats.logBytesAppendRate.mark(messages.sizeInBytes)
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
         BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/26a02c32/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d0f05cb..871212b 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -76,6 +76,7 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
   val messagesInRate = newMeter(name + "MessagesInPerSec",  "messages", TimeUnit.SECONDS)
   val bytesInRate = newMeter(name + "BytesInPerSec",  "bytes", TimeUnit.SECONDS)
   val bytesOutRate = newMeter(name + "BytesOutPerSec",  "bytes", TimeUnit.SECONDS)
+  val logBytesAppendRate = newMeter(name + "LogBytesAppendedPerSec",  "bytes", TimeUnit.SECONDS)
   val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec",  "requests", TimeUnit.SECONDS)
   val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec",  "requests", TimeUnit.SECONDS)
 }

Reply via email to