Repository: kafka Updated Branches: refs/heads/trunk dd6347a5d -> 021d8a8e9
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6434d23..6781dc9 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -44,7 +44,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch} +import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, RecordsProcessingStats} import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse} import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} @@ -424,7 +424,7 @@ class KafkaApis(val requestChannel: RequestChannel, s"from client id ${request.header.clientId} with ack=0\n" + s"Topic and partition to exceptions: $exceptionsSummary" ) - closeConnection(request) + closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts) } else { sendNoOpResponseExemptThrottle(request) } @@ -444,6 +444,12 @@ class KafkaApis(val requestChannel: RequestChannel, produceResponseCallback) } + def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = { + processingStats.foreach { case (tp, info) => + updateRecordsProcessingStats(request, tp, info) + } + } + if (authorizedRequestInfo.isEmpty) sendResponseCallback(Map.empty) else { @@ -456,7 +462,8 @@ class KafkaApis(val 
requestChannel: RequestChannel, internalTopicsAllowed = internalTopicsAllowed, isFromClient = true, entriesPerPartition = authorizedRequestInfo, - responseCallback = sendResponseCallback) + responseCallback = sendResponseCallback, + processingStatsCallback = processingStatsCallback) // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log @@ -511,9 +518,11 @@ class KafkaApis(val requestChannel: RequestChannel, downConvertMagic.map { magic => trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId") - val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset) + val startNanos = time.nanoseconds + val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset, time) + updateRecordsProcessingStats(request, tp, converted.recordsProcessingStats) new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, - data.logStartOffset, data.abortedTransactions, converted) + data.logStartOffset, data.abortedTransactions, converted.records) } }.getOrElse(data) @@ -2002,6 +2011,25 @@ class KafkaApis(val requestChannel: RequestChannel, throw new ClusterAuthorizationException(s"Request $request is not authorized.") } + private def updateRecordsProcessingStats(request: RequestChannel.Request, tp: TopicPartition, + processingStats: RecordsProcessingStats): Unit = { + val conversionCount = processingStats.conversionCount + if (conversionCount > 0) { + request.header.apiKey match { + case ApiKeys.PRODUCE => + brokerTopicStats.topicStats(tp.topic).produceMessageConversionsRate.mark(conversionCount) + brokerTopicStats.allTopicsStats.produceMessageConversionsRate.mark(conversionCount) + case ApiKeys.FETCH => + 
brokerTopicStats.topicStats(tp.topic).fetchMessageConversionsRate.mark(conversionCount) + brokerTopicStats.allTopicsStats.fetchMessageConversionsRate.mark(conversionCount) + case _ => + throw new IllegalStateException("Message conversion info is recorded only for Produce/Fetch requests") + } + request.messageConversionsTimeNanos = processingStats.conversionTimeNanos + } + request.temporaryMemoryBytes = processingStats.temporaryMemoryBytes + } + private def handleError(request: RequestChannel.Request, e: Throwable) { val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction error("Error when handling request %s".format(request.body[AbstractRequest]), e) @@ -2031,9 +2059,10 @@ class KafkaApis(val requestChannel: RequestChannel, } private def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable)(throttleMs: Int): Unit = { - val response = request.body[AbstractRequest].getErrorResponse(throttleMs, error) + val requestBody = request.body[AbstractRequest] + val response = requestBody.getErrorResponse(throttleMs, error) if (response == null) - closeConnection(request) + closeConnection(request, requestBody.errorCounts(error)) else sendResponse(request, Some(response)) } @@ -2043,13 +2072,17 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponse(request, None) } - private def closeConnection(request: RequestChannel.Request): Unit = { + private def closeConnection(request: RequestChannel.Request, errorCounts: java.util.Map[Errors, Integer]): Unit = { // This case is used when the request handler has encountered an error, but the client // does not expect a response (e.g. 
when produce request has acks set to 0) + requestChannel.updateErrorMetrics(request.header.apiKey, errorCounts.asScala) requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction, None)) } private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = { + // Update error metrics for each error code in the response including Errors.NONE + responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala)) + responseOpt match { case Some(response) => val responseSend = request.context.buildResponse(response) http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaHealthcheck.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index b108bf6..43c81ab 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -25,10 +25,13 @@ import kafka.api.ApiVersion import kafka.cluster.EndPoint import kafka.metrics.KafkaMetricsGroup import kafka.utils._ +import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.IZkStateListener import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.zookeeper.Watcher.Event.KeeperState +import scala.collection.mutable.Set + /** * This class registers the broker in zookeeper to allow * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: @@ -71,6 +74,8 @@ class KafkaHealthcheck(brokerId: Int, interBrokerProtocolVersion) } + def shutdown(): Unit = sessionExpireListener.shutdown() + /** * When we get a SessionExpired event, it means that we have lost all ephemeral nodes and ZKClient has re-established * a connection for us. We need to re-register this broker in the broker registry. 
We rely on `handleStateChanged` @@ -78,6 +83,8 @@ class KafkaHealthcheck(brokerId: Int, */ class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup { + private val metricNames = Set[String]() + private[server] val stateToMeterMap = { import KeeperState._ val stateToEventTypeMap = Map( @@ -89,10 +96,20 @@ class KafkaHealthcheck(brokerId: Int, Expired -> "Expires" ) stateToEventTypeMap.map { case (state, eventType) => - state -> newMeter(s"ZooKeeper${eventType}PerSec", eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS) + val name = s"ZooKeeper${eventType}PerSec" + metricNames += name + state -> newMeter(name, eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS) } } + private[server] val sessionStateGauge = + newGauge("SessionState", new Gauge[String] { + override def value: String = + Option(zkUtils.zkConnection.getZookeeperState.toString).getOrElse("DISCONNECTED") + }) + + metricNames += "SessionState" + @throws[Exception] override def handleStateChanged(state: KeeperState) { stateToMeterMap.get(state).foreach(_.mark()) @@ -110,6 +127,8 @@ class KafkaHealthcheck(brokerId: Int, fatal("Could not establish session with zookeeper", error) } + def shutdown(): Unit = metricNames.foreach(removeMetric(_)) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index f055762..a498781 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -129,6 +129,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags) val totalProduceRequestRate = 
newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags) val totalFetchRequestRate = newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags) + val fetchMessageConversionsRate = newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags) + val produceMessageConversionsRate = newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags) def close() { removeMetric(BrokerTopicStats.MessagesInPerSec, tags) @@ -143,6 +145,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags) removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags) removeMetric(BrokerTopicStats.TotalFetchRequestsPerSec, tags) + removeMetric(BrokerTopicStats.FetchMessageConversionsPerSec, tags) + removeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec, tags) } } @@ -157,6 +161,8 @@ object BrokerTopicStats { val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec" val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec" val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec" + val FetchMessageConversionsPerSec = "FetchMessageConversionsPerSec" + val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec" private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 88b1d23..f8af7a2 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -293,7 +293,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP shutdownLatch = new CountDownLatch(1) 
startupComplete.set(true) isStartingUp.set(false) - AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString) + AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics) info("started") } } @@ -333,19 +333,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP chrootOption.foreach { chroot => val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex) - val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation, + val zkClientForChrootCreation = ZkUtils.withMetrics(zkConnForChrootCreation, sessionTimeout = config.zkSessionTimeoutMs, connectionTimeout = config.zkConnectionTimeoutMs, - secureAclsEnabled) + secureAclsEnabled, + time) zkClientForChrootCreation.makeSurePersistentPathExists(chroot) info(s"Created zookeeper path $chroot") zkClientForChrootCreation.close() } - val zkUtils = ZkUtils(config.zkConnect, + val zkUtils = ZkUtils.withMetrics(config.zkConnect, sessionTimeout = config.zkSessionTimeoutMs, connectionTimeout = config.zkConnectionTimeoutMs, - secureAclsEnabled) + secureAclsEnabled, + time) zkUtils.setupCommonPaths() zkUtils } @@ -512,6 +514,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP CoreUtils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) + if (kafkaHealthcheck != null) + CoreUtils.swallow(kafkaHealthcheck.shutdown()) + if (socketServer != null) CoreUtils.swallow(socketServer.shutdown()) if (requestHandlerPool != null) @@ -549,7 +554,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP startupComplete.set(false) isShuttingDown.set(false) - CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString)) + CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics)) shutdownLatch.countDown() info("shut down completed") } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9cc6317..3acb88b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -434,7 +434,8 @@ class ReplicaManager(val config: KafkaConfig, isFromClient: Boolean, entriesPerPartition: Map[TopicPartition, MemoryRecords], responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - delayedProduceLock: Option[Object] = None) { + delayedProduceLock: Option[Object] = None, + processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, @@ -448,6 +449,8 @@ class ReplicaManager(val config: KafkaConfig, new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status } + processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats)) + if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { // create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 9582c50..755f500 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,7 +17,7 @@ package kafka.utils -import 
java.util.concurrent.CountDownLatch +import java.util.concurrent.{CountDownLatch, TimeUnit} import kafka.admin._ import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr} @@ -25,12 +25,16 @@ import kafka.cluster._ import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition} import kafka.consumer.{ConsumerThreadId, TopicCount} import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext} +import kafka.metrics.KafkaMetricsGroup import kafka.server.ConfigType import kafka.utils.ZkUtils._ + +import com.yammer.metrics.core.MetricName import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException} import org.I0Itec.zkclient.serialize.ZkSerializer -import org.I0Itec.zkclient.{ZkClient, ZkConnection, IZkDataListener, IZkChildListener, IZkStateListener} +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.utils.Time import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.{ACL, Stat} @@ -84,6 +88,12 @@ object ZkUtils { // sensitive information that should not be world readable to the Seq val SensitiveZkRootPaths = Seq(ConfigUsersPath) + def withMetrics(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean, + time: Time): ZkUtils = { + val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout) + new ZkUtils(new ZooKeeperClientMetrics(zkClient, time), zkConnection, isZkSecurityEnabled) + } + def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = { val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout) new ZkUtils(zkClient, 
zkConnection, isZkSecurityEnabled) @@ -212,7 +222,38 @@ object ZkUtils { } -class ZkUtils(val zkClient: ZkClient, +class ZooKeeperClientWrapper(val zkClient: ZkClient) { + def apply[T](method: ZkClient => T): T = method(zkClient) + def close(): Unit = { + if(zkClient != null) + zkClient.close() + } +} + +class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time) + extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup { + val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs") + + override protected def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { + explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags) + } + + override def apply[T](method: ZkClient => T): T = { + val startNs = time.nanoseconds + val ret = + try method(zkClient) + finally latencyMetric.update(TimeUnit.NANOSECONDS.toMillis(time.nanoseconds - startNs)) + ret + } + + override def close(): Unit = { + if (latencyMetric != null) + removeMetric("ZooKeeperRequestLatencyMs") + super.close() + } +} + +class ZkUtils(zkClientWrap: ZooKeeperClientWrapper, val zkConnection: ZkConnection, val isSecure: Boolean) extends Logging { // These are persistent ZK paths that should exist on kafka broker startup. 
@@ -228,8 +269,12 @@ class ZkUtils(val zkClient: ZkClient, ProducerIdBlockPath, LogDirEventNotificationPath) + /** Present for compatibility */ + def this(zkClient: ZkClient, zkConnection: ZkConnection, isSecure: Boolean) = + this(new ZooKeeperClientWrapper(zkClient), zkConnection, isSecure) + // Visible for testing - val zkPath = new ZkPath(zkClient) + val zkPath = new ZkPath(zkClientWrap) import ZkUtils._ @@ -238,6 +283,8 @@ class ZkUtils(val zkClient: ZkClient, def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path) + def zkClient: ZkClient = zkClientWrap.zkClient + def getController(): Int = { readDataMaybeNull(ControllerPath)._1 match { case Some(controller) => KafkaController.parseControllerId(controller) @@ -388,7 +435,7 @@ class ZkUtils(val zkClient: ZkClient, brokerInfo, zkConnection.getZookeeper, isSecure) - zkCheckedEphemeral.create() + zkClientWrap(_ => zkCheckedEphemeral.create()) } catch { case _: ZkNodeExistsException => throw new RuntimeException("A broker is already registered on the path " + brokerIdPath @@ -429,7 +476,7 @@ class ZkUtils(val zkClient: ZkClient, acls } - if (!zkClient.exists(path)) + if (!zkClientWrap(zkClient => zkClient.exists(path))) zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException } @@ -512,7 +559,7 @@ class ZkUtils(val zkClient: ZkClient, def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = { val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { - zkClient.writeData(path, data) + zkClientWrap(_.writeData(path, data)) } catch { case _: ZkNoNodeException => createParentPath(path) @@ -520,7 +567,7 @@ class ZkUtils(val zkClient: ZkClient, zkPath.createPersistent(path, data, acl) } catch { case _: ZkNodeExistsException => - zkClient.writeData(path, data) + zkClientWrap(_.writeData(path, data)) } } } @@ -536,7 +583,7 @@ class ZkUtils(val zkClient: ZkClient, 
def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int, optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = { try { - val stat = zkClient.writeDataReturnStat(path, data, expectVersion) + val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion)) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) @@ -563,7 +610,7 @@ class ZkUtils(val zkClient: ZkClient, */ def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { - val stat = zkClient.writeDataReturnStat(path, data, expectVersion) + val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion)) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) @@ -583,7 +630,7 @@ class ZkUtils(val zkClient: ZkClient, def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { - zkClient.writeData(path, data) + zkClientWrap(_.writeData(path, data)) } catch { case _: ZkNoNodeException => createParentPath(path) @@ -592,7 +639,7 @@ class ZkUtils(val zkClient: ZkClient, } def deletePath(path: String): Boolean = { - zkClient.delete(path) + zkClientWrap(_.delete(path)) } /** @@ -601,7 +648,7 @@ class ZkUtils(val zkClient: ZkClient, */ def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = { try { - zkClient.delete(path, expectedVersion) + zkClientWrap(_.delete(path, expectedVersion)) true } catch { case _: ZkBadVersionException => false @@ -609,37 +656,38 @@ class ZkUtils(val zkClient: ZkClient, } def deletePathRecursive(path: String) { - 
zkClient.deleteRecursive(path) + zkClientWrap(_.deleteRecursive(path)) } def subscribeDataChanges(path: String, listener: IZkDataListener): Unit = - zkClient.subscribeDataChanges(path, listener) + zkClientWrap(_.subscribeDataChanges(path, listener)) def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit = - zkClient.unsubscribeDataChanges(path, dataListener) + zkClientWrap(_.unsubscribeDataChanges(path, dataListener)) def subscribeStateChanges(listener: IZkStateListener): Unit = - zkClient.subscribeStateChanges(listener) + zkClientWrap(_.subscribeStateChanges(listener)) def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] = - Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala) + Option(zkClientWrap(_.subscribeChildChanges(path, listener))).map(_.asScala) def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit = - zkClient.unsubscribeChildChanges(path, childListener) + zkClientWrap(_.unsubscribeChildChanges(path, childListener)) def unsubscribeAll(): Unit = - zkClient.unsubscribeAll() + zkClientWrap(_.unsubscribeAll()) def readData(path: String): (String, Stat) = { val stat: Stat = new Stat() - val dataStr: String = zkClient.readData(path, stat) + val dataStr: String = zkClientWrap(_.readData[String](path, stat)) (dataStr, stat) } def readDataMaybeNull(path: String): (Option[String], Stat) = { val stat = new Stat() val dataAndStat = try { - (Some(zkClient.readData(path, stat)), stat) + val dataStr = zkClientWrap(_.readData[String](path, stat)) + (Some(dataStr), stat) } catch { case _: ZkNoNodeException => (None, stat) @@ -650,18 +698,18 @@ class ZkUtils(val zkClient: ZkClient, def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = { val stat = new Stat() try { - val data: String = zkClient.readData(path, stat) + val data = zkClientWrap(_.readData[String](path, stat)) (Option(data), stat.getVersion) } catch { case _: ZkNoNodeException => (None, 
stat.getVersion) } } - def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala + def getChildren(path: String): Seq[String] = zkClientWrap(_.getChildren(path)).asScala def getChildrenParentMayNotExist(path: String): Seq[String] = { try { - zkClient.getChildren(path).asScala + zkClientWrap(_.getChildren(path)).asScala } catch { case _: ZkNoNodeException => Nil } @@ -671,7 +719,7 @@ class ZkUtils(val zkClient: ZkClient, * Check if the given path exists */ def pathExists(path: String): Boolean = { - zkClient.exists(path) + zkClientWrap(_.exists(path)) } def isTopicMarkedForDeletion(topic: String): Boolean = { @@ -789,9 +837,9 @@ class ZkUtils(val zkClient: ZkClient, def deletePartition(brokerId: Int, topic: String) { val brokerIdPath = BrokerIdsPath + "/" + brokerId - zkClient.delete(brokerIdPath) + zkClientWrap(_.delete(brokerIdPath)) val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId - zkClient.delete(brokerPartTopicPath) + zkClientWrap(_.delete(brokerPartTopicPath)) } @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0") @@ -851,7 +899,7 @@ class ZkUtils(val zkClient: ZkClient, */ def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = { val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls - def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion + def writeToZk: Int = zkClientWrap(_.writeDataReturnStat(path, "", -1)).getVersion try { writeToZk } catch { @@ -915,9 +963,7 @@ class ZkUtils(val zkClient: ZkClient, } def close() { - if(zkClient != null) { - zkClient.close() - } + zkClientWrap.close() } } @@ -973,7 +1019,7 @@ class ZKConfig(props: VerifiableProperties) { val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000) } -class ZkPath(client: ZkClient) { +class ZkPath(clientWrap: ZooKeeperClientWrapper) { @volatile private var isNamespacePresent: Boolean = false @@ -981,7 +1027,7 @@ class 
ZkPath(client: ZkClient) { if (isNamespacePresent) return - if (!client.exists("/")) { + if (!clientWrap(_.exists("/"))) { throw new ConfigException("Zookeeper namespace does not exist") } isNamespacePresent = true @@ -993,22 +1039,22 @@ class ZkPath(client: ZkClient) { def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) { checkNamespace() - client.createPersistent(path, data, acls) + clientWrap(_.createPersistent(path, data, acls)) } def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) { checkNamespace() - client.createPersistent(path, createParents, acls) + clientWrap(_.createPersistent(path, createParents, acls)) } def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) { checkNamespace() - client.createEphemeral(path, data, acls) + clientWrap(_.createEphemeral(path, data, acls)) } def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = { checkNamespace() - client.createPersistentSequential(path, data, acls) + clientWrap(_.createPersistentSequential(path, data, acls)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/integration/kafka/api/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala new file mode 100644 index 0000000..f71a36a --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.util.{Locale, Properties} + +import kafka.log.LogConfig +import kafka.network.RequestMetrics +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{JaasTestUtils, TestUtils} + +import com.yammer.metrics.Metrics +import com.yammer.metrics.core.{Gauge, Histogram, Meter} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.{Metric, MetricName, TopicPartition} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.errors.InvalidTopicException +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.SecurityProtocol +import org.junit.{After, Before, Test} +import org.junit.Assert._ + +import scala.collection.JavaConverters._ + +class MetricsTest extends IntegrationTestHarness with SaslSetup { + + override val producerCount = 1 + override val consumerCount = 1 + override val serverCount = 1 + + override protected def listenerName = new ListenerName("CLIENT") + private val kafkaClientSaslMechanism = "PLAIN" + private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) + private val kafkaServerJaasEntryName = + s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" + this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false") + this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false") + override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + override protected val serverSaslProperties 
= + Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = + Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) + + @Before + override def setUp(): Unit = { + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName)) + super.setUp() + } + + @After + override def tearDown(): Unit = { + super.tearDown() + closeSasl() + } + + /** + * Verifies some of the metrics of producer, consumer as well as server. + */ + @Test + def testMetrics(): Unit = { + val topic = "topicWithOldMessageFormat" + val props = new Properties + props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0") + TestUtils.createTopic(this.zkUtils, topic, numPartitions = 1, replicationFactor = 1, this.servers, props) + val tp = new TopicPartition(topic, 0) + + // Clear static state + RequestMetrics.clearErrorMeters() + + // Produce and consume some records + val numRecords = 10 + val recordSize = 1000 + val producer = producers.head + sendRecords(producer, numRecords, recordSize, tp) + + val consumer = this.consumers.head + consumer.assign(List(tp).asJava) + consumer.seek(tp, 0) + TestUtils.consumeRecords(consumer, numRecords) + + verifyKafkaRateMetricsHaveCumulativeCount() + verifyClientVersionMetrics(consumer.metrics, "Consumer") + verifyClientVersionMetrics(this.producers.head.metrics, "Producer") + + val server = servers.head + verifyBrokerMessageConversionMetrics(server, recordSize) + verifyBrokerErrorMetrics(servers.head) + verifyBrokerZkMetrics(server, topic) + + generateAuthenticationFailure(tp) + verifyBrokerAuthenticationMetrics(server) + } + + private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, + recordSize: Int, tp: TopicPartition) = { + val bytes = new Array[Byte](recordSize) + (0 until numRecords).map { i => + producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, bytes)) 
+ } + producer.flush() + } + + // Create a producer that fails authentication to verify authentication failure metrics + private def generateAuthenticationFailure(tp: TopicPartition): Unit = { + val producerProps = new Properties() + val saslProps = new Properties() + // Temporary limit to reduce blocking before KIP-152 client-side changes are merged + saslProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000") + saslProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000") + saslProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256") + // Use acks=0 to verify error metric when connection is closed without a response + saslProps.put(ProducerConfig.ACKS_CONFIG, "0") + val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, + trustStoreFile = trustStoreFile, saslProperties = Some(saslProps), props = Some(producerProps)) + + try { + producer.send(new ProducerRecord(tp.topic, tp.partition, "key".getBytes, "value".getBytes)).get + } catch { + case _: Exception => // expected exception + } finally { + producer.close() + } + } + + private def verifyKafkaRateMetricsHaveCumulativeCount(): Unit = { + + def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = { + allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags)) + } + + def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): Unit = { + val name = rateMetricName.name + val totalExists = exists(name.replace("-rate", "-total"), rateMetricName, allMetricNames) + val totalTimeExists = exists(name.replace("-rate", "-time"), rateMetricName, allMetricNames) + assertTrue(s"No cumulative count/time metric for rate metric $rateMetricName", + totalExists || totalTimeExists) + } + + val consumer = this.consumers.head + val consumerMetricNames = consumer.metrics.keySet.asScala.toSet + consumerMetricNames.filter(_.name.endsWith("-rate")) + .foreach(verify(_, consumerMetricNames)) + + val producer = 
this.producers.head + val producerMetricNames = producer.metrics.keySet.asScala.toSet + val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate + producerMetricNames.filter(_.name.endsWith("-rate")) + .filterNot(metricName => producerExclusions.contains(metricName.name)) + .foreach(verify(_, producerMetricNames)) + + // Check a couple of metrics of consumer and producer to ensure that values are set + verifyKafkaMetricRecorded("records-consumed-rate", consumer.metrics, "Consumer") + verifyKafkaMetricRecorded("records-consumed-total", consumer.metrics, "Consumer") + verifyKafkaMetricRecorded("record-send-rate", producer.metrics, "Producer") + verifyKafkaMetricRecorded("record-send-total", producer.metrics, "Producer") + } + + private def verifyClientVersionMetrics(metrics: java.util.Map[MetricName, _ <: Metric], entity: String): Unit = { + Seq("commit-id", "version").foreach { name => + verifyKafkaMetric(name, metrics, entity) { matchingMetrics => + assertEquals(1, matchingMetrics.size) + val metric = matchingMetrics.head + val value = metric.metricValue + assertNotNull(s"$entity metric not recorded $name", value) + assertTrue(s"$entity metric $name should be a non-empty String", + value.isInstanceOf[String] && !value.asInstanceOf[String].isEmpty) + assertTrue("Client-id not specified", metric.metricName.tags.containsKey("client-id")) + } + } + } + + private def verifyBrokerAuthenticationMetrics(server: KafkaServer): Unit = { + val metrics = server.metrics.metrics + TestUtils.waitUntilTrue(() => + maxKafkaMetricValue("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) > 0, + "failed-authentication-total not updated") + verifyKafkaMetricRecorded("successful-authentication-rate", metrics, "Broker", Some("socket-server-metrics")) + verifyKafkaMetricRecorded("successful-authentication-total", metrics, "Broker", Some("socket-server-metrics")) +
verifyKafkaMetricRecorded("failed-authentication-rate", metrics, "Broker", Some("socket-server-metrics")) + verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) + } + + private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int): Unit = { + val requestMetricsPrefix = "kafka.network:type=RequestMetrics" + val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce") + val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce") + assertTrue(s"Unexpected temporary memory size requestBytes $requestBytes tempBytes $tempBytes", + tempBytes >= recordSize) + + verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec") + verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0) + + verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch") + // Temporary size for fetch should be zero after KAFKA-5968 is fixed + verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value >= 0.0) + + // request size recorded for all request types, check one + verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata") + } + + private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = { + // Latency is rounded to milliseconds, so we may need to retry some operations to get latency > 0. 
+ val (_, recorded) = TestUtils.computeUntilTrue({ + servers.head.zkUtils.getLeaderAndIsrForPartition(topic, 0) + yammerMetricValue("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Double] + })(latency => latency > 0.0) + assertTrue("ZooKeeper latency not recorded", recorded) + + assertEquals(s"Unexpected ZK state ${server.zkUtils.zkConnection.getZookeeperState}", + "CONNECTED", yammerMetricValue("SessionState")) + } + + private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = { + + def errorMetricCount = Metrics.defaultRegistry.allMetrics.keySet.asScala.filter(_.getName == "ErrorsPerSec").size + + val startErrorMetricCount = errorMetricCount + val errorMetricPrefix = "kafka.network:type=RequestMetrics,name=ErrorsPerSec" + verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=NONE") + + try { + consumers.head.partitionsFor("12{}!") + } catch { + case _: InvalidTopicException => // expected + } + verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=INVALID_TOPIC_EXCEPTION") + + // Check that error metrics are registered dynamically + val currentErrorMetricCount = errorMetricCount + assertEquals(startErrorMetricCount + 1, currentErrorMetricCount) + assertTrue(s"Too many error metrics $currentErrorMetricCount" , currentErrorMetricCount < 10) + + // Verify that error metric is updated with producer acks=0 when no response is sent + sendRecords(producers.head, 1, 100, new TopicPartition("non-existent", 0)) + verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=LEADER_NOT_AVAILABLE") + } + + private def verifyKafkaMetric[T](name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String, + group: Option[String] = None)(verify: Iterable[Metric] => T) : T = { + val matchingMetrics = metrics.asScala.filter { + case (metricName, _) => metricName.name == name && group.forall(_ == metricName.group) + } + assertTrue(s"Metric not found $name", matchingMetrics.size > 
0) + verify(matchingMetrics.values) + } + + private def maxKafkaMetricValue(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String, + group: Option[String]): Double = { + // Use max value of all matching metrics since Selector metrics are recorded for each Processor + verifyKafkaMetric(name, metrics, entity, group) { matchingMetrics => + matchingMetrics.foldLeft(0.0)((max, metric) => Math.max(max, metric.value)) + } + } + + private def verifyKafkaMetricRecorded(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String, + group: Option[String] = None): Unit = { + val value = maxKafkaMetricValue(name, metrics, entity, group) + assertTrue(s"$entity metric not recorded correctly for $name value $value", value > 0.0) + } + + private def yammerMetricValue(name: String): Any = { + val allMetrics = Metrics.defaultRegistry.allMetrics.asScala + val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) } + .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")) + metric match { + case m: Meter => m.count.toDouble + case m: Histogram => m.max + case m: Gauge[_] => m.value + case m => fail(s"Unexpected broker metric of class ${m.getClass}") + } + } + + private def verifyYammerMetricRecorded(name: String, verify: Double => Boolean = d => d > 0): Double = { + val metricValue = yammerMetricValue(name).asInstanceOf[Double] + assertTrue(s"Broker metric not recorded correctly for $name value $metricValue", verify(metricValue)) + metricValue + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 604bbf3..c1b26f1 100644 --- 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -1508,55 +1508,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { servers.foreach(assertNoExemptRequestMetric(_)) } - // Rate metrics of both Producer and Consumer are verified by this test - @Test - def testRateMetricsHaveCumulativeCount() { - val numRecords = 100 - sendRecords(numRecords) - - val consumer = this.consumers.head - consumer.assign(List(tp).asJava) - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset = 0) - - def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = { - allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags)) - } - - def verify(rateMetricName: MetricName, allMetricNames: Set[MetricName]): Unit = { - val name = rateMetricName.name - val totalExists = exists(name.replace("-rate", "-total"), rateMetricName, allMetricNames) - val totalTimeExists = exists(name.replace("-rate", "-time"), rateMetricName, allMetricNames) - assertTrue(s"No cumulative count/time metric for rate metric $rateMetricName", - totalExists || totalTimeExists) - } - - val consumerMetricNames = consumer.metrics.keySet.asScala.toSet - consumerMetricNames.filter(_.name.endsWith("-rate")) - .foreach(verify(_, consumerMetricNames)) - - val producer = this.producers.head - val producerMetricNames = producer.metrics.keySet.asScala.toSet - val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate - producerMetricNames.filter(_.name.endsWith("-rate")) - .filterNot(metricName => producerExclusions.contains(metricName.name)) - .foreach(verify(_, producerMetricNames)) - - def verifyMetric(name: String, metrics: java.util.Map[MetricName, _ <: Metric], entity: String): Unit = { - val entry = metrics.asScala.find { case (metricName, _) => metricName.name == name } - 
assertTrue(s"$entity metric not defined $name", entry.nonEmpty) - entry.foreach { case (metricName, metric) => - assertTrue(s"$entity metric not recorded $metricName", metric.value > 0.0) - } - } - - // Check a couple of metrics of consumer and producer to ensure that values are set - verifyMetric("records-consumed-rate", consumer.metrics, "Consumer") - verifyMetric("records-consumed-total", consumer.metrics, "Consumer") - verifyMetric("record-send-rate", producer.metrics, "Producer") - verifyMetric("record-send-total", producer.metrics, "Producer") - } - def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = { // use consumers defined in this class plus one additional consumer // Use topic defined in this class + one additional topic http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 2ffd828..95abb33 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -1351,7 +1351,8 @@ class GroupCoordinatorTest extends JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] { + EasyMock.anyObject().asInstanceOf[Option[Object]], + EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L) @@ -1434,7 +1435,8 @@ class GroupCoordinatorTest extends 
JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]]) + EasyMock.anyObject().asInstanceOf[Option[Object]], + EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> @@ -1463,7 +1465,8 @@ class GroupCoordinatorTest extends JUnitSuite { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]]) + EasyMock.anyObject().asInstanceOf[Option[Object]], + EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) -> http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index a2f5f92..0def9ce 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1306,7 +1306,8 @@ class GroupMetadataManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]]) + EasyMock.anyObject().asInstanceOf[Option[Object]], + EasyMock.anyObject()) ) 
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) capturedArgument @@ -1320,7 +1321,8 @@ class GroupMetadataManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), - EasyMock.anyObject().asInstanceOf[Option[Object]]) + EasyMock.anyObject().asInstanceOf[Option[Object]], + EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( Map(groupTopicPartition -> http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 49c8e6a..ed1636c 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -498,7 +498,8 @@ class TransactionStateManagerTest { EasyMock.eq(false), EasyMock.eq(recordsByPartition), EasyMock.capture(capturedArgument), - EasyMock.eq(None) + EasyMock.eq(None), + EasyMock.anyObject() )).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { capturedArgument.getValue.apply( @@ -598,6 +599,7 @@ class TransactionStateManagerTest { isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument), + EasyMock.anyObject(), EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { override def answer(): Unit = capturedArgument.getValue.apply( http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 96f7bfc..b64371e 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -22,6 +22,7 @@ import kafka.common.LongRef import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec} import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ +import org.apache.kafka.common.utils.Time import org.apache.kafka.test.TestUtils import org.junit.Assert._ import org.junit.Test @@ -30,6 +31,8 @@ import scala.collection.JavaConverters._ class LogValidatorTest { + val time = Time.SYSTEM + @Test def testLogAppendTimeNonCompressedV1() { checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1) @@ -41,6 +44,7 @@ class LogValidatorTest { val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.NONE) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), + time= time, now = now, sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -56,6 +60,8 @@ class LogValidatorTest { assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + + verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = false) } def testLogAppendTimeNonCompressedV2() { @@ -74,6 +80,7 @@ class LogValidatorTest { val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new 
LongRef(0), + time = time, now = now, sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -92,6 +99,9 @@ class LogValidatorTest { assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}", records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged) + + val stats = validatedResults.recordsProcessingStats + verifyRecordsProcessingStats(stats, 3, records, compressed = true) } @Test @@ -111,6 +121,7 @@ class LogValidatorTest { val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(0), + time = time, now = now, sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -130,6 +141,8 @@ class LogValidatorTest { assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}", records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + + verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = true) } @Test @@ -161,6 +174,7 @@ class LogValidatorTest { val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -192,6 +206,8 @@ class LogValidatorTest { assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp) assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged) + + verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 0, records, compressed = false) } @Test @@ -223,6 +239,7 @@ class 
LogValidatorTest { val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = GZIPCompressionCodec, @@ -253,6 +270,8 @@ class LogValidatorTest { assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp) assertEquals("Offset of max timestamp should be 2", 2, validatingResults.shallowOffsetOfMaxTimestamp) assertTrue("Message size should have been changed", validatingResults.messageSizeMaybeChanged) + + verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 3, records, compressed = true) } @Test @@ -269,6 +288,7 @@ class LogValidatorTest { val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -293,6 +313,8 @@ class LogValidatorTest { assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}", validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged) + + verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, records, compressed = true) } @Test @@ -306,6 +328,7 @@ class LogValidatorTest { val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, codec = CompressionType.GZIP, timestamp = timestamp) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), + time = time, now = timestamp, sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -330,6 +353,8 @@ class LogValidatorTest { assertEquals(s"Offset of max timestamp should be 
${validatedRecords.records.asScala.size - 1}", validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged) + + verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, records, compressed = true) } @Test @@ -356,6 +381,7 @@ class LogValidatorTest { val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -387,6 +413,8 @@ class LogValidatorTest { assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}", validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) + + verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = true) } @Test @@ -402,6 +430,7 @@ class LogValidatorTest { LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(0), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -421,6 +450,7 @@ class LogValidatorTest { LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(0), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -440,6 +470,7 @@ class LogValidatorTest { LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(0), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -459,6 +490,7 @@ class LogValidatorTest { LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(0), + time = time, now = 
System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -477,6 +509,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -495,6 +528,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -514,6 +548,7 @@ class LogValidatorTest { checkOffsets(records, 0) val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -534,6 +569,7 @@ class LogValidatorTest { checkOffsets(records, 0) val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -555,6 +591,7 @@ class LogValidatorTest { val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -576,6 +613,7 @@ class LogValidatorTest { val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -595,6 +633,7 @@ class LogValidatorTest { val offset = 1234567 
checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -613,6 +652,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -631,6 +671,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -649,6 +690,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -667,6 +709,7 @@ class LogValidatorTest { val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker) LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -685,6 +728,7 @@ class LogValidatorTest { val records = MemoryRecords.withEndTransactionMarker(23423L, 5, endTxnMarker) val result = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = SnappyCompressionCodec, @@ -708,6 +752,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = 
System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -727,6 +772,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -745,6 +791,7 @@ class LogValidatorTest { val offset = 1234567 checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -763,6 +810,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -782,6 +830,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -801,6 +850,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -822,6 +872,7 @@ class LogValidatorTest { new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes)) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -843,6 +894,7 @@ class 
LogValidatorTest { new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes)) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -862,6 +914,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = NoCompressionCodec, targetCodec = NoCompressionCodec, @@ -881,6 +934,7 @@ class LogValidatorTest { checkOffsets(records, 0) checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = DefaultCompressionCodec, targetCodec = DefaultCompressionCodec, @@ -898,6 +952,7 @@ class LogValidatorTest { val records = recordsWithInvalidInnerMagic(offset) LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = SnappyCompressionCodec, targetCodec = SnappyCompressionCodec, @@ -936,6 +991,7 @@ class LogValidatorTest { val records = MemoryRecords.readableRecords(buffer) LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(offset), + time = time, now = System.currentTimeMillis(), sourceCodec = sourceCodec, targetCodec = targetCodec, @@ -1010,4 +1066,19 @@ class LogValidatorTest { } } + def verifyRecordsProcessingStats(stats: RecordsProcessingStats, convertedCount: Int, + records: MemoryRecords, compressed: Boolean): Unit = { + assertNotNull("Records processing info is null", stats) + assertEquals(convertedCount, stats.conversionCount) + if (stats.conversionCount > 0) + assertTrue(s"Conversion time not recorded $stats", stats.conversionTimeNanos > 0) + val 
originalSize = records.sizeInBytes + val tempBytes = stats.temporaryMemoryBytes + if (convertedCount > 0) + assertTrue(s"Temp bytes too small, orig=$originalSize actual=$tempBytes", tempBytes > originalSize) + else if (compressed) + assertTrue("Temp bytes not updated", tempBytes > 0) + else + assertEquals(0, tempBytes) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 8b611f2..077950d 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -519,7 +519,7 @@ class SocketServerTest extends JUnitSuite { val channel = overrideServer.requestChannel val request = receiveRequest(channel) - val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name) + val requestMetrics = RequestMetrics(request.header.apiKey.name) def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count val expectedTotalTimeCount = totalTimeHistCount() + 1 @@ -561,7 +561,7 @@ class SocketServerTest extends JUnitSuite { TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty, s"Idle connection `${request.context.connectionId}` was not closed by selector") - val requestMetrics = RequestMetrics.metricsMap(request.header.apiKey.name) + val requestMetrics = RequestMetrics(request.header.apiKey.name) def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count val expectedTotalTimeCount = totalTimeHistCount() + 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index deea586..fde2ae1 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -179,6 +179,7 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.capture(responseCallback), + EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))) @@ -217,6 +218,7 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.capture(responseCallback), + EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))) @@ -247,6 +249,7 @@ class KafkaApisTest { EasyMock.eq(false), EasyMock.anyObject(), EasyMock.anyObject(), + EasyMock.anyObject(), EasyMock.anyObject())) EasyMock.replay(replicaManager) http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index 344ef2e..1806cb0 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -68,6 +68,12 @@ <li>The Java clients and tools now accept any string as a client-id.</li> <li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li> <li>SimpleAclAuthorizer now logs access denials to the authorizer log by default.</li> + <li>The <code>app-info</code> mbean registered with JMX to provide version and commit id will be deprecated and replaced with + metrics providing these attributes.</li> + <li>Kafka metrics may now contain non-numeric values. 
<code>org.apache.kafka.common.Metric#value()</code> has been deprecated and + may throw an <code>IllegalStateException</code> when iterating over metrics of <code>KafkaProducer/KafkaConsumer/KafkaAdminClient</code>. + <code>org.apache.kafka.common.Metric#metricValue()</code> can be used to safely iterate over any metric value.</li> + </ul> <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5> http://git-wip-us.apache.org/repos/asf/kafka/blob/021d8a8e/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java index e02bbb0..0e5d130 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java +++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java @@ -32,7 +32,7 @@ public class ToolsUtils { public static void printMetrics(Map<MetricName, ? extends Metric> metrics) { if (metrics != null && !metrics.isEmpty()) { int maxLengthOfDisplayName = 0; - TreeMap<String, Double> sortedMetrics = new TreeMap<>(new Comparator<String>() { + TreeMap<String, Object> sortedMetrics = new TreeMap<>(new Comparator<String>() { @Override public int compare(String o1, String o2) { return o1.compareTo(o2); @@ -42,12 +42,18 @@ public class ToolsUtils { MetricName mName = metric.metricName(); String mergedName = mName.group() + ":" + mName.name() + ":" + mName.tags(); maxLengthOfDisplayName = maxLengthOfDisplayName < mergedName.length() ? 
mergedName.length() : maxLengthOfDisplayName; - sortedMetrics.put(mergedName, metric.value()); + sortedMetrics.put(mergedName, metric.metricValue()); } - String outputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f"; + String doubleOutputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f"; + String defaultOutputFormat = "%-" + maxLengthOfDisplayName + "s : %s"; System.out.println(String.format("\n%-" + maxLengthOfDisplayName + "s %s", "Metric Name", "Value")); - for (Map.Entry<String, Double> entry : sortedMetrics.entrySet()) { + for (Map.Entry<String, Object> entry : sortedMetrics.entrySet()) { + String outputFormat; + if (entry.getValue() instanceof Double) + outputFormat = doubleOutputFormat; + else + outputFormat = defaultOutputFormat; System.out.println(String.format(outputFormat, entry.getKey(), entry.getValue())); } }