Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-03 Thread via GitHub


dajac commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1341657675


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -283,7 +292,63 @@ class CoordinatorLoaderImplTest {
 minOneMessage = true
   )).thenReturn(readResult)
 
-  assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+  assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+}
+  }
+
+  @Test
+  def testLoadSummary(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+val time = new MockTime()
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  time,
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  val startTimeMs = time.milliseconds()
+  when(replicaManager.getLog(tp)).thenReturn(Some(log))
+  when(log.logStartOffset).thenReturn(0L)
+  when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+
+  val readResult1 = logReadResult(startOffset = 0, records = Seq(
+new SimpleRecord("k1".getBytes, "v1".getBytes),
+new SimpleRecord("k2".getBytes, "v2".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 0L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenAnswer((_: InvocationOnMock) => {
+time.sleep(1000)

Review Comment:
   Why do we need to sleep here?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();

Review Comment:
   We only use this from the tests. Should we remove it from the interface?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+/**
+ * Record the partition load metric.

Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-03 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1344714254


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -283,7 +292,63 @@ class CoordinatorLoaderImplTest {
 minOneMessage = true
   )).thenReturn(readResult)
 
-  assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+  assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+}
+  }
+
+  @Test
+  def testLoadSummary(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+val time = new MockTime()
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  time,
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  val startTimeMs = time.milliseconds()
+  when(replicaManager.getLog(tp)).thenReturn(Some(log))
+  when(log.logStartOffset).thenReturn(0L)
+  when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+
+  val readResult1 = logReadResult(startOffset = 0, records = Seq(
+new SimpleRecord("k1".getBytes, "v1".getBytes),
+new SimpleRecord("k2".getBytes, "v2".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 0L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenAnswer((_: InvocationOnMock) => {
+time.sleep(1000)

Review Comment:
   I think the idea was that we would have a different value for the end time 
for the check on line 349. I think this is ok.
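
To illustrate the point about the sleep: with a mock clock, time only moves when the test advances it, so without the sleep the start and end timestamps would be identical and a check on the end time could not tell them apart. A minimal sketch, assuming a hypothetical `FakeClock` (a stand-in, not Kafka's `MockTime`) and a hypothetical `TimedLoad.timedLoad` helper:

```java
/** Hypothetical stand-in for a mock clock; time moves only when sleep() is called. */
final class FakeClock {
    private long nowMs = 0L;

    long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

/** Hypothetical helper that timestamps a "read" the way a loader might. */
final class TimedLoad {
    private TimedLoad() { }

    /** Returns {startTimeMs, endTimeMs} measured around the supplied read. */
    static long[] timedLoad(FakeClock clock, Runnable read) {
        long startTimeMs = clock.milliseconds();
        read.run();
        long endTimeMs = clock.milliseconds();
        return new long[] {startTimeMs, endTimeMs};
    }
}
```

Passing `() -> clock.sleep(1000)` as the read makes the end time differ from the start time, which is what lets a test assert on both values independently.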



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-03 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1344716332


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -148,7 +157,9 @@ public GroupCoordinatorService build() {
 CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
 logContext,
 "group-coordinator-event-processor-",
-config.numThreads
+config.numThreads,
+coordinatorRuntimeMetrics,

Review Comment:
   nit: there are some places in this file where we specify the time before the 
metrics. Should we try to keep ordering consistent?






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-03 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1344717277


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();

Review Comment:
   +1. Can we use an implementation with these methods for testing?
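
One way to act on this suggestion, sketched under assumptions: keep the interface limited to what the runtime calls, and let tests read the counters through the concrete class. The names `PartitionState`, `SlimRuntimeMetrics` and `CountingRuntimeMetrics` are hypothetical stand-ins and are not part of the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Simplified stand-in for the runtime's partition states (an assumption, not the real enum). */
enum PartitionState { INITIAL, LOADING, ACTIVE, FAILED }

/** Hypothetical slimmed-down interface: only what the runtime itself calls. */
interface SlimRuntimeMetrics {
    void onPartitionStateChange(PartitionState oldState, PartitionState newState);
}

/** Hypothetical implementation; the counter accessors live here, not on the interface. */
final class CountingRuntimeMetrics implements SlimRuntimeMetrics {
    private final AtomicLong loading = new AtomicLong();
    private final AtomicLong active = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    @Override
    public void onPartitionStateChange(PartitionState oldState, PartitionState newState) {
        adjust(oldState, -1L);
        adjust(newState, 1L);
    }

    // Only LOADING, ACTIVE and FAILED are counted; INITIAL (or null) is ignored.
    private void adjust(PartitionState state, long delta) {
        if (state == null) {
            return;
        }
        switch (state) {
            case LOADING: loading.addAndGet(delta); break;
            case ACTIVE: active.addAndGet(delta); break;
            case FAILED: failed.addAndGet(delta); break;
            default: break;
        }
    }

    // Visible for testing.
    long numPartitionsLoading() { return loading.get(); }
    long numPartitionsActive() { return active.get(); }
    long numPartitionsFailed() { return failed.get(); }
}
```

A test would hold a `CountingRuntimeMetrics` reference and assert on the accessors, while production code depends only on `SlimRuntimeMetrics`.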






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-03 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1344730913


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -156,15 +165,16 @@ public CoordinatorRuntime build() {
 loader,
 coordinatorShardBuilderSupplier,
 time,
-timer
+timer,
+runtimeMetrics
 );
 }
 }
 
 /**
  * The various state that a coordinator for a partition can be in.
  */
-enum CoordinatorState {
+public enum CoordinatorState {

Review Comment:
   I don't immediately see an issue with this, but was there a reason this was 
not public before?






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-03 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1344731581


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1242,7 +1307,8 @@ public void scheduleLoadOperation(
 case FAILED:
 case INITIAL:
 context.transitionTo(CoordinatorState.LOADING);
-loader.load(tp, context.coordinator).whenComplete((state, exception) -> {
+loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
+

Review Comment:
   nit: remove the extra blank line






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-04 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1346596121


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -283,7 +292,63 @@ class CoordinatorLoaderImplTest {
 minOneMessage = true
   )).thenReturn(readResult)
 
-  assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+  assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+}
+  }
+
+  @Test
+  def testLoadSummary(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+val time = new MockTime()
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  time,
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  val startTimeMs = time.milliseconds()
+  when(replicaManager.getLog(tp)).thenReturn(Some(log))
+  when(log.logStartOffset).thenReturn(0L)
+  when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+
+  val readResult1 = logReadResult(startOffset = 0, records = Seq(
+new SimpleRecord("k1".getBytes, "v1".getBytes),
+new SimpleRecord("k2".getBytes, "v2".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 0L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenAnswer((_: InvocationOnMock) => {
+time.sleep(1000)

Review Comment:
   Yes, that was my intention.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-04 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1346600021


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();

Review Comment:
   I'll remove them.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-04 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1346607275


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
+/**
+ * The metrics group.
+ */
+public static final String METRICS_GROUP = "group-coordinator-metrics";
+
+/**
+ * Metric to count the number of partitions in Loading state.
+ */
+private static final MetricName NUM_PARTITIONS_LOADING = getMetricName(
+"CoordinatorRuntime", "NumPartitionsLoading");

Review Comment:
   I will use the existing Kafka metrics group, "group-coordinator-metrics".
   
   The helper method appends "kafka.coordinator.group", which would have 
differentiated these metrics from those under "kafka.coordinator.transaction".
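
For readers unfamiliar with the naming, a Yammer-style metric in Kafka is typically exposed under an MBean string of the form `group:type=Type,name=Name`. The sketch below builds such a string with a hypothetical `MetricNames.mbeanName` helper (not the helper used in the PR) to show how the group prefix would separate the two coordinators:

```java
/** Hypothetical helper; it only illustrates the MBean-style name layout. */
final class MetricNames {
    private MetricNames() { }

    /** Joins group, type and name as "group:type=Type,name=Name". */
    static String mbeanName(String group, String type, String name) {
        return group + ":type=" + type + ",name=" + name;
    }
}
```

With this layout, `mbeanName("kafka.coordinator.group", "CoordinatorRuntime", "NumPartitionsLoading")` and the same call with `"kafka.coordinator.transaction"` yield distinct names even though type and name are identical.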






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-04 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1346608656


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -156,15 +165,16 @@ public CoordinatorRuntime build() {
 loader,
 coordinatorShardBuilderSupplier,
 time,
-timer
+timer,
+runtimeMetrics
 );
 }
 }
 
 /**
  * The various state that a coordinator for a partition can be in.
  */
-enum CoordinatorState {
+public enum CoordinatorState {

Review Comment:
   It was only used within the coordinator runtime. Now that we have some logic 
in the metrics package to handle state changes, we made it public.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-04 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1346610016


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   Each EventProcessorThread records this ratio in `handleEvents()`. This will 
take into account time spent on context switching which is what we want, right?
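
As a rough illustration of what a per-thread idle ratio could look like, the sketch below divides the time a thread spent waiting for an event by the total wall-clock time of one loop iteration. The `IdleRatio` class and its parameters are hypothetical; the PR's `handleEvents()` may compute the value differently.

```java
/** Hypothetical helper; assumes pollStartMs <= eventStartMs <= eventEndMs. */
final class IdleRatio {
    private IdleRatio() { }

    /**
     * @param pollStartMs  when the thread started waiting for an event
     * @param eventStartMs when an event became available and processing began
     * @param eventEndMs   when processing finished
     * @return fraction of the iteration spent waiting
     */
    static double compute(long pollStartMs, long eventStartMs, long eventEndMs) {
        long totalMs = eventEndMs - pollStartMs;
        if (totalMs <= 0) {
            return 0.0; // nothing measurable elapsed in this iteration
        }
        long idleMs = eventStartMs - pollStartMs;
        return (double) idleMs / totalMs;
    }
}
```

Because the timestamps are wall-clock values, any time the thread spends descheduled is included in whichever interval it falls into.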






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1347989806


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -143,10 +150,12 @@ class CoordinatorLoaderImpl[T](
 
   currentOffset = batch.nextOffset
 }
+numBytes = numBytes + memoryRecords.sizeInBytes()
   }
+  val endTimeMs = time.milliseconds()
 
   if (isRunning.get) {
-future.complete(null)
+future.complete(new LoadSummary(startTimeMs, endTimeMs, numRecords, numBytes))

Review Comment:
   should we include information about the LoadSummary in the PR description as 
well?
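
For context, the constructor call above suggests a simple value object with four fields. A minimal sketch, assuming field names that mirror the arguments; the `LoadSummarySketch` name and the `loadTimeMs()` helper are hypothetical and not taken from the PR:

```java
/** Hypothetical value object mirroring the four constructor arguments in the diff. */
final class LoadSummarySketch {
    private final long startTimeMs;
    private final long endTimeMs;
    private final long numRecords;
    private final long numBytes;

    LoadSummarySketch(long startTimeMs, long endTimeMs, long numRecords, long numBytes) {
        this.startTimeMs = startTimeMs;
        this.endTimeMs = endTimeMs;
        this.numRecords = numRecords;
        this.numBytes = numBytes;
    }

    long startTimeMs() { return startTimeMs; }
    long endTimeMs() { return endTimeMs; }
    long numRecords() { return numRecords; }
    long numBytes() { return numBytes; }

    /** Elapsed load time derived from the two timestamps (hypothetical helper). */
    long loadTimeMs() { return endTimeMs - startTimeMs; }
}
```

A completion callback could then pass `startTimeMs()` and `endTimeMs()` straight to a method such as `recordPartitionLoadSensor(long startTimeMs, long endTimeMs)` from the interface quoted earlier in the thread.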






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1347998253


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   How is this reported then? Is there a metric per thread?






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348000674


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
+/**
+ * The metrics group.
+ */
+public static final String METRICS_GROUP = "group-coordinator-metrics";
+
+/**
+ * Metric to count the number of partitions in Loading state.
+ */
+private final MetricName numPartitionsLoading;
+
+/**
+ * The Loading state partition counter.
+ */
+private final AtomicLong numPartitionsLoadingCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the number of partitions in Active state.
+ */
+private final MetricName numPartitionsActive;
+
+/**
+ * The Active state partition counter.
+ */
+private final AtomicLong numPartitionsActiveCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the number of partitions in Failed state.
+ */
+private final MetricName numPartitionsFailed;
+
+/**
+ * The Failed state partition counter.
+ */
+private final AtomicLong numPartitionsFailedCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the size of the processor queue.
+ */
+private final MetricName eventQueueSize;
+
+/**
+ * Metric to measure the event queue time.
+ */
+private final com.yammer.metrics.core.MetricName eventQueueTimeMs =

Review Comment:
   Does Kafka have a preference for Yammer vs. non-Yammer metrics? I know both 
exist, but I wasn't sure if there was any trend or initiative to use only one 
type going forward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1347999373


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
+/**
+ * The metrics group.
+ */
+public static final String METRICS_GROUP = "group-coordinator-metrics";
+
+/**
+ * Metric to count the number of partitions in Loading state.
+ */
+private final MetricName numPartitionsLoading;
+
+/**
+ * The Loading state partition counter.

Review Comment:
   Do we think having a comment on both the name and the counter is helpful? 
Or could we remove one of them?






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348003447


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
+/**
+ * The metrics group.
+ */
+public static final String METRICS_GROUP = "group-coordinator-metrics";
+
+/**
+ * Metric to count the number of partitions in Loading state.
+ */
+private final MetricName numPartitionsLoading;
+
+/**
+ * The Loading state partition counter.
+ */
+private final AtomicLong numPartitionsLoadingCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the number of partitions in Active state.
+ */
+private final MetricName numPartitionsActive;
+
+/**
+ * The Active state partition counter.
+ */
+private final AtomicLong numPartitionsActiveCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the number of partitions in Failed state.
+ */
+private final MetricName numPartitionsFailed;
+
+/**
+ * The Failed state partition counter.
+ */
+private final AtomicLong numPartitionsFailedCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the size of the processor queue.
+ */
+private final MetricName eventQueueSize;
+
+/**
+ * Metric to measure the event queue time.
+ */
+private final com.yammer.metrics.core.MetricName eventQueueTimeMs =
+KafkaYammerMetrics.getMetricName("kafka.coordinator.group", METRICS_GROUP, "EventQueueTimeMs");
+
+/**
+ * Metric to measure the event processing time.
+ */
+private final com.yammer.metrics.core.MetricName eventQueueProcessingTimeMs =
+KafkaYammerMetrics.getMetricName("kafka.coordinator.group", METRICS_GROUP, "EventQueueProcessingTimeMs");
+
+/**
+ * The yammer metrics registry.
+ */
+private final MetricsRegistry registry;
+
+/**
+ * The Kafka metrics registry.
+ */
+private final Metrics metrics;
+
+/**
+ * The partition load sensor.
+ */
+private Sensor partitionLoadSensor;
+
+/**
+ * The thread idle sensor.
+ */
+private Sensor threadIdleRatioSensor;
+
+/**
+ * The event queue time updater.
+ */
+private final Consumer eventQueueTimeUpdater;
+
+/**
+ * The event queue processing time updater.
+ */
+private final Consumer eventQueueProcessingTimeUpdater;
+
+public GroupCoordinatorRuntimeMetrics(MetricsRegistry registry, Metrics metrics) {
+this.registry = Objects.requireNonNull(registry);
+this.metrics = Objects.requireNonNull(metrics);
+
+this.numPartitionsLoading = getMetricName("num-partitions-loading");
+this.numPartitionsActive = getMetricName("num-partitions-active");
+this.numPartitionsFailed = getMetricName("num-partitions-failed");
+this.eventQueueSize = getMetricName("event-queue-size");
+
+initializeMetrics();
+
+eventQueueTimeUpdater = newHistogram(eventQueueTimeMs);
+eventQueueProcessingTimeUpdater = newHistogram(eventQueueProcessingTimeMs);
+}
+
+/**
+ * Register the metrics.
+ */
+public void initializeMetrics() {

Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348006202


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void recordEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void recordEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);
+
+/**
+ * Register the event queue size gauge.
+ *
+ * @param sizeSupplier The size supplier.
+ */
+void registerEventQueueSizeGauge(Supplier sizeSupplier);

Review Comment:
   What is the event queue size gauge? And do we expect to see this called 
often or just once? 






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348084380


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java:
##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.METRICS_GROUP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GroupCoordinatorRuntimeMetricsTest {
+
+@Test
+public void testMetricNames() {
+MetricsRegistry registry = new MetricsRegistry();
+Metrics metrics = new Metrics();
+
+HashSet expectedMetrics = new HashSet<>(Arrays.asList(
+metrics.metricName("num-partitions-loading", METRICS_GROUP),
+metrics.metricName("num-partitions-active", METRICS_GROUP),
+metrics.metricName("num-partitions-failed", METRICS_GROUP),
+metrics.metricName("event-queue-size", METRICS_GROUP),
+metrics.metricName("partition-load-time-max", METRICS_GROUP),
+metrics.metricName("partition-load-time-avg", METRICS_GROUP),
+metrics.metricName("thread-idle-ratio-min", METRICS_GROUP),
+metrics.metricName("thread-idle-ratio-avg", METRICS_GROUP)
+));
+
+try {
+try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(
+registry,
+metrics
+)) {
+HashSet expectedRegistry = new HashSet<>(Arrays.asList(
+"kafka.coordinator.group:type=group-coordinator-metrics,name=EventQueueTimeMs",
+"kafka.coordinator.group:type=group-coordinator-metrics,name=EventQueueProcessingTimeMs"
+));
+runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+
+assertMetricsForTypeEqual(registry, "kafka.coordinator.group", expectedRegistry);
+expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName)));
+}
+assertMetricsForTypeEqual(registry, "kafka.coordinator.group", Collections.emptySet());
+expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName)));
+} finally {
+registry.shutdown();
+}
+}
+
+@Test
+public void testUpdateNumPartitionsMetrics() {
+Metrics metrics = new Metrics();
+
+try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(
+KafkaYammerMetrics.defaultRegistry(),
+metrics
+)) {
+IntStream.range(0, 10)
+.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING));
+IntStream.range(0, 8)
+.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE));
+IntStream.range(0, 8)
+.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, CoordinatorState.FAILED));
+IntStream.range(0, 2)
+.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.FAILED, Coordin

Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-05 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348107552


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException {
 assertEquals(1, cnt.get());
 assertEquals(0, ctx.timer.size());
 }
+
+@Test
+public void testStateChanges() throws Exception {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withLoader(loader)
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+CompletableFuture future = new CompletableFuture<>();
+when(loader.load(TP, coordinator)).thenReturn(future);
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the context succeeds and the coordinator should be in loading.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING);
+
+// When the loading fails, the coordinator transitions to failed.
+future.completeExceptionally(new Exception("failure"));
+assertEquals(FAILED, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED);
+
+// Start loading a new topic partition.
+TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+future = new CompletableFuture<>();
+when(loader.load(tp, coordinator)).thenReturn(future);
+// Schedule the loading.
+runtime.scheduleLoadOperation(tp, 0);
+// Getting the context succeeds and the coordinator should be in loading.
+ctx = runtime.contextOrThrow(tp);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
+
+// When the loading completes, the coordinator transitions to active.
+future.complete(null);
+assertEquals(ACTIVE, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE);
+
+runtime.close();
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED);

Review Comment:
   Maybe a bit unrelated to this test, but do we just keep all partitions we 
try to load forever (and then transition them to closed when we close the runtime)?
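
For reference, the state transitions that the test above verifies can be modelled with plain JDK types. This is only a minimal sketch under assumptions, not the code in this PR: the `State` enum stands in for `CoordinatorRuntime.CoordinatorState`, the class name `PartitionStateCounters` and the `count` accessor are hypothetical, and only the Loading, Active and Failed states are counted, mirroring the three counters declared in `GroupCoordinatorRuntimeMetrics`.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not the GroupCoordinatorRuntimeMetrics implementation.
final class PartitionStateCounters {
    // Stand-in for CoordinatorRuntime.CoordinatorState.
    enum State { INITIAL, LOADING, ACTIVE, FAILED, CLOSED }

    // Assumption: only these three states are backed by a counter.
    private final Map<State, AtomicLong> counters = new EnumMap<>(State.class);

    PartitionStateCounters() {
        counters.put(State.LOADING, new AtomicLong(0));
        counters.put(State.ACTIVE, new AtomicLong(0));
        counters.put(State.FAILED, new AtomicLong(0));
    }

    // Leaving a counted state decrements it; entering a counted state increments it.
    void recordPartitionStateChange(State oldState, State newState) {
        AtomicLong from = counters.get(oldState);
        if (from != null) {
            from.decrementAndGet();
        }
        AtomicLong to = counters.get(newState);
        if (to != null) {
            to.incrementAndGet();
        }
    }

    // Returns the current count, or 0 for states that are not tracked.
    long count(State state) {
        AtomicLong counter = counters.get(state);
        return counter == null ? 0L : counter.get();
    }
}
```

Replaying the first partition from the test (INITIAL to LOADING, LOADING to FAILED, FAILED to CLOSED) leaves all three counters at zero in this model, so a closed partition no longer contributes to any gauge. Whether the runtime also drops the context itself is a separate question that this sketch does not answer.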






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348371483


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
+/**
+ * The metrics group.
+ */
+public static final String METRICS_GROUP = "group-coordinator-metrics";
+
+/**
+ * Metric to count the number of partitions in Loading state.
+ */
+private final MetricName numPartitionsLoading;
+
+/**
+ * The Loading state partition counter.
+ */
+private final AtomicLong numPartitionsLoadingCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the number of partitions in Active state.
+ */
+private final MetricName numPartitionsActive;
+
+/**
+ * The Active state partition counter.
+ */
+private final AtomicLong numPartitionsActiveCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the number of partitions in Failed state.
+ */
+private final MetricName numPartitionsFailed;
+
+/**
+ * The Failed state partition counter.
+ */
+private final AtomicLong numPartitionsFailedCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the size of the processor queue.
+ */
+private final MetricName eventQueueSize;
+
+/**
+ * Metric to measure the event queue time.
+ */
+private final com.yammer.metrics.core.MetricName eventQueueTimeMs =

Review Comment:
   I don't think it's heavily enforced, but Jason mentioned that we should try 
to use Kafka metrics if possible.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348377598


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
+/**
+ * The metrics group.
+ */
+public static final String METRICS_GROUP = "group-coordinator-metrics";
+
+/**
+ * Metric to count the number of partitions in Loading state.
+ */
+private final MetricName numPartitionsLoading;
+
+/**
+ * The Loading state partition counter.
+ */
+private final AtomicLong numPartitionsLoadingCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the number of partitions in Active state.
+ */
+private final MetricName numPartitionsActive;
+
+/**
+ * The Active state partition counter.
+ */
+private final AtomicLong numPartitionsActiveCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the number of partitions in Failed state.
+ */
+private final MetricName numPartitionsFailed;
+
+/**
+ * The Failed state partition counter.
+ */
+private final AtomicLong numPartitionsFailedCounter = new AtomicLong(0);
+
+/**
+ * Metric to count the size of the processor queue.
+ */
+private final MetricName eventQueueSize;
+
+/**
+ * Metric to measure the event queue time.
+ */
+private final com.yammer.metrics.core.MetricName eventQueueTimeMs =
+KafkaYammerMetrics.getMetricName("kafka.coordinator.group", METRICS_GROUP, "EventQueueTimeMs");
+
+/**
+ * Metric to measure the event processing time.
+ */
+private final com.yammer.metrics.core.MetricName eventQueueProcessingTimeMs =
+KafkaYammerMetrics.getMetricName("kafka.coordinator.group", METRICS_GROUP, "EventQueueProcessingTimeMs");
+
+/**
+ * The yammer metrics registry.
+ */
+private final MetricsRegistry registry;
+
+/**
+ * The Kafka metrics registry.
+ */
+private final Metrics metrics;
+
+/**
+ * The partition load sensor.
+ */
+private Sensor partitionLoadSensor;
+
+/**
+ * The thread idle sensor.
+ */
+private Sensor threadIdleRatioSensor;
+
+/**
+ * The event queue time updater.
+ */
+private final Consumer eventQueueTimeUpdater;
+
+/**
+ * The event queue processing time updater.
+ */
+private final Consumer eventQueueProcessingTimeUpdater;
+
+public GroupCoordinatorRuntimeMetrics(MetricsRegistry registry, Metrics metrics) {
+this.registry = Objects.requireNonNull(registry);
+this.metrics = Objects.requireNonNull(metrics);
+
+this.numPartitionsLoading = getMetricName("num-partitions-loading");
+this.numPartitionsActive = getMetricName("num-partitions-active");
+this.numPartitionsFailed = getMetricName("num-partitions-failed");
+this.eventQueueSize = getMetricName("event-queue-size");
+
+initializeMetrics();
+
+eventQueueTimeUpdater = newHistogram(eventQueueTimeMs);
+eventQueueProcessingTimeUpdater = newHistogram(eventQueueProcessingTimeMs);
+}
+
+/**
+ * Register the metrics.
+ */
+public void initializeMetrics() 

Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348380466


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void recordEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void recordEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);
+
+/**
+ * Register the event queue size gauge.
+ *
+ * @param sizeSupplier The size supplier.
+ */
+void registerEventQueueSizeGauge(Supplier sizeSupplier);

Review Comment:
   MultiThreadedEventProcessor includes an EventAccumulator that enqueues all 
jobs. This metric reports the size of the accumulator, which is synchronized.
   
   This is only called when we report the metric value, so it runs once per 
interval at which Kafka reports metrics.
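
To illustrate the idea, here is a minimal sketch of a supplier-backed gauge using only JDK types. It is written under assumptions and is not the PR's code: `Accumulator` is a hypothetical stand-in for the EventAccumulator, `report()` stands in for whatever reads the gauge at each reporting interval, and the `Supplier<Integer>` type parameter is inferred from the test passing `() -> 0`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical sketch of a lazily evaluated size gauge.
final class EventQueueSizeGaugeSketch {

    // Stand-in for the accumulator: every access, including size(), is synchronized.
    static final class Accumulator {
        private final Queue<Runnable> events = new ArrayDeque<>();

        synchronized void add(Runnable event) {
            events.add(event);
        }

        synchronized Runnable poll() {
            return events.poll();
        }

        synchronized int size() {
            return events.size();
        }
    }

    // Until a supplier is registered the gauge reads as zero.
    private volatile Supplier<Integer> sizeSupplier = () -> 0;

    // Registered once; the supplier itself is not invoked here.
    void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier) {
        this.sizeSupplier = sizeSupplier;
    }

    // Invoked by the (hypothetical) reporter; this is the only place the supplier runs.
    int report() {
        return sizeSupplier.get();
    }
}
```

In this model registration happens once, with something like `gauge.registerEventQueueSizeGauge(accumulator::size)`, and the synchronized `size()` call is only paid when a reporter reads the value, not on every enqueue.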






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348389082


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws 
InterruptedException {
 assertEquals(1, cnt.get());
 assertEquals(0, ctx.timer.size());
 }
+
+@Test
+public void testStateChanges() throws Exception {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withLoader(loader)
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+CompletableFuture future = new 
CompletableFuture<>();
+when(loader.load(TP, coordinator)).thenReturn(future);
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the context succeeds and the coordinator should be in 
loading.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+// When the loading fails, the coordinator transitions to failed.
+future.completeExceptionally(new Exception("failure"));
+assertEquals(FAILED, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
FAILED);
+
+// Start loading a new topic partition.
+TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+future = new CompletableFuture<>();
+when(loader.load(tp, coordinator)).thenReturn(future);
+// Schedule the loading.
+runtime.scheduleLoadOperation(tp, 0);
+// Getting the context succeeds and the coordinator should be in 
loading.
+ctx = runtime.contextOrThrow(tp);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+// When the loading completes, the coordinator transitions to active.
+future.complete(null);
+assertEquals(ACTIVE, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
ACTIVE);
+
+runtime.close();
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, 
CLOSED);

Review Comment:
   No, the partition (aka coordinator context) is cleared when the broker 
resigns from leadership.
   
   When a partition migrates to a different broker, we schedule an unload 
operation (`GroupCoordinatorServer#onResignation`) and remove the coordinator 
context from memory after deregistering the HWM listener and clearing its state 
(`CoordinatorRuntime.CoordinatorContext#unload`).
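
   As a rough illustration of how per-state partition counts could follow the transitions exercised in `testStateChanges`, here is a minimal sketch. It is not the PR's implementation: `SketchState` and `SketchPartitionStateCounters` are hypothetical, and the choice not to count `INITIAL` is an assumption made for the example.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical mirror of the states named in the test above.
enum SketchState { INITIAL, LOADING, ACTIVE, FAILED, CLOSED }

// Hypothetical counters: one count per state, adjusted on every transition.
final class SketchPartitionStateCounters {
    private final Map<SketchState, AtomicLong> counts = new EnumMap<>(SketchState.class);

    SketchPartitionStateCounters() {
        for (SketchState state : SketchState.values()) {
            counts.put(state, new AtomicLong());
        }
    }

    void onStateChange(SketchState oldState, SketchState newState) {
        // Assumption: INITIAL is never counted, so leaving it decrements nothing.
        if (oldState != SketchState.INITIAL) {
            counts.get(oldState).decrementAndGet();
        }
        counts.get(newState).incrementAndGet();
    }

    long count(SketchState state) {
        return counts.get(state).get();
    }
}

final class StateCountersDemo {
    public static void main(String[] args) {
        SketchPartitionStateCounters counters = new SketchPartitionStateCounters();
        // First partition: loading fails.
        counters.onStateChange(SketchState.INITIAL, SketchState.LOADING);
        counters.onStateChange(SketchState.LOADING, SketchState.FAILED);
        // Second partition: loading succeeds.
        counters.onStateChange(SketchState.INITIAL, SketchState.LOADING);
        counters.onStateChange(SketchState.LOADING, SketchState.ACTIVE);
        System.out.println("failed=" + counters.count(SketchState.FAILED)
            + " active=" + counters.count(SketchState.ACTIVE));
    }
}
```

   In this sketch, a transition into `CLOSED` (on close or unload) decrements whichever state the partition was in.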






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348397835


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState 
newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   We use a global sensor (`threadIdleRatioSensor`) that is shared across all 
threads. Each thread records its own view of the idle ratio, so we emit a 
single meter for all threads.
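
   A minimal sketch of what sharing one sensor across threads implies, under stated assumptions: `SketchSharedIdleRatio` is a hypothetical plain-Java aggregator, not the Kafka `Sensor` API, and it tracks only an average and a minimum over everything recorded.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical shared aggregator: every thread records into the same instance.
final class SketchSharedIdleRatio {
    private double sum = 0.0;
    private long count = 0;
    private double min = Double.POSITIVE_INFINITY;

    synchronized void record(double ratio) {
        sum += ratio;
        count++;
        min = Math.min(min, ratio);
    }

    synchronized double avg() {
        return count == 0 ? Double.NaN : sum / count;
    }

    // The minimum carries no thread identity: only the value survives.
    synchronized double min() {
        return min;
    }
}

final class SharedIdleRatioDemo {
    public static void main(String[] args) throws InterruptedException {
        SketchSharedIdleRatio shared = new SketchSharedIdleRatio();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Each thread reports its own view of the idle ratio.
        pool.submit(() -> shared.record(0.25));
        pool.submit(() -> shared.record(0.75));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("avg=" + shared.avg() + " min=" + shared.min());
    }
}
```

   Since the aggregator keeps only values, identifying the thread behind the lowest ratio would need something extra, for example a per-thread sensor or tag.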






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-06 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1348458471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState 
newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   Created 
`MultiThreadedEventProcessorTest#testRecordThreadIdleRatioTwoThreads`. This 
doesn't test the exact idle ratio but confirms we record values with multiple 
threads.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350606817


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState 
newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   So we have an average and a min for the idle ratio. Is the minimum value just the 
lowest recorded in a given time frame? (And we don't have a way to identify 
which thread is the one with the lowest value.)



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState,

Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350608637


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void recordPartitionStateChange(CoordinatorState oldState, 
CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void recordEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void recordEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);
+
+/**
+ * Register the event queue size gauge.
+ *
+ * @param sizeSupplier The size supplier.
+ */
+void registerEventQueueSizeGauge(Supplier sizeSupplier);

Review Comment:
   Hmm -- it should just be when we construct the metrics object right? 
(referring to the register method)






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350610255


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws 
InterruptedException {
 assertEquals(1, cnt.get());
 assertEquals(0, ctx.timer.size());
 }
+
+@Test
+public void testStateChanges() throws Exception {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withLoader(loader)
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+CompletableFuture future = new 
CompletableFuture<>();
+when(loader.load(TP, coordinator)).thenReturn(future);
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the context succeeds and the coordinator should be in 
loading.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+// When the loading fails, the coordinator transitions to failed.
+future.completeExceptionally(new Exception("failure"));
+assertEquals(FAILED, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
FAILED);
+
+// Start loading a new topic partition.
+TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+future = new CompletableFuture<>();
+when(loader.load(tp, coordinator)).thenReturn(future);
+// Schedule the loading.
+runtime.scheduleLoadOperation(tp, 0);
+// Getting the context succeeds and the coordinator should be in 
loading.
+ctx = runtime.contextOrThrow(tp);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+// When the loading completes, the coordinator transitions to active.
+future.complete(null);
+assertEquals(ACTIVE, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
ACTIVE);
+
+runtime.close();
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, 
CLOSED);

Review Comment:
   So we should see this closed metric increment when we resign leadership as 
well? 






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1753860433

   Let's also check checkstyle in the build. Something seems to be off in the group 
coordinator module.





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350888321


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void recordPartitionStateChange(CoordinatorState oldState, 
CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void recordEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void recordEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);
+
+/**
+ * Register the event queue size gauge.
+ *
+ * @param sizeSupplier The size supplier.
+ */
+void registerEventQueueSizeGauge(Supplier sizeSupplier);

Review Comment:
   Ah yes. Registering should only happen once.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350893216


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState 
newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   Right, we won't be able to differentiate the idle ratio for each individual 
thread. 
   
   > So we have an average and a min for idle ratio. Is the minimum value just the 
lowest recorded at a given time frame?
   
   Yes, that's right.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350893216


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState 
newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   Right, we won't be able to differentiate the idle ratio for each individual thread. Is this something we should have?
   
   > So we have an average and a min for the idle ratio. Is the minimum value just the lowest recorded in a given time frame?
   
   Yes, that's right.
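
To illustrate the point about the shared idle ratio, here is a minimal sketch, assuming all processor threads record into one window. `IdleRatioWindow` and `IdleRatioDemo` are hypothetical names invented for this illustration and are not part of the PR or of Kafka's metrics library; the sketch only shows why an average and a minimum over pooled samples cannot be attributed to a single thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: pools idle-ratio samples from all
// processor threads and exposes their average and minimum.
final class IdleRatioWindow {
    private final List<Double> samples = new ArrayList<>();

    // Every thread records into the same list, so the per-thread origin is lost.
    synchronized void record(double ratio) {
        samples.add(ratio);
    }

    // Mean of all samples in the window, or NaN if nothing was recorded.
    synchronized double avg() {
        if (samples.isEmpty()) {
            return Double.NaN;
        }
        double sum = 0.0;
        for (double sample : samples) {
            sum += sample;
        }
        return sum / samples.size();
    }

    // Lowest sample recorded in the window, or NaN if nothing was recorded.
    synchronized double min() {
        if (samples.isEmpty()) {
            return Double.NaN;
        }
        double lowest = Double.POSITIVE_INFINITY;
        for (double sample : samples) {
            lowest = Math.min(lowest, sample);
        }
        return lowest;
    }
}

final class IdleRatioDemo {
    public static void main(String[] args) {
        IdleRatioWindow window = new IdleRatioWindow();
        window.record(0.9); // thread A, mostly idle
        window.record(0.2); // thread B, busy
        window.record(0.7); // thread A again
        System.out.println("avg=" + window.avg() + " min=" + window.min());
    }
}
```

In this sketch the minimum comes from the busy thread, but nothing in the window says which thread that was; tracking it per thread would need one window per thread.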



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350909602


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException {
 assertEquals(1, cnt.get());
 assertEquals(0, ctx.timer.size());
 }
+
+@Test
+public void testStateChanges() throws Exception {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withLoader(loader)
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+CompletableFuture future = new CompletableFuture<>();
+when(loader.load(TP, coordinator)).thenReturn(future);
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the context succeeds and the coordinator should be in loading.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING);
+
+// When the loading fails, the coordinator transitions to failed.
+future.completeExceptionally(new Exception("failure"));
+assertEquals(FAILED, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED);
+
+// Start loading a new topic partition.
+TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+future = new CompletableFuture<>();
+when(loader.load(tp, coordinator)).thenReturn(future);
+// Schedule the loading.
+runtime.scheduleLoadOperation(tp, 0);
+// Getting the context succeeds and the coordinator should be in loading.
+ctx = runtime.contextOrThrow(tp);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
+
+// When the loading completes, the coordinator transitions to active.
+future.complete(null);
+assertEquals(ACTIVE, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE);
+
+runtime.close();
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED);

Review Comment:
   We actually no longer have the closed metric (or the initial metric). They were removed in
   https://github.com/apache/kafka/pull/14417#discussion_r1334548382
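
As a rough sketch of what tracking only the remaining states could look like, assuming counters for Loading, Active and Failed and nothing for Initial or Closed: `PartitionStateCounters` is a hypothetical class written for this illustration, and the local `CoordinatorState` enum is a stand-in for `CoordinatorRuntime.CoordinatorState`. The method name follows the `recordPartitionStateChange` call in the test above; the actual implementation in the PR may differ.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in for CoordinatorRuntime.CoordinatorState so the sketch compiles alone.
enum CoordinatorState { INITIAL, LOADING, ACTIVE, FAILED, CLOSED }

// Hypothetical counters: only Loading, Active and Failed are tracked.
final class PartitionStateCounters {
    private final AtomicLong loading = new AtomicLong();
    private final AtomicLong active = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    // Leaving a tracked state decrements its counter; entering one increments it.
    void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState) {
        adjust(oldState, -1L);
        adjust(newState, 1L);
    }

    private void adjust(CoordinatorState state, long delta) {
        switch (state) {
            case LOADING:
                loading.addAndGet(delta);
                break;
            case ACTIVE:
                active.addAndGet(delta);
                break;
            case FAILED:
                failed.addAndGet(delta);
                break;
            default:
                // INITIAL and CLOSED have no counter in this sketch.
                break;
        }
    }

    long numPartitionsLoading() {
        return loading.get();
    }

    long numPartitionsActive() {
        return active.get();
    }

    long numPartitionsFailed() {
        return failed.get();
    }
}
```

With this shape, a `FAILED` to `CLOSED` transition simply decrements the failed counter, so the test can still verify the call without a dedicated closed metric.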






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350953160


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException {
 assertEquals(1, cnt.get());
 assertEquals(0, ctx.timer.size());
 }
+
+@Test
+public void testStateChanges() throws Exception {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withLoader(loader)
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+CompletableFuture future = new CompletableFuture<>();
+when(loader.load(TP, coordinator)).thenReturn(future);
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the context succeeds and the coordinator should be in loading.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING);
+
+// When the loading fails, the coordinator transitions to failed.
+future.completeExceptionally(new Exception("failure"));
+assertEquals(FAILED, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED);
+
+// Start loading a new topic partition.
+TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+future = new CompletableFuture<>();
+when(loader.load(tp, coordinator)).thenReturn(future);
+// Schedule the loading.
+runtime.scheduleLoadOperation(tp, 0);
+// Getting the context succeeds and the coordinator should be in loading.
+ctx = runtime.contextOrThrow(tp);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
+
+// When the loading completes, the coordinator transitions to active.
+future.complete(null);
+assertEquals(ACTIVE, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE);
+
+runtime.close();
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED);

Review Comment:
   Oops. I see. Thanks for clarifying. 






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-10 Thread via GitHub


jeffkbkim commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1755898146

   Not sure where this error is coming from:
   ```
   > Task :streams:compileJava

   /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14417/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java:246: error: incompatible types: inference variable V#1 has incompatible equality constraints KeyValueStore,VAgg
   return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null));
   ^
     where V#1,K#1,S,VAgg,K#2,V#2 are type-variables:
       V#1 extends Object declared in method with(Serde,Serde)
       K#1 extends Object declared in method with(Serde,Serde)
       S extends StateStore declared in method with(Serde,Serde)
       VAgg extends Object declared in method aggregate(Initializer,Aggregator,Aggregator)
       K#2 extends Object declared in class KGroupedTableImpl
       V#2 extends Object declared in class KGroupedTableImpl
   1 error
   ```





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-10 Thread via GitHub


jolshan commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1756470166

   I restarted the build. But can we take a look at 
   
`[kafka.server.FetchRequestTest.testLastFetchedEpochValidationV12(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14417/14/testReport/junit/kafka.server/FetchRequestTest/Build___JDK_11_and_Scala_2_13___testLastFetchedEpochValidationV12_String__quorum_kraft/)`
   
   It seems to be failing on all the versions that ran for build 14.





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-10 Thread via GitHub


jeffkbkim commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1756480353

   @jolshan this is flaky in trunk as well





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-11 Thread via GitHub


dajac commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1355162764


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -709,6 +727,16 @@ public void complete(Throwable exception) {
 }
 }
 
+@Override
+public long enqueueTimeMs() {
+return this.enqueueTimeMs;
+}
+
+@Override
+public void setEnqueueTimeMs(long enqueueTimeMs) {
+this.enqueueTimeMs = enqueueTimeMs;
+}

Review Comment:
   Instead of setting it like this, would it be possible to just let every 
event have a created time ms? The event is usually created just before being 
enqueued so it would be equivalent in the end.
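
The suggestion above could be sketched roughly as follows, assuming the event captures its creation time in the constructor and the queue derives the queue time when the event is polled. `TimedEvent` and `TimedEventQueue` are hypothetical names made up for this illustration; they are not the `CoordinatorEvent` types from the PR, and the clock is injected as a `LongSupplier` only to keep the sketch testable.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical event type: the creation time is fixed at construction,
// so no setter is needed when the event is enqueued.
final class TimedEvent {
    private final String name;
    private final long createdTimeMs;

    TimedEvent(String name, long createdTimeMs) {
        this.name = name;
        this.createdTimeMs = createdTimeMs;
    }

    String name() {
        return name;
    }

    long createdTimeMs() {
        return createdTimeMs;
    }
}

// Hypothetical queue that measures how long each event waited before being polled.
final class TimedEventQueue {
    private final Queue<TimedEvent> queue = new ArrayDeque<>();
    private final LongSupplier clock;
    private long lastQueueTimeMs = -1L;

    TimedEventQueue(LongSupplier clock) {
        this.clock = clock;
    }

    void enqueue(TimedEvent event) {
        queue.add(event);
    }

    // Removes the next event, if any, and records its time since creation.
    TimedEvent poll() {
        TimedEvent event = queue.poll();
        if (event != null) {
            lastQueueTimeMs = clock.getAsLong() - event.createdTimeMs();
        }
        return event;
    }

    // Queue time of the most recently polled event, or -1 if none was polled yet.
    long lastQueueTimeMs() {
        return lastQueueTimeMs;
    }
}
```

Because the event is normally created right before it is enqueued, the creation time is a close proxy for the enqueue time, and the event stays immutable.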






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-11 Thread via GitHub


dajac commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1355171994


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -709,6 +727,16 @@ public void complete(Throwable exception) {
 }
 }
 
+@Override
+public long enqueueTimeMs() {
+return this.enqueueTimeMs;
+}
+
+@Override
+public void setEnqueueTimeMs(long enqueueTimeMs) {
+this.enqueueTimeMs = enqueueTimeMs;
+}

Review Comment:
   We can refactor this in a follow-up if needed.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-11 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1355561775


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -709,6 +727,16 @@ public void complete(Throwable exception) {
 }
 }
 
+@Override
+public long enqueueTimeMs() {
+return this.enqueueTimeMs;
+}
+
+@Override
+public void setEnqueueTimeMs(long enqueueTimeMs) {
+this.enqueueTimeMs = enqueueTimeMs;
+}

Review Comment:
   addressed






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-11 Thread via GitHub


jolshan commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1758466564

   AK builds have not been happy lately :( 





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-11 Thread via GitHub


jolshan commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1758474606

   I also filed: https://issues.apache.org/jira/browse/KAFKA-15589





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-11 Thread via GitHub


jolshan commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1758628495

   Disabled the failing tests as this is a known issue: 
https://github.com/apache/kafka/pull/14525
   





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-12 Thread via GitHub


dajac commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1356448238


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -46,6 +46,50 @@ public short unknownType() {
 }
 }
 
+/**
+ * Object that is returned as part of the future from load(). Holds the partition load time and the
+ * end time.
+ */
+class LoadSummary {
+private final long startTimeMs;
+private final long endTimeMs;
+private final long numRecords;
+private final long numBytes;
+
+public LoadSummary(long startTimeMs, long endTimeMs, long numRecords, long numBytes) {
+this.startTimeMs = startTimeMs;
+this.endTimeMs = endTimeMs;
+this.numRecords = numRecords;
+this.numBytes = numBytes;
+}
+
+public long startTimeMs() {
+return startTimeMs;
+}
+
+public long endTimeMs() {
+return endTimeMs;
+}
+
+public long numRecords() {
+return numRecords;
+}
+
+public long numBytes() {
+return numBytes;
+}
+

Review Comment:
   nit: We could remove an empty line here.
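
For context on how a caller might consume the summary, here is a small sketch, assuming the load duration is derived as the end time minus the start time. The `LoadSummary` class below is a local copy of the one in the hunk so the example compiles on its own; `LoadSummaryStats` and its `recordsPerSecond` helper are hypothetical additions for illustration and do not appear in the PR.

```java
// Local copy of the LoadSummary shape from the hunk above.
final class LoadSummary {
    private final long startTimeMs;
    private final long endTimeMs;
    private final long numRecords;
    private final long numBytes;

    LoadSummary(long startTimeMs, long endTimeMs, long numRecords, long numBytes) {
        this.startTimeMs = startTimeMs;
        this.endTimeMs = endTimeMs;
        this.numRecords = numRecords;
        this.numBytes = numBytes;
    }

    long startTimeMs() {
        return startTimeMs;
    }

    long endTimeMs() {
        return endTimeMs;
    }

    long numRecords() {
        return numRecords;
    }

    long numBytes() {
        return numBytes;
    }
}

// Hypothetical helpers that derive values from a summary.
final class LoadSummaryStats {
    private LoadSummaryStats() { }

    // Elapsed load time in milliseconds.
    static long loadTimeMs(LoadSummary summary) {
        return summary.endTimeMs() - summary.startTimeMs();
    }

    // Records loaded per second; 0.0 when no measurable time elapsed.
    static double recordsPerSecond(LoadSummary summary) {
        long elapsedMs = loadTimeMs(summary);
        if (elapsedMs <= 0L) {
            return 0.0;
        }
        return summary.numRecords() * 1000.0 / elapsedMs;
    }
}
```

The start and end times are the same two values that `recordPartitionLoadSensor(long startTimeMs, long endTimeMs)` accepts in the metrics interface quoted earlier in this thread.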






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-12 Thread via GitHub


jolshan commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1760414807

   Talked to folks offline -- seems we may want to avoid Yammer metrics, so we 
will hold off on the histograms for now.





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-13 Thread via GitHub


jeffkbkim commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1762463861

   @jolshan should we still keep them as sensors or do we want to remove them?





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-17 Thread via GitHub


dajac commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1765923649

   I just updated the PR to remove the Yammer metrics and filed
https://issues.apache.org/jira/browse/KAFKA-15621 to bring the equivalent back once
we have support for them.





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-17 Thread via GitHub


jeffkbkim commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1765985366

   thanks @dajac 





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-17 Thread via GitHub


jolshan merged PR #14417:
URL: https://github.com/apache/kafka/pull/14417

