Repository: kafka
Updated Branches:
  refs/heads/trunk dd6347a5d -> 021d8a8e9


http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6434d23..6781dc9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch}
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, RecordsProcessingStats}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
@@ -424,7 +424,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 s"from client id ${request.header.clientId} with ack=0\n" +
                 s"Topic and partition to exceptions: $exceptionsSummary"
             )
-            closeConnection(request)
+            closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
           } else {
             sendNoOpResponseExemptThrottle(request)
           }
@@ -444,6 +444,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         produceResponseCallback)
     }
 
+    def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
+      processingStats.foreach { case (tp, info) =>
+        updateRecordsProcessingStats(request, tp, info)
+      }
+    }
+
     if (authorizedRequestInfo.isEmpty)
       sendResponseCallback(Map.empty)
     else {
@@ -456,7 +462,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         internalTopicsAllowed = internalTopicsAllowed,
         isFromClient = true,
         entriesPerPartition = authorizedRequestInfo,
-        responseCallback = sendResponseCallback)
+        responseCallback = sendResponseCallback,
+        processingStatsCallback = processingStatsCallback)
 
      // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
      // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
@@ -511,9 +518,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         downConvertMagic.map { magic =>
           trace(s"Down converting records from partition $tp to message format 
version $magic for fetch request from $clientId")
-          val converted = data.records.downConvert(magic, 
fetchRequest.fetchData.get(tp).fetchOffset)
+          val startNanos = time.nanoseconds
+          val converted = data.records.downConvert(magic, 
fetchRequest.fetchData.get(tp).fetchOffset, time)
+          updateRecordsProcessingStats(request, tp, 
converted.recordsProcessingStats)
           new FetchResponse.PartitionData(data.error, data.highWatermark, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            data.logStartOffset, data.abortedTransactions, converted)
+            data.logStartOffset, data.abortedTransactions, converted.records)
         }
 
       }.getOrElse(data)
@@ -2002,6 +2011,25 @@ class KafkaApis(val requestChannel: RequestChannel,
      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
 
+  private def updateRecordsProcessingStats(request: RequestChannel.Request, tp: TopicPartition,
+                                           processingStats: RecordsProcessingStats): Unit = {
+    val conversionCount = processingStats.conversionCount
+    if (conversionCount > 0) {
+      request.header.apiKey match {
+        case ApiKeys.PRODUCE =>
+          brokerTopicStats.topicStats(tp.topic).produceMessageConversionsRate.mark(conversionCount)
+          brokerTopicStats.allTopicsStats.produceMessageConversionsRate.mark(conversionCount)
+        case ApiKeys.FETCH =>
+          brokerTopicStats.topicStats(tp.topic).fetchMessageConversionsRate.mark(conversionCount)
+          brokerTopicStats.allTopicsStats.fetchMessageConversionsRate.mark(conversionCount)
+        case _ =>
+          throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests")
+      }
+      request.messageConversionsTimeNanos = processingStats.conversionTimeNanos
+    }
+    request.temporaryMemoryBytes = processingStats.temporaryMemoryBytes
+  }
+
   private def handleError(request: RequestChannel.Request, e: Throwable) {
     val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction
     error("Error when handling request %s".format(request.body[AbstractRequest]), e)
@@ -2031,9 +2059,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable)(throttleMs: Int): Unit = {
-    val response = request.body[AbstractRequest].getErrorResponse(throttleMs, error)
+    val requestBody = request.body[AbstractRequest]
+    val response = requestBody.getErrorResponse(throttleMs, error)
     if (response == null)
-      closeConnection(request)
+      closeConnection(request, requestBody.errorCounts(error))
     else
       sendResponse(request, Some(response))
   }
@@ -2043,13 +2072,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     sendResponse(request, None)
   }
 
-  private def closeConnection(request: RequestChannel.Request): Unit = {
+  private def closeConnection(request: RequestChannel.Request, errorCounts: java.util.Map[Errors, Integer]): Unit = {
     // This case is used when the request handler has encountered an error, but the client
     // does not expect a response (e.g. when produce request has acks set to 0)
+    requestChannel.updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
     requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction, None))
   }
 
   private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
+    // Update error metrics for each error code in the response including Errors.NONE
+    responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))
+
     responseOpt match {
       case Some(response) =>
         val responseSend = request.context.buildResponse(response)
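
The RequestChannel/RequestMetrics side of updateErrorMetrics is not part of the hunks above. A minimal sketch of what the per-error meters it feeds could look like (names and wiring here are hypothetical, not the actual RequestMetrics change):

    // Hypothetical sketch: one yammer Meter per error code, created lazily and
    // marked by the count reported for that error in a response (Errors.NONE included).
    import com.yammer.metrics.core.Meter
    import org.apache.kafka.common.protocol.Errors
    import scala.collection.concurrent.TrieMap

    class ErrorMeters(newMeter: String => Meter) {
      private val meters = TrieMap.empty[Errors, Meter]

      def update(errorCounts: collection.Map[Errors, Integer]): Unit =
        errorCounts.foreach { case (error, count) =>
          meters.getOrElseUpdate(error, newMeter(s"ErrorsPerSec.${error.name}")).mark(count.longValue)
        }
    }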

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index b108bf6..43c81ab 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -25,10 +25,13 @@ import kafka.api.ApiVersion
 import kafka.cluster.EndPoint
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.IZkStateListener
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
+import scala.collection.mutable.Set
+
 /**
  * This class registers the broker in zookeeper to allow 
  * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
@@ -71,6 +74,8 @@ class KafkaHealthcheck(brokerId: Int,
       interBrokerProtocolVersion)
   }
 
+  def shutdown(): Unit = sessionExpireListener.shutdown()
+
   /**
   *  When we get a SessionExpired event, it means that we have lost all ephemeral nodes and ZKClient has re-established
   *  a connection for us. We need to re-register this broker in the broker registry. We rely on `handleStateChanged`
@@ -78,6 +83,8 @@ class KafkaHealthcheck(brokerId: Int,
    */
   class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
 
+    private val metricNames = Set[String]()
+
     private[server] val stateToMeterMap = {
       import KeeperState._
       val stateToEventTypeMap = Map(
@@ -89,10 +96,20 @@ class KafkaHealthcheck(brokerId: Int,
         Expired -> "Expires"
       )
       stateToEventTypeMap.map { case (state, eventType) =>
-        state -> newMeter(s"ZooKeeper${eventType}PerSec", 
eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS)
+        val name = s"ZooKeeper${eventType}PerSec"
+        metricNames += name
+        state -> newMeter(name, eventType.toLowerCase(Locale.ROOT), 
TimeUnit.SECONDS)
       }
     }
 
+    private[server] val sessionStateGauge =
+      newGauge("SessionState", new Gauge[String] {
+        override def value: String =
+          Option(zkUtils.zkConnection.getZookeeperState.toString).getOrElse("DISCONNECTED")
+      })
+
+    metricNames += "SessionState"
+
     @throws[Exception]
     override def handleStateChanged(state: KeeperState) {
       stateToMeterMap.get(state).foreach(_.mark())
@@ -110,6 +127,8 @@ class KafkaHealthcheck(brokerId: Int,
       fatal("Could not establish session with zookeeper", error)
     }
 
+    def shutdown(): Unit = metricNames.foreach(removeMetric(_))
+
   }
 
 }
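
For monitoring, the new gauge can be read back from the default yammer registry. A minimal sketch, matching on the MBean-name suffix the same way the new MetricsTest below does (exact MBean naming is otherwise an assumption):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge
    import scala.collection.JavaConverters._

    // Current ZooKeeper session state ("CONNECTED", "DISCONNECTED", ...) if the
    // KafkaHealthcheck gauge has been registered, otherwise None.
    def sessionState: Option[String] =
      Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
        case (name, gauge: Gauge[_]) if name.getMBeanName.endsWith("SessionState") => gauge.value.toString
      }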

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f055762..a498781 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -129,6 +129,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
   val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   val totalFetchRequestRate = newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val fetchMessageConversionsRate = newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val produceMessageConversionsRate = newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags)
 
   def close() {
     removeMetric(BrokerTopicStats.MessagesInPerSec, tags)
@@ -143,6 +145,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
     removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.TotalFetchRequestsPerSec, tags)
+    removeMetric(BrokerTopicStats.FetchMessageConversionsPerSec, tags)
+    removeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec, tags)
   }
 }
 
@@ -157,6 +161,8 @@ object BrokerTopicStats {
   val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
   val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
   val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec"
+  val FetchMessageConversionsPerSec = "FetchMessageConversionsPerSec"
+  val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
   private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
 }
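
The new meters surface as BrokerTopicMetrics MBeans (per-topic plus an all-topics aggregate). A hedged sketch of reading the per-topic produce-conversion count from the yammer registry, assuming the topic tag is appended to the MBean name as ",topic=<topic>":

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Meter
    import scala.collection.JavaConverters._

    // Cumulative count of produce-side message-format conversions for one topic,
    // or 0 if the meter has not been registered yet.
    def produceConversions(topic: String): Long =
      Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
        case (name, meter: Meter)
          if name.getMBeanName.endsWith(s"name=ProduceMessageConversionsPerSec,topic=$topic") => meter.count
      }.getOrElse(0L)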
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 88b1d23..f8af7a2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -293,7 +293,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)
-        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
+        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
         info("started")
       }
     }
@@ -333,19 +333,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
     chrootOption.foreach { chroot =>
       val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
-      val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
+      val zkClientForChrootCreation = ZkUtils.withMetrics(zkConnForChrootCreation,
                                               sessionTimeout = config.zkSessionTimeoutMs,
                                               connectionTimeout = config.zkConnectionTimeoutMs,
-                                              secureAclsEnabled)
+                                              secureAclsEnabled,
+                                              time)
       zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
       info(s"Created zookeeper path $chroot")
       zkClientForChrootCreation.close()
     }
 
-    val zkUtils = ZkUtils(config.zkConnect,
+    val zkUtils = ZkUtils.withMetrics(config.zkConnect,
                           sessionTimeout = config.zkSessionTimeoutMs,
                           connectionTimeout = config.zkConnectionTimeoutMs,
-                          secureAclsEnabled)
+                          secureAclsEnabled,
+                          time)
     zkUtils.setupCommonPaths()
     zkUtils
   }
@@ -512,6 +514,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         CoreUtils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
 
+        if (kafkaHealthcheck != null)
+          CoreUtils.swallow(kafkaHealthcheck.shutdown())
+
         if (socketServer != null)
           CoreUtils.swallow(socketServer.shutdown())
         if (requestHandlerPool != null)
@@ -549,7 +554,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
         startupComplete.set(false)
         isShuttingDown.set(false)
-        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
+        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics))
         shutdownLatch.countDown()
         info("shut down completed")
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9cc6317..3acb88b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -434,7 +434,8 @@ class ReplicaManager(val config: KafkaConfig,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                    delayedProduceLock: Option[Object] = None) {
+                    delayedProduceLock: Option[Object] = None,
+                    processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
@@ -448,6 +449,8 @@ class ReplicaManager(val config: KafkaConfig,
                   new PartitionResponse(result.error, result.info.firstOffset, 
result.info.logAppendTime, result.info.logStartOffset)) // response status
       }
 
+      processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))
+
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9582c50..755f500 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,7 +17,7 @@
 
 package kafka.utils
 
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.admin._
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
@@ -25,12 +25,16 @@ import kafka.cluster._
 import kafka.common.{KafkaException, NoEpochForPartitionException, 
TopicAndPartition}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
 import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, 
ReassignedPartitionsContext}
+import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ConfigType
 import kafka.utils.ZkUtils._
+
+import com.yammer.metrics.core.MetricName
 import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, 
ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
-import org.I0Itec.zkclient.{ZkClient, ZkConnection, IZkDataListener, 
IZkChildListener, IZkStateListener}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, 
IZkStateListener, ZkClient, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
@@ -84,6 +88,12 @@ object ZkUtils {
   //            sensitive information that should not be world readable to the 
Seq
   val SensitiveZkRootPaths = Seq(ConfigUsersPath)
 
+  def withMetrics(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean,
+                  time: Time): ZkUtils = {
+    val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
+    new ZkUtils(new ZooKeeperClientMetrics(zkClient, time), zkConnection, isZkSecurityEnabled)
+  }
+
   def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
     val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
     new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
@@ -212,7 +222,38 @@ object ZkUtils {
 
 }
 
-class ZkUtils(val zkClient: ZkClient,
+class ZooKeeperClientWrapper(val zkClient: ZkClient) {
+  def apply[T](method: ZkClient => T): T = method(zkClient)
+  def close(): Unit = {
+    if(zkClient != null)
+      zkClient.close()
+  }
+}
+
+class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
+    extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
+  val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
+
+  override protected def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
+    explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
+  }
+
+  override def apply[T](method: ZkClient => T): T = {
+    val startNs = time.nanoseconds
+    val ret =
+      try method(zkClient)
+      finally latencyMetric.update(TimeUnit.NANOSECONDS.toMillis(time.nanoseconds - startNs))
+    ret
+  }
+
+  override def close(): Unit = {
+    if (latencyMetric != null)
+      removeMetric("ZooKeeperRequestLatencyMs")
+    super.close()
+  }
+}
+
+class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
               val zkConnection: ZkConnection,
               val isSecure: Boolean) extends Logging {
   // These are persistent ZK paths that should exist on kafka broker startup.
@@ -228,8 +269,12 @@ class ZkUtils(val zkClient: ZkClient,
                               ProducerIdBlockPath,
                               LogDirEventNotificationPath)
 
+  /** Present for compatibility */
+  def this(zkClient: ZkClient, zkConnection: ZkConnection, isSecure: Boolean) =
+    this(new ZooKeeperClientWrapper(zkClient), zkConnection, isSecure)
+
   // Visible for testing
-  val zkPath = new ZkPath(zkClient)
+  val zkPath = new ZkPath(zkClientWrap)
 
   import ZkUtils._
 
@@ -238,6 +283,8 @@ class ZkUtils(val zkClient: ZkClient,
 
   def defaultAcls(path: String): java.util.List[ACL] = 
ZkUtils.defaultAcls(isSecure, path)
 
+  def zkClient: ZkClient = zkClientWrap.zkClient
+
   def getController(): Int = {
     readDataMaybeNull(ControllerPath)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
@@ -388,7 +435,7 @@ class ZkUtils(val zkClient: ZkClient,
                                                       brokerInfo,
                                                       zkConnection.getZookeeper,
                                                       isSecure)
-      zkCheckedEphemeral.create()
+      zkClientWrap(_ => zkCheckedEphemeral.create())
     } catch {
       case _: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path 
" + brokerIdPath
@@ -429,7 +476,7 @@ class ZkUtils(val zkClient: ZkClient,
       acls
     }
 
-    if (!zkClient.exists(path))
+    if (!zkClientWrap(zkClient => zkClient.exists(path)))
       zkPath.createPersistent(path, createParents = true, acl) //won't throw 
NoNodeException or NodeExistsException
   }
 
@@ -512,7 +559,7 @@ class ZkUtils(val zkClient: ZkClient,
   def updatePersistentPath(path: String, data: String, acls: 
java.util.List[ACL] = UseDefaultAcls) = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) 
else acls
     try {
-      zkClient.writeData(path, data)
+      zkClientWrap(_.writeData(path, data))
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
@@ -520,7 +567,7 @@ class ZkUtils(val zkClient: ZkClient,
           zkPath.createPersistent(path, data, acl)
         } catch {
           case _: ZkNodeExistsException =>
-            zkClient.writeData(path, data)
+            zkClientWrap(_.writeData(path, data))
         }
     }
   }
@@ -536,7 +583,7 @@ class ZkUtils(val zkClient: ZkClient,
   def conditionalUpdatePersistentPath(path: String, data: String, 
expectVersion: Int,
     optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = 
None): (Boolean, Int) = {
     try {
-      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
+      val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
       debug("Conditional update of path %s with value %s and expected version 
%d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
@@ -563,7 +610,7 @@ class ZkUtils(val zkClient: ZkClient,
    */
   def conditionalUpdatePersistentPathIfExists(path: String, data: String, 
expectVersion: Int): (Boolean, Int) = {
     try {
-      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
+      val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
       debug("Conditional update of path %s with value %s and expected version 
%d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
@@ -583,7 +630,7 @@ class ZkUtils(val zkClient: ZkClient,
   def updateEphemeralPath(path: String, data: String, acls: 
java.util.List[ACL] = UseDefaultAcls): Unit = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) 
else acls
     try {
-      zkClient.writeData(path, data)
+      zkClientWrap(_.writeData(path, data))
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
@@ -592,7 +639,7 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def deletePath(path: String): Boolean = {
-    zkClient.delete(path)
+    zkClientWrap(_.delete(path))
   }
 
   /**
@@ -601,7 +648,7 @@ class ZkUtils(val zkClient: ZkClient,
     */
    def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
     try {
-      zkClient.delete(path, expectedVersion)
+      zkClientWrap(_.delete(path, expectedVersion))
       true
     } catch {
       case _: ZkBadVersionException => false
@@ -609,37 +656,38 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def deletePathRecursive(path: String) {
-    zkClient.deleteRecursive(path)
+    zkClientWrap(_.deleteRecursive(path))
   }
 
   def subscribeDataChanges(path: String, listener: IZkDataListener): Unit =
-    zkClient.subscribeDataChanges(path, listener)
+    zkClientWrap(_.subscribeDataChanges(path, listener))
 
   def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): 
Unit =
-    zkClient.unsubscribeDataChanges(path, dataListener)
+    zkClientWrap(_.unsubscribeDataChanges(path, dataListener))
 
   def subscribeStateChanges(listener: IZkStateListener): Unit =
-    zkClient.subscribeStateChanges(listener)
+    zkClientWrap(_.subscribeStateChanges(listener))
 
   def subscribeChildChanges(path: String, listener: IZkChildListener): 
Option[Seq[String]] =
-    Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala)
+    Option(zkClientWrap(_.subscribeChildChanges(path, listener))).map(_.asScala)
 
   def unsubscribeChildChanges(path: String, childListener: IZkChildListener): 
Unit =
-    zkClient.unsubscribeChildChanges(path, childListener)
+    zkClientWrap(_.unsubscribeChildChanges(path, childListener))
 
   def unsubscribeAll(): Unit =
-    zkClient.unsubscribeAll()
+    zkClientWrap(_.unsubscribeAll())
 
   def readData(path: String): (String, Stat) = {
     val stat: Stat = new Stat()
-    val dataStr: String = zkClient.readData(path, stat)
+    val dataStr: String = zkClientWrap(_.readData[String](path, stat))
     (dataStr, stat)
   }
 
   def readDataMaybeNull(path: String): (Option[String], Stat) = {
     val stat = new Stat()
     val dataAndStat = try {
-                        (Some(zkClient.readData(path, stat)), stat)
+                        val dataStr = zkClientWrap(_.readData[String](path, stat))
+                        (Some(dataStr), stat)
                       } catch {
                         case _: ZkNoNodeException =>
                           (None, stat)
@@ -650,18 +698,18 @@ class ZkUtils(val zkClient: ZkClient,
   def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
     val stat = new Stat()
     try {
-      val data: String = zkClient.readData(path, stat)
+      val data = zkClientWrap(_.readData[String](path, stat))
       (Option(data), stat.getVersion)
     } catch {
       case _: ZkNoNodeException => (None, stat.getVersion)
     }
   }
 
-  def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
+  def getChildren(path: String): Seq[String] = zkClientWrap(_.getChildren(path)).asScala
 
   def getChildrenParentMayNotExist(path: String): Seq[String] = {
     try {
-      zkClient.getChildren(path).asScala
+      zkClientWrap(_.getChildren(path)).asScala
     } catch {
       case _: ZkNoNodeException => Nil
     }
@@ -671,7 +719,7 @@ class ZkUtils(val zkClient: ZkClient,
    * Check if the given path exists
    */
   def pathExists(path: String): Boolean = {
-    zkClient.exists(path)
+    zkClientWrap(_.exists(path))
   }
 
   def isTopicMarkedForDeletion(topic: String): Boolean = {
@@ -789,9 +837,9 @@ class ZkUtils(val zkClient: ZkClient,
 
   def deletePartition(brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
-    zkClient.delete(brokerIdPath)
+    zkClientWrap(_.delete(brokerIdPath))
     val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + 
brokerId
-    zkClient.delete(brokerPartTopicPath)
+    zkClientWrap(_.delete(brokerPartTopicPath))
   }
 
   @deprecated("This method has been deprecated and will be removed in a future 
release.", "0.11.0.0")
@@ -851,7 +899,7 @@ class ZkUtils(val zkClient: ZkClient,
     */
   def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): 
Int = {
     val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) 
else acls
-    def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
+    def writeToZk: Int = zkClientWrap(_.writeDataReturnStat(path, "", -1)).getVersion
     try {
       writeToZk
     } catch {
@@ -915,9 +963,7 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def close() {
-    if(zkClient != null) {
-      zkClient.close()
-    }
+    zkClientWrap.close()
   }
 }
 
@@ -973,7 +1019,7 @@ class ZKConfig(props: VerifiableProperties) {
   val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
 }
 
-class ZkPath(client: ZkClient) {
+class ZkPath(clientWrap: ZooKeeperClientWrapper) {
 
   @volatile private var isNamespacePresent: Boolean = false
 
@@ -981,7 +1027,7 @@ class ZkPath(client: ZkClient) {
     if (isNamespacePresent)
       return
 
-    if (!client.exists("/")) {
+    if (!clientWrap(_.exists("/"))) {
       throw new ConfigException("Zookeeper namespace does not exist")
     }
     isNamespacePresent = true
@@ -993,22 +1039,22 @@ class ZkPath(client: ZkClient) {
 
   def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
     checkNamespace()
-    client.createPersistent(path, data, acls)
+    clientWrap(_.createPersistent(path, data, acls))
   }
 
   def createPersistent(path: String, createParents: Boolean, acls: 
java.util.List[ACL]) {
     checkNamespace()
-    client.createPersistent(path, createParents, acls)
+    clientWrap(_.createPersistent(path, createParents, acls))
   }
 
   def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
     checkNamespace()
-    client.createEphemeral(path, data, acls)
+    clientWrap(_.createEphemeral(path, data, acls))
   }
 
   def createPersistentSequential(path: String, data: Object, acls: 
java.util.List[ACL]): String = {
     checkNamespace()
-    client.createPersistentSequential(path, data, acls)
+    clientWrap(_.createPersistentSequential(path, data, acls))
   }
 }
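
Usage-wise, brokers now build ZkUtils through withMetrics so that every call routed through zkClientWrap(...) is timed into the ZooKeeperRequestLatencyMs histogram. A small sketch (the connection string and timeouts below are illustrative only):

    import kafka.utils.ZkUtils
    import org.apache.kafka.common.utils.Time

    // Wraps the underlying ZkClient in ZooKeeperClientMetrics, so each request
    // records its latency before returning.
    val zkUtils = ZkUtils.withMetrics("localhost:2181",
      sessionTimeout = 6000,
      connectionTimeout = 6000,
      isZkSecurityEnabled = false,
      time = Time.SYSTEM)
    try
      zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerIdsPath)  // latency recorded via the wrapper
    finally
      zkUtils.close()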
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala 
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
new file mode 100644
index 0000000..f71a36a
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -0,0 +1,288 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.util.{Locale, Properties}
+
+import kafka.log.LogConfig
+import kafka.network.RequestMetrics
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{JaasTestUtils, TestUtils}
+
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, Histogram, Meter}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+class MetricsTest extends IntegrationTestHarness with SaslSetup {
+
+  override val producerCount = 1
+  override val consumerCount = 1
+  override val serverCount = 1
+
+  override protected def listenerName = new ListenerName("CLIENT")
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
+  private val kafkaServerJaasEntryName =
+    s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+  this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false")
+  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  override protected val serverSaslProperties =
+    Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+  override protected val clientSaslProperties =
+    Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    closeSasl()
+  }
+
+  /**
+   * Verifies some of the metrics of producer, consumer as well as server.
+   */
+  @Test
+  def testMetrics(): Unit = {
+    val topic = "topicWithOldMessageFormat"
+    val props = new Properties
+    props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
+    TestUtils.createTopic(this.zkUtils, topic, numPartitions = 1, replicationFactor = 1, this.servers, props)
+    val tp = new TopicPartition(topic, 0)
+
+    // Clear static state
+    RequestMetrics.clearErrorMeters()
+
+    // Produce and consume some records
+    val numRecords = 10
+    val recordSize = 1000
+    val producer = producers.head
+    sendRecords(producer, numRecords, recordSize, tp)
+
+    val consumer = this.consumers.head
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    TestUtils.consumeRecords(consumer, numRecords)
+
+    verifyKafkaRateMetricsHaveCumulativeCount()
+    verifyClientVersionMetrics(consumer.metrics, "Consumer")
+    verifyClientVersionMetrics(this.producers.head.metrics, "Producer")
+
+    val server = servers.head
+    verifyBrokerMessageConversionMetrics(server, recordSize)
+    verifyBrokerErrorMetrics(servers.head)
+    verifyBrokerZkMetrics(server, topic)
+
+    generateAuthenticationFailure(tp)
+    verifyBrokerAuthenticationMetrics(server)
+  }
+
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
+      recordSize: Int, tp: TopicPartition) = {
+    val bytes = new Array[Byte](recordSize)
+    (0 until numRecords).map { i =>
+      producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, bytes))
+    }
+    producer.flush()
+  }
+
+  // Create a producer that fails authentication to verify authentication failure metrics
+  private def generateAuthenticationFailure(tp: TopicPartition): Unit = {
+    val producerProps = new Properties()
+    val saslProps = new Properties()
+    // Temporary limit to reduce blocking before KIP-152 client-side changes are merged
+    saslProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
+    saslProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000")
+    saslProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256")
+    // Use acks=0 to verify error metric when connection is closed without a response
+    saslProps.put(ProducerConfig.ACKS_CONFIG, "0")
+    val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol,
+        trustStoreFile = trustStoreFile, saslProperties = Some(saslProps), props = Some(producerProps))
+
+    try {
+      producer.send(new ProducerRecord(tp.topic, tp.partition, "key".getBytes, "value".getBytes)).get
+    } catch {
+      case _: Exception => // expected exception
+    } finally {
+      producer.close()
+    }
+  }
+
+  private def verifyKafkaRateMetricsHaveCumulativeCount(): Unit =  {
+
+    def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = {
+      allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags))
+    }
+
+    def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): Unit = {
+      val name = rateMetricName.name
+      val totalExists = exists(name.replace("-rate", "-total"), rateMetricName, allMetricNames)
+      val totalTimeExists = exists(name.replace("-rate", "-time"), rateMetricName, allMetricNames)
+      assertTrue(s"No cumulative count/time metric for rate metric $rateMetricName",
+          totalExists || totalTimeExists)
+    }
+
+    val consumer = this.consumers.head
+    val consumerMetricNames = consumer.metrics.keySet.asScala.toSet
+    consumerMetricNames.filter(_.name.endsWith("-rate"))
+        .foreach(verify(_, consumerMetricNames))
+
+    val producer = this.producers.head
+    val producerMetricNames = producer.metrics.keySet.asScala.toSet
+    val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate
+    producerMetricNames.filter(_.name.endsWith("-rate"))
+        .filterNot(metricName => producerExclusions.contains(metricName.name))
+        .foreach(verify(_, producerMetricNames))
+
+    // Check a couple of metrics of consumer and producer to ensure that values are set
+    verifyKafkaMetricRecorded("records-consumed-rate", consumer.metrics, "Consumer")
+    verifyKafkaMetricRecorded("records-consumed-total", consumer.metrics, "Consumer")
+    verifyKafkaMetricRecorded("record-send-rate", producer.metrics, "Producer")
+    verifyKafkaMetricRecorded("record-send-total", producer.metrics, "Producer")
+  }
+
+  private def verifyClientVersionMetrics(metrics: java.util.Map[MetricName, _ <: Metric], entity: String): Unit = {
+    Seq("commit-id", "version").foreach { name =>
+      verifyKafkaMetric(name, metrics, entity) { matchingMetrics =>
+        assertEquals(1, matchingMetrics.size)
+        val metric = matchingMetrics.head
+        val value = metric.metricValue
+        assertNotNull(s"$entity metric not recorded $name", value)
+        assertNotNull(s"$entity metric $name should be a non-empty String",
+            value.isInstanceOf[String] && !value.asInstanceOf[String].isEmpty)
+        assertTrue("Client-id not specified", 
metric.metricName.tags.containsKey("client-id"))
+      }
+    }
+  }
+
+  private def verifyBrokerAuthenticationMetrics(server: KafkaServer): Unit = {
+    val metrics = server.metrics.metrics
+    TestUtils.waitUntilTrue(() =>
+      maxKafkaMetricValue("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) > 0,
+      "failed-authentication-total not updated")
+    verifyKafkaMetricRecorded("successful-authentication-rate", metrics, "Broker", Some("socket-server-metrics"))
+    verifyKafkaMetricRecorded("successful-authentication-total", metrics, "Broker", Some("socket-server-metrics"))
+    verifyKafkaMetricRecorded("failed-authentication-rate", metrics, "Broker", Some("socket-server-metrics"))
+    verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics"))
+  }
+
+  private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int): Unit = {
+    val requestMetricsPrefix = "kafka.network:type=RequestMetrics"
+    val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce")
+    val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce")
+    assertTrue(s"Unexpected temporary memory size requestBytes $requestBytes tempBytes $tempBytes",
+        tempBytes >= recordSize)
+
+    verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0)
+
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
+    // Temporary size for fetch should be zero after KAFKA-5968 is fixed
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value >= 0.0)
+
+    // request size recorded for all request types, check one
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata")
+  }
+
+  private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
+    // Latency is rounded to milliseconds, so we may need to retry some operations to get latency > 0.
+    val (_, recorded) = TestUtils.computeUntilTrue({
+      servers.head.zkUtils.getLeaderAndIsrForPartition(topic, 0)
+      yammerMetricValue("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Double]
+    })(latency => latency > 0.0)
+    assertTrue("ZooKeeper latency not recorded", recorded)
+
+    assertEquals(s"Unexpected ZK state ${server.zkUtils.zkConnection.getZookeeperState}",
+        "CONNECTED", yammerMetricValue("SessionState"))
+  }
+
+  private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
+
+    def errorMetricCount = Metrics.defaultRegistry.allMetrics.keySet.asScala.filter(_.getName == "ErrorsPerSec").size
+
+    val startErrorMetricCount = errorMetricCount
+    val errorMetricPrefix = "kafka.network:type=RequestMetrics,name=ErrorsPerSec"
+    verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=NONE")
+
+    try {
+      consumers.head.partitionsFor("12{}!")
+    } catch {
+      case _: InvalidTopicException => // expected
+    }
+    verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=INVALID_TOPIC_EXCEPTION")
+
+    // Check that error metrics are registered dynamically
+    val currentErrorMetricCount = errorMetricCount
+    assertEquals(startErrorMetricCount + 1, currentErrorMetricCount)
+    assertTrue(s"Too many error metrics $currentErrorMetricCount", currentErrorMetricCount < 10)
+
+    // Verify that error metric is updated with producer acks=0 when no response is sent
+    sendRecords(producers.head, 1, 100, new TopicPartition("non-existent", 0))
+    verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=LEADER_NOT_AVAILABLE")
+  }
+
+  private def verifyKafkaMetric[T](name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+      group: Option[String] = None)(verify: Iterable[Metric] => T) : T = {
+    val matchingMetrics = metrics.asScala.filter {
+      case (metricName, _) => metricName.name == name && group.forall(_ == metricName.group)
+    }
+    assertTrue(s"Metric not found $name", matchingMetrics.size > 0)
+    verify(matchingMetrics.values)
+  }
+
+  private def maxKafkaMetricValue(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+      group: Option[String]): Double = {
+    // Use max value of all matching metrics since Selector metrics are recorded for each Processor
+    verifyKafkaMetric(name, metrics, entity, group) { matchingMetrics =>
+      matchingMetrics.foldLeft(0.0)((max, metric) => Math.max(max, metric.value))
+    }
+  }
+
+  private def verifyKafkaMetricRecorded(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String,
+      group: Option[String] = None): Unit = {
+    val value = maxKafkaMetricValue(name, metrics, entity, group)
+    assertTrue(s"$entity metric not recorded correctly for $name value $value", value > 0.0)
+  }
+
+  private def yammerMetricValue(name: String): Any = {
+    val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
+    val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
+      .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
+    metric match {
+      case m: Meter => m.count.toDouble
+      case m: Histogram => m.max
+      case m: Gauge[_] => m.value
+      case m => fail(s"Unexpected broker metric of class ${m.getClass}")
+    }
+  }
+
+  private def verifyYammerMetricRecorded(name: String, verify: Double => Boolean = d => d > 0): Double = {
+    val metricValue = yammerMetricValue(name).asInstanceOf[Double]
+    assertTrue(s"Broker metric not recorded correctly for $name value $metricValue", verify(metricValue))
+    metricValue
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 604bbf3..c1b26f1 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1508,55 +1508,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     servers.foreach(assertNoExemptRequestMetric(_))
   }
 
-  // Rate metrics of both Producer and Consumer are verified by this test
-  @Test
-  def testRateMetricsHaveCumulativeCount() {
-    val numRecords = 100
-    sendRecords(numRecords)
-
-    val consumer = this.consumers.head
-    consumer.assign(List(tp).asJava)
-    consumer.seek(tp, 0)
-    consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset 
= 0)
-
-    def exists(name: String, rateMetricName: MetricName, allMetricNames: 
Set[MetricName]): Boolean = {
-      allMetricNames.contains(new MetricName(name, rateMetricName.group, "", 
rateMetricName.tags))
-    }
-
-    def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): 
Unit = {
-      val name = rateMetricName.name
-      val totalExists = exists(name.replace("-rate", "-total"), 
rateMetricName, allMetricNames)
-      val totalTimeExists = exists(name.replace("-rate", "-time"), 
rateMetricName, allMetricNames)
-      assertTrue(s"No cumulative count/time metric for rate metric 
$rateMetricName",
-          totalExists || totalTimeExists)
-    }
-
-    val consumerMetricNames = consumer.metrics.keySet.asScala.toSet
-    consumerMetricNames.filter(_.name.endsWith("-rate"))
-        .foreach(verify(_, consumerMetricNames))
-
-    val producer = this.producers.head
-    val producerMetricNames = producer.metrics.keySet.asScala.toSet
-    val producerExclusions = Set("compression-rate") // compression-rate is an 
Average metric, not Rate
-    producerMetricNames.filter(_.name.endsWith("-rate"))
-        .filterNot(metricName => producerExclusions.contains(metricName.name))
-        .foreach(verify(_, producerMetricNames))
-
-    def verifyMetric(name: String, metrics: java.util.Map[MetricName, _ <: 
Metric], entity: String): Unit = {
-      val entry = metrics.asScala.find { case (metricName, _) => 
metricName.name == name }
-      assertTrue(s"$entity metric not defined $name", entry.nonEmpty)
-      entry.foreach { case (metricName, metric) =>
-        assertTrue(s"$entity metric not recorded $metricName", metric.value > 
0.0)
-      }
-    }
-
-    // Check a couple of metrics of consumer and producer to ensure that 
values are set
-    verifyMetric("records-consumed-rate", consumer.metrics, "Consumer")
-    verifyMetric("records-consumed-total", consumer.metrics, "Consumer")
-    verifyMetric("record-send-rate", producer.metrics, "Producer")
-    verifyMetric("record-send-total", producer.metrics, "Producer")
-  }
-
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // use consumers defined in this class plus one additional consumer
     // Use topic defined in this class + one additional topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2ffd828..95abb33 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1351,7 +1351,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new 
IAnswer[Unit] {
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupPartitionId) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
@@ -1434,7 +1435,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
           Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupPartitionId) ->
@@ -1463,7 +1465,8 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupCoordinator.partitionFor(groupId)) ->

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a2f5f92..0def9ce 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1306,7 +1306,8 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())
     )
     
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     capturedArgument
@@ -1320,7 +1321,8 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]])
+      EasyMock.anyObject().asInstanceOf[Option[Object]],
+      EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(groupTopicPartition ->

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 49c8e6a..ed1636c 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -498,7 +498,8 @@ class TransactionStateManagerTest {
           EasyMock.eq(false),
           EasyMock.eq(recordsByPartition),
           EasyMock.capture(capturedArgument),
-          EasyMock.eq(None)
+          EasyMock.eq(None),
+          EasyMock.anyObject()
         )).andAnswer(new IAnswer[Unit] {
           override def answer(): Unit = {
             capturedArgument.getValue.apply(
@@ -598,6 +599,7 @@ class TransactionStateManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala 
b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 96f7bfc..b64371e 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -22,6 +22,7 @@ import kafka.common.LongRef
 import kafka.message.{CompressionCodec, DefaultCompressionCodec, 
GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
 import org.apache.kafka.common.errors.{InvalidTimestampException, 
UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
 import org.junit.Test
@@ -30,6 +31,8 @@ import scala.collection.JavaConverters._
 
 class LogValidatorTest {
 
+  val time = Time.SYSTEM
+
   @Test
   def testLogAppendTimeNonCompressedV1() {
     checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
@@ -41,6 +44,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = magic, timestamp = 1234L, codec = 
CompressionType.NONE)
     val validatedResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -56,6 +60,8 @@ class LogValidatorTest {
     assertEquals(s"Max timestamp should be $now", now, 
validatedResults.maxTimestamp)
     assertEquals(s"The offset of max timestamp should be 0", 0, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", 
validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, 
records, compressed = false)
   }
 
   def testLogAppendTimeNonCompressedV2() {
@@ -74,6 +80,7 @@ class LogValidatorTest {
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -92,6 +99,9 @@ class LogValidatorTest {
     assertEquals(s"The offset of max timestamp should be 
${records.records.asScala.size - 1}",
       records.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size may have been changed", 
validatedResults.messageSizeMaybeChanged)
+
+    val stats = validatedResults.recordsProcessingStats
+    verifyRecordsProcessingStats(stats, 3, records, compressed = true)
   }
 
   @Test
@@ -111,6 +121,7 @@ class LogValidatorTest {
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -130,6 +141,8 @@ class LogValidatorTest {
     assertEquals(s"The offset of max timestamp should be 
${records.records.asScala.size - 1}",
       records.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", 
validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, 
records, compressed = true)
   }
 
   @Test
@@ -161,6 +174,7 @@ class LogValidatorTest {
 
     val validatingResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -192,6 +206,8 @@ class LogValidatorTest {
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, 
validatingResults.maxTimestamp)
     assertEquals(s"Offset of max timestamp should be 1", 1, 
validatingResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", 
validatingResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 0, 
records, compressed = false)
   }
 
   @Test
@@ -223,6 +239,7 @@ class LogValidatorTest {
 
     val validatingResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = GZIPCompressionCodec,
@@ -253,6 +270,8 @@ class LogValidatorTest {
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, 
validatingResults.maxTimestamp)
     assertEquals("Offset of max timestamp should be 2", 2, 
validatingResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", 
validatingResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 3, 
records, compressed = true)
   }
 
   @Test
@@ -269,6 +288,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec 
= CompressionType.GZIP)
     val validatedResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -293,6 +313,8 @@ class LogValidatorTest {
     assertEquals(s"Offset of max timestamp should be 
${validatedRecords.records.asScala.size - 1}",
       validatedRecords.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", 
validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, 
records, compressed = true)
   }
 
   @Test
@@ -306,6 +328,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec 
= CompressionType.GZIP, timestamp = timestamp)
     val validatedResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = timestamp,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -330,6 +353,8 @@ class LogValidatorTest {
     assertEquals(s"Offset of max timestamp should be 
${validatedRecords.records.asScala.size - 1}",
       validatedRecords.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", 
validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, 
records, compressed = true)
   }
 
   @Test
@@ -356,6 +381,7 @@ class LogValidatorTest {
 
     val validatedResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -387,6 +413,8 @@ class LogValidatorTest {
     assertEquals(s"Offset of max timestamp should be 
${validatedRecords.records.asScala.size - 1}",
       validatedRecords.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", 
validatedResults.messageSizeMaybeChanged)
+
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, 
records, compressed = true)
   }
 
   @Test
@@ -402,6 +430,7 @@ class LogValidatorTest {
     LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -421,6 +450,7 @@ class LogValidatorTest {
     LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -440,6 +470,7 @@ class LogValidatorTest {
     LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -459,6 +490,7 @@ class LogValidatorTest {
     LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -477,6 +509,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -495,6 +528,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -514,6 +548,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     val messageWithOffset = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -534,6 +569,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     val messageWithOffset = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -555,6 +591,7 @@ class LogValidatorTest {
     val compressedMessagesWithOffset = 
LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -576,6 +613,7 @@ class LogValidatorTest {
     val compressedMessagesWithOffset = 
LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -595,6 +633,7 @@ class LogValidatorTest {
     val offset = 1234567
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -613,6 +652,7 @@ class LogValidatorTest {
     val offset = 1234567
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -631,6 +671,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -649,6 +690,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -667,6 +709,7 @@ class LogValidatorTest {
     val records = MemoryRecords.withEndTransactionMarker(23423L, 5, 
endTxnMarker)
     LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -685,6 +728,7 @@ class LogValidatorTest {
     val records = MemoryRecords.withEndTransactionMarker(23423L, 5, 
endTxnMarker)
     val result = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = SnappyCompressionCodec,
@@ -708,6 +752,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -727,6 +772,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -745,6 +791,7 @@ class LogValidatorTest {
     val offset = 1234567
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -763,6 +810,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -782,6 +830,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -801,6 +850,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -822,6 +872,7 @@ class LogValidatorTest {
       new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), 
new SimpleRecord("beautiful".getBytes))
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -843,6 +894,7 @@ class LogValidatorTest {
       new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), 
new SimpleRecord("beautiful".getBytes))
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -862,6 +914,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
@@ -881,6 +934,7 @@ class LogValidatorTest {
     checkOffsets(records, 0)
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
@@ -898,6 +952,7 @@ class LogValidatorTest {
     val records = recordsWithInvalidInnerMagic(offset)
     LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = SnappyCompressionCodec,
       targetCodec = SnappyCompressionCodec,
@@ -936,6 +991,7 @@ class LogValidatorTest {
     val records = MemoryRecords.readableRecords(buffer)
     LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
+      time = time,
       now = System.currentTimeMillis(),
       sourceCodec = sourceCodec,
       targetCodec = targetCodec,
@@ -1010,4 +1066,19 @@ class LogValidatorTest {
     }
   }
 
+  def verifyRecordsProcessingStats(stats: RecordsProcessingStats, 
convertedCount: Int,
+            records: MemoryRecords, compressed: Boolean): Unit = {
+    assertNotNull("Records processing info is null", stats)
+    assertEquals(convertedCount, stats.conversionCount)
+    if (stats.conversionCount > 0)
+      assertTrue(s"Conversion time not recorded $stats", 
stats.conversionTimeNanos > 0)
+    val originalSize = records.sizeInBytes
+    val tempBytes = stats.temporaryMemoryBytes
+    if (convertedCount > 0)
+      assertTrue(s"Temp bytes too small, orig=$originalSize 
actual=$tempBytes", tempBytes > originalSize)
+    else if (compressed)
+      assertTrue("Temp bytes not updated", tempBytes > 0)
+    else
+      assertEquals(0, tempBytes)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 8b611f2..077950d 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -519,7 +519,7 @@ class SocketServerTest extends JUnitSuite {
       val channel = overrideServer.requestChannel
       val request = receiveRequest(channel)
 
-      val requestMetrics = 
RequestMetrics.metricsMap(request.header.apiKey.name)
+      val requestMetrics = RequestMetrics(request.header.apiKey.name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 
@@ -561,7 +561,7 @@ class SocketServerTest extends JUnitSuite {
       TestUtils.waitUntilTrue(() => 
overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
         s"Idle connection `${request.context.connectionId}` was not closed by 
selector")
 
-      val requestMetrics = 
RequestMetrics.metricsMap(request.header.apiKey.name)
+      val requestMetrics = RequestMetrics(request.header.apiKey.name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index deea586..fde2ae1 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -179,6 +179,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new 
PartitionResponse(Errors.NONE)))
@@ -217,6 +218,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new 
PartitionResponse(Errors.NONE)))
@@ -247,6 +249,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
 
     EasyMock.replay(replicaManager)

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 344ef2e..1806cb0 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,6 +68,12 @@
     <li>The Java clients and tools now accept any string as a client-id.</li>
     <li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has 
been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group 
details.</li>
     <li>SimpleAclAuthorizer now logs access denials to the authorizer log by 
default.</li>
+    <li>The <code>app-info</code> mbean registered with JMX to provide version 
and commit id will be deprecated and replaced with
+        metrics providing these attributes.</li>
+    <li>Kafka metrics may now contain non-numeric values. 
<code>org.apache.kafka.common.Metric#value()</code> has been deprecated and
+        may throw an <code>IllegalStateException</code> when iterating over 
metrics of <code>KafkaProducer/KafkaConsumer/KafkaAdminClient</code>.
+        <code>org.apache.kafka.common.Metric#metricValue()</code> can be used 
to safely iterate over any metric value.</li>
+
 </ul>
 
 <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New 
Protocol Versions</a></h5>
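
The note on Metric#metricValue() above is the client-facing piece of this change. Below is a minimal sketch of iterating a client's metrics with metricValue() instead of the deprecated value(); the class and helper names are illustrative and only the standard Kafka clients API calls are assumed:

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class MetricValueExample {
        // Works for any client's metrics() map, e.g. KafkaProducer#metrics()
        // or KafkaConsumer#metrics().
        public static void printAll(Map<MetricName, ? extends Metric> metrics) {
            for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                // metricValue() returns Object, so it handles numeric and
                // non-numeric metrics; the deprecated value() only covers doubles.
                Object value = entry.getValue().metricValue();
                System.out.println(entry.getKey().group() + ":" + entry.getKey().name() + " = " + value);
            }
        }
    }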

http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java 
b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index e02bbb0..0e5d130 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -32,7 +32,7 @@ public class ToolsUtils {
     public static void printMetrics(Map<MetricName, ? extends Metric> metrics) 
{
         if (metrics != null && !metrics.isEmpty()) {
             int maxLengthOfDisplayName = 0;
-            TreeMap<String, Double> sortedMetrics = new TreeMap<>(new 
Comparator<String>() {
+            TreeMap<String, Object> sortedMetrics = new TreeMap<>(new 
Comparator<String>() {
                 @Override
                 public int compare(String o1, String o2) {
                     return o1.compareTo(o2);
@@ -42,12 +42,18 @@ public class ToolsUtils {
                 MetricName mName = metric.metricName();
                 String mergedName = mName.group() + ":" + mName.name() + ":" + 
mName.tags();
                 maxLengthOfDisplayName = maxLengthOfDisplayName < 
mergedName.length() ? mergedName.length() : maxLengthOfDisplayName;
-                sortedMetrics.put(mergedName, metric.value());
+                sortedMetrics.put(mergedName, metric.metricValue());
             }
-            String outputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f";
+            String doubleOutputFormat = "%-" + maxLengthOfDisplayName + "s : 
%.3f";
+            String defaultOutputFormat = "%-" + maxLengthOfDisplayName + "s : 
%s";
             System.out.println(String.format("\n%-" + maxLengthOfDisplayName + 
"s   %s", "Metric Name", "Value"));
 
-            for (Map.Entry<String, Double> entry : sortedMetrics.entrySet()) {
+            for (Map.Entry<String, Object> entry : sortedMetrics.entrySet()) {
+                String outputFormat;
+                if (entry.getValue() instanceof Double)
+                    outputFormat = doubleOutputFormat;
+                else
+                    outputFormat = defaultOutputFormat;
                 System.out.println(String.format(outputFormat, entry.getKey(), 
entry.getValue()));
             }
         }
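
The hunk above keeps the fixed-precision column layout for Double-valued metrics and falls back to a plain %s for anything else. A minimal sketch of that selection logic in isolation (the class, method name and sample values are illustrative, not part of the patch):

    public class MetricFormatExample {
        // Mirrors the selection above: Double values keep the fixed-precision
        // column layout, everything else is printed verbatim with %s.
        static String formatMetric(String name, Object value, int width) {
            String format = (value instanceof Double)
                ? "%-" + width + "s : %.3f"
                : "%-" + width + "s : %s";
            return String.format(format, name, value);
        }

        public static void main(String[] args) {
            System.out.println(formatMetric("producer-metrics:record-send-rate", 12.3456, 45));
            System.out.println(formatMetric("app-info:commit-id", "021d8a8e", 45));
        }
    }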
