This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 0b1dc1c  KAFKA-6263; Expose metrics for group and transaction metadata loading duration
0b1dc1c is described below

commit 0b1dc1ca7be3566a4367d7d3aae2c691cf94f48e
Author:     anatasiavela <anastasiav...@berkeley.edu>
AuthorDate: Sat Aug 3 21:00:46 2019 -0700

    KAFKA-6263; Expose metrics for group and transaction metadata loading duration

    [JIRA](https://issues.apache.org/jira/browse/KAFKA-6263)

    - Add metrics that expose how long group metadata and transaction metadata take to load,
      to help explain periods of inactivity observed in consumer groups.
    - Tests mock the load times by injecting a delay after each partition is loaded and verify
      that the measured JMX metrics match the expected values.

    Author: anatasiavela <anastasiav...@berkeley.edu>

    Reviewers: Gwen Shapira, Jason Gustafson

    Closes #7045 from anatasiavela/KAFKA-6263
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 16 +++++---
 .../coordinator/group/GroupMetadataManager.scala   | 20 ++++++++-
 .../transaction/TransactionCoordinator.scala       |  2 +-
 .../transaction/TransactionStateManager.scala      | 21 ++++++++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../group/GroupCoordinatorConcurrencyTest.scala    |  3 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |  3 +-
 .../group/GroupMetadataManagerTest.scala           | 47 +++++++++++++++++++++-
 .../TransactionCoordinatorConcurrencyTest.scala    |  3 +-
 .../transaction/TransactionStateManagerTest.scala  | 39 +++++++++++++++++-
 docs/ops.html                                      | 20 +++++++++
 11 files changed, 157 insertions(+), 19 deletions(-)
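The heart of the patch is a single Kafka Sensor carrying Max and Avg stats, recorded once per
partition load. Below is a self-contained sketch of that pattern: the metric names and
descriptions mirror the diff that follows, while the App wrapper and the simulated load are
illustrative only. With the default MetricConfig the stats are computed over rolling 30-second
samples, which is what "last 30sec" in the descriptions refers to.

    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.metrics.stats.{Avg, Max}

    object PartitionLoadMetricsSketch extends App {
      val metrics = new Metrics()

      // One sensor feeds both stats with the same record() call.
      val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
      partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
        "group-coordinator-metrics",
        "The max time it took to load the partitions in the last 30sec"), new Max())
      partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
        "group-coordinator-metrics",
        "The avg time it took to load the partitions in the last 30sec"), new Avg())

      // Simulate one partition load and record its duration.
      // checkQuotas = false: these sensors are for observability, not quota enforcement.
      val startMs = System.currentTimeMillis()
      Thread.sleep(50) // stand-in for doLoadGroupsAndOffsets(...)
      val endMs = System.currentTimeMillis()
      partitionLoadSensor.record(endMs - startMs, endMs, false)

      println(metrics.metric(metrics.metricName("partition-load-time-max",
        "group-coordinator-metrics")).metricValue())
    }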
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6a57d59..7874946 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -28,6 +28,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
@@ -55,7 +56,8 @@ class GroupCoordinator(val brokerId: Int,
                        val groupManager: GroupMetadataManager,
                        val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
                        val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
-                       time: Time) extends Logging {
+                       time: Time,
+                       metrics: Metrics) extends Logging {
   import GroupCoordinator._

   type JoinCallback = JoinGroupResult => Unit
@@ -1084,10 +1086,11 @@ object GroupCoordinator {
   def apply(config: KafkaConfig,
             zkClient: KafkaZkClient,
             replicaManager: ReplicaManager,
-            time: Time): GroupCoordinator = {
+            time: Time,
+            metrics: Metrics): GroupCoordinator = {
     val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
     val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
-    apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time)
+    apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time, metrics)
   }

   private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
@@ -1108,7 +1111,8 @@ object GroupCoordinator {
             replicaManager: ReplicaManager,
             heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
             joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
-            time: Time): GroupCoordinator = {
+            time: Time,
+            metrics: Metrics): GroupCoordinator = {
     val offsetConfig = this.offsetConfig(config)
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
@@ -1116,8 +1120,8 @@ object GroupCoordinator {
       groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)

     val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
-      offsetConfig, replicaManager, zkClient, time)
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
+      offsetConfig, replicaManager, zkClient, time, metrics)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)
   }

   def joinError(memberId: String, error: Errors): JoinGroupResult = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 03c3e37..7d8499f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -36,6 +36,8 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.stats.{Avg, Max}
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
@@ -53,7 +55,8 @@ class GroupMetadataManager(brokerId: Int,
                            config: OffsetConfig,
                            replicaManager: ReplicaManager,
                            zkClient: KafkaZkClient,
-                           time: Time) extends Logging with KafkaMetricsGroup {
+                           time: Time,
+                           metrics: Metrics) extends Logging with KafkaMetricsGroup {

   private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)

@@ -82,6 +85,16 @@ class GroupMetadataManager(brokerId: Int,
    * We use this structure to quickly find the groups which need to be updated by the commit/abort marker.
    */
   private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]()

+  /* setup metrics */
+  val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+
+  partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
+    "group-coordinator-metrics",
+    "The max time it took to load the partitions in the last 30sec"), new Max())
+  partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
+    "group-coordinator-metrics",
+    "The avg time it took to load the partitions in the last 30sec"), new Avg())
+
   this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "

   private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
@@ -498,7 +511,10 @@ class GroupMetadataManager(brokerId: Int,
     try {
       val startMs = time.milliseconds()
       doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
-      info(s"Finished loading offsets and group metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds.")
+      val endMs = time.milliseconds()
+      val timeLapse = endMs - startMs
+      partitionLoadSensor.record(timeLapse, endMs, false)
+      info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse milliseconds.")
     } catch {
       case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
     } finally {
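Side note: the recorded values can also be read back directly from the Metrics registry,
without going through JMX. A minimal sketch, assuming a Metrics instance wired in as in the
constructor above; the helper below is hypothetical and not part of the patch.

    import org.apache.kafka.common.metrics.Metrics

    // Hypothetical helper: look up the Max stat registered above and return its
    // current value, or NaN if the metric has not been registered.
    def groupPartitionLoadTimeMax(metrics: Metrics): Double = {
      val name = metrics.metricName("partition-load-time-max", "group-coordinator-metrics")
      Option(metrics.metric(name))
        .map(_.metricValue().asInstanceOf[Double])
        .getOrElse(Double.NaN)
    }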
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 9d4eed6..6d99889 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -55,7 +55,7 @@ object TransactionCoordinator {
     // we do not need to turn on reaper thread since no tasks will be expired and there are no completed tasks to be purged
     val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false, timerEnabled = false)
-    val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig, time)
+    val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig, time, metrics)

     val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
     val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 45ee4e9..38caed5 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -30,6 +30,8 @@ import kafka.utils.{Logging, Pool, Scheduler}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.stats.{Avg, Max}
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -70,7 +72,8 @@ class TransactionStateManager(brokerId: Int,
                               scheduler: Scheduler,
                               replicaManager: ReplicaManager,
                               config: TransactionConfig,
-                              time: Time) extends Logging {
+                              time: Time,
+                              metrics: Metrics) extends Logging {

   this.logIdent = "[Transaction State Manager " + brokerId + "]: "

@@ -94,6 +97,16 @@ class TransactionStateManager(brokerId: Int,
   /** number of partitions for the transaction log topic */
   private val transactionTopicPartitionCount = getTransactionTopicPartitionCount

+  /** setup metrics */
+  private val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+
+  partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
+    "transaction-coordinator-metrics",
+    "The max time it took to load the partitions in the last 30sec"), new Max())
+  partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
+    "transaction-coordinator-metrics",
+    "The avg time it took to load the partitions in the last 30sec"), new Avg())
+
   // visible for testing only
   private[transaction] def addLoadingPartition(partitionId: Int, coordinatorEpoch: Int): Unit = {
     val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@@ -339,8 +352,10 @@ class TransactionStateManager(brokerId: Int,
               currOffset = batch.nextOffset
             }
           }
-
-          info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds")
+          val endMs = time.milliseconds()
+          val timeLapse = endMs - startMs
+          partitionLoadSensor.record(timeLapse, endMs, false)
+          info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in $timeLapse milliseconds")
         }
       } catch {
         case t: Throwable => error(s"Error loading transactions from transaction log $topicPartition", t)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 07ffe9d..6c433b7 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -276,7 +276,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         /* start group coordinator */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
-        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
+        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
         groupCoordinator.startup()

         /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index b85035f..ec31fc9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -26,6 +26,7 @@ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
@@ -84,7 +85,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
     val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
-    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
+    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time, new Metrics())
     groupCoordinator.startup(false)
   }

diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index cdf1518..5a35e33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.cluster.Partition
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
@@ -111,7 +112,7 @@ class GroupCoordinatorTest {
     val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
     val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
-    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
+    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time, new Metrics())
     groupCoordinator.startup(enableMetadataExpiration = false)

     // add the partition into the owned partition list
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index faca447..dbcf5ed 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -19,11 +19,12 @@ package kafka.coordinator.group

 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Gauge
+import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.Collections
 import java.util.Optional
 import java.util.concurrent.locks.ReentrantLock
-
+import javax.management.ObjectName
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics => kMetrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -56,6 +58,7 @@ class GroupMetadataManagerTest {
   var zkClient: KafkaZkClient = null
   var partition: Partition = null
   var defaultOffsetRetentionMs = Long.MaxValue
+  var metrics: kMetrics = null

   val groupId = "foo"
   val groupInstanceId = Some("bar")
@@ -87,9 +90,10 @@ class GroupMetadataManagerTest {
     EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
     EasyMock.replay(zkClient)

+    metrics = new kMetrics()
     time = new MockTime
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time)
+    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time, metrics)
     partition = EasyMock.niceMock(classOf[Partition])
   }

@@ -2051,4 +2055,43 @@ class GroupMetadataManagerTest {
     group.transitionTo(CompletingRebalance)
     expectMetrics(groupMetadataManager, 1, 0, 1)
   }
+
+  @Test
+  def testPartitionLoadMetric(): Unit = {
+    val server = ManagementFactory.getPlatformMBeanServer
+    val mBeanName = "kafka.server:type=group-coordinator-metrics"
+    val reporter = new JmxReporter("kafka.server")
+    metrics.addReporter(reporter)
+
+    def partitionLoadTime(attribute: String): Double = {
+      server.getAttribute(new ObjectName(mBeanName), attribute).asInstanceOf[Double]
+    }
+
+    assertTrue(server.isRegistered(new ObjectName(mBeanName)))
+    assertEquals(Double.NaN, partitionLoadTime("partition-load-time-max"), 0)
+    assertEquals(Double.NaN, partitionLoadTime("partition-load-time-avg"), 0)
+    assertTrue(reporter.containsMbean(mBeanName))
+
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val memberId = "98098230493"
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
+      protocolType = "consumer", protocol = "range", memberId)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+    EasyMock.replay(replicaManager)
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    assertTrue(partitionLoadTime("partition-load-time-max") >= 0.0)
+    assertTrue(partitionLoadTime("partition-load-time-avg") >= 0.0)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index bc6ed93..238ef4b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -28,6 +28,7 @@ import kafka.utils.{Pool, TestUtils}
 import org.apache.kafka.clients.{ClientResponse, NetworkClient}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests._
@@ -68,7 +69,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
       .anyTimes()
     EasyMock.replay(zkClient)

-    txnStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time)
+    txnStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time, new Metrics())
     for (i <- 0 until numPartitions)
       txnStateManager.addLoadedTransactionsToCache(i, coordinatorEpoch, new Pool[String, TransactionMetadata]())
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 14745e7..4e778dd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -16,8 +16,10 @@
  */
 package kafka.coordinator.transaction

+import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
+import javax.management.ObjectName

 import kafka.log.Log
 import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager}
@@ -26,6 +28,7 @@ import org.scalatest.Assertions.fail
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -59,9 +62,10 @@ class TransactionStateManagerTest {
     .anyTimes()
   EasyMock.replay(zkClient)

+  val metrics = new Metrics()
   val txnConfig = TransactionConfig()
-  val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time)
+  val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time, metrics)

   val transactionalId1: String = "one"
   val transactionalId2: String = "two"
@@ -627,4 +631,37 @@ class TransactionStateManagerTest {
     EasyMock.replay(replicaManager)
   }
+
+  @Test
+  def testPartitionLoadMetric(): Unit = {
+    val server = ManagementFactory.getPlatformMBeanServer
+    val mBeanName = "kafka.server:type=transaction-coordinator-metrics"
+    val reporter = new JmxReporter("kafka.server")
+    metrics.addReporter(reporter)
+
+    def partitionLoadTime(attribute: String): Double = {
+      server.getAttribute(new ObjectName(mBeanName), attribute).asInstanceOf[Double]
+    }
+
+    assertTrue(server.isRegistered(new ObjectName(mBeanName)))
+    assertEquals(Double.NaN, partitionLoadTime("partition-load-time-max"), 0)
+    assertEquals(Double.NaN, partitionLoadTime("partition-load-time-avg"), 0)
+    assertTrue(reporter.containsMbean(mBeanName))
+
+    txnMetadata1.state = Ongoing
+    txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
+      new TopicPartition("topic1", 1)))
+
+    txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
+
+    val startOffset = 15L
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*)
+
+    prepareTxnLog(topicPartition, startOffset, records)
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, 0, (_, _, _, _, _) => ())
+    scheduler.tick()
+
+    assertTrue(partitionLoadTime("partition-load-time-max") >= 0)
+    assertTrue(partitionLoadTime("partition-load-time-avg") >= 0)
+  }
 }
diff --git a/docs/ops.html b/docs/ops.html
index f94afc3..984b80d 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1029,6 +1029,26 @@
         <td>Connection status of broker's ZooKeeper session which may be one of Disconnected|SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|Expired.</td>
       </tr>
+      <tr>
+        <td>Max time to load group metadata</td>
+        <td>kafka.server:type=group-coordinator-metrics,name=partition-load-time-max</td>
+        <td>maximum time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds</td>
+      </tr>
+      <tr>
+        <td>Avg time to load group metadata</td>
+        <td>kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg</td>
+        <td>average time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds</td>
+      </tr>
+      <tr>
+        <td>Max time to load transaction metadata</td>
+        <td>kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max</td>
+        <td>maximum time, in milliseconds, it took to load transaction metadata from the transaction log partitions loaded in the last 30 seconds</td>
+      </tr>
+      <tr>
+        <td>Avg time to load transaction metadata</td>
+        <td>kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg</td>
+        <td>average time, in milliseconds, it took to load transaction metadata from the transaction log partitions loaded in the last 30 seconds</td>
+      </tr>
     </tbody></table>

<h4><a id="selector_monitoring" href="#selector_monitoring">Common monitoring metrics for producer/consumer/connect/streams</a></h4>
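The coordinator MBeans documented above can be read with any JMX client. As a quick sketch,
this queries them from inside the broker JVM via the platform MBean server, the same way the
tests in this commit do (the kafka.server prefix assumes the broker's default JmxReporter):

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    val server = ManagementFactory.getPlatformMBeanServer
    val bean = new ObjectName("kafka.server:type=group-coordinator-metrics")

    // Attribute names match the metric names registered by GroupMetadataManager.
    val maxMs = server.getAttribute(bean, "partition-load-time-max").asInstanceOf[Double]
    val avgMs = server.getAttribute(bean, "partition-load-time-avg").asInstanceOf[Double]
    println(s"group metadata load: max=$maxMs ms, avg=$avgMs ms")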