dajac commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1334540685


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##########
@@ -46,6 +46,35 @@ public short unknownType() {
         }
     }
 
+    /**
+     * Object that is returned as part of the future from load(). Holds the partition load time and the
+     * end time.
+     */
+    class LoadSummary {
+        private final long loadTimeMs;

Review Comment:
   How about storing startTime and endTime? Then we could compute the delta 
when requested? Could you also add the number of records and bytes read?
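
A minimal sketch of what this suggestion could look like, assuming hypothetical field and accessor names (`startTimeMs`, `endTimeMs`, `numRecords`, `numBytes`) that are not taken from the PR. The duration is derived on demand rather than stored:

```java
/**
 * Hypothetical sketch of a load summary that keeps the raw timestamps and
 * counters and derives the load duration only when asked for it.
 * All names here are illustrative assumptions, not the PR's final API.
 */
final class LoadSummary {
    private final long startTimeMs;
    private final long endTimeMs;
    private final long numRecords;
    private final long numBytes;

    LoadSummary(long startTimeMs, long endTimeMs, long numRecords, long numBytes) {
        if (endTimeMs < startTimeMs) {
            throw new IllegalArgumentException("endTimeMs must not precede startTimeMs");
        }
        this.startTimeMs = startTimeMs;
        this.endTimeMs = endTimeMs;
        this.numRecords = numRecords;
        this.numBytes = numBytes;
    }

    long startTimeMs() { return startTimeMs; }
    long endTimeMs() { return endTimeMs; }
    long numRecords() { return numRecords; }
    long numBytes() { return numBytes; }

    /** Computed from the two timestamps when requested, not stored. */
    long loadTimeMs() { return endTimeMs - startTimeMs; }

    @Override
    public String toString() {
        return "LoadSummary(startTimeMs=" + startTimeMs
            + ", endTimeMs=" + endTimeMs
            + ", numRecords=" + numRecords
            + ", numBytes=" + numBytes + ")";
    }
}
```

Keeping both timestamps means a caller can log the absolute times and still obtain the delta for the sensor.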



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1242,7 +1304,11 @@ public void scheduleLoadOperation(
                         case FAILED:
                         case INITIAL:
                             context.transitionTo(CoordinatorState.LOADING);
-                            loader.load(tp, context.coordinator).whenComplete((state, exception) -> {
+                            loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
+                                if (summary != null) {
+                                    log.debug("Finished loading tp={}, LoadSummary={}", tp, summary);
+                                    runtimeMetrics.partitionLoadSensor.record(summary.loadTimeMs(), summary.endTimeMs(), false);

Review Comment:
   * We should do this in the `scheduleInternalOperation` below. Could we also 
log the time and the other information in the final log message? That would be 
super useful.
   * Accessing `partitionLoadSensor` is not safe in the current implementation. 
I think that we should rather define a method in the interface to hide it.
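
One way to read the second point, as a rough sketch only: callers report a load through a method and never touch the sensor field. The interface name `RuntimeMetricsSketch`, the method `recordPartitionLoad`, and the in-memory implementation are all hypothetical. A real implementation would presumably delegate to a Kafka `Sensor` instead of keeping its own counters:

```java
/**
 * Hypothetical metrics interface: the runtime reports a partition load
 * through a method and has no access to the underlying sensor.
 */
interface RuntimeMetricsSketch extends AutoCloseable {
    void recordPartitionLoad(long startTimeMs, long endTimeMs);

    @Override
    void close();
}

/**
 * Stand-in implementation used only for illustration. It tracks max and
 * average load time itself; this is an assumption made to keep the
 * example self-contained, not how the PR implements it.
 */
final class InMemoryRuntimeMetrics implements RuntimeMetricsSketch {
    private long count;
    private long totalMs;
    private long maxMs;

    @Override
    public synchronized void recordPartitionLoad(long startTimeMs, long endTimeMs) {
        long elapsedMs = endTimeMs - startTimeMs;
        count++;
        totalMs += elapsedMs;
        maxMs = Math.max(maxMs, elapsedMs);
    }

    synchronized long maxLoadTimeMs() {
        return maxMs;
    }

    synchronized double avgLoadTimeMs() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch; a real implementation would remove its sensor here.
    }
}
```

With this shape, the sensor can stay private to the implementation and a subclass cannot forget to create it.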



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public abstract class CoordinatorRuntimeMetrics implements AutoCloseable {

Review Comment:
   I think that we should define an interface. We could have an abstract class 
which implements the interface when we re-implement the transaction coordinator.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##########
@@ -0,0 +1,244 @@
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public abstract class CoordinatorRuntimeMetrics implements AutoCloseable {
+    /**
+     * Metric to count the number of partitions in Initial state.
+     */
+    protected final MetricName numPartitionsInitial = getMetricName("NumPartitionsInitial");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_INITIAL = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Loading state.
+     */
+    protected final MetricName numPartitionsLoading = getMetricName("NumPartitionsLoading");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_LOADING = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Active state.
+     */
+    protected final MetricName numPartitionsActive = getMetricName("NumPartitionsActive");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_ACTIVE = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Closed state.
+     */
+    protected final MetricName numPartitionsClosed = getMetricName("NumPartitionsClosed");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_CLOSED = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Failed state.
+     */
+    protected final MetricName numPartitionsFailed = getMetricName("NumPartitionsFailed");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_FAILED = new AtomicLong(0);
+
+    protected MetricsRegistry registry;
+    protected Metrics metrics;
+    public Sensor partitionLoadSensor;

Review Comment:
   The handling of this metric is weird... It is defined here and created in 
partitionLoadSensor but still accessed in this class as well. How would someone 
who extends this class know that they have to create this metric?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -1242,7 +1304,11 @@ public void scheduleLoadOperation(
                         case FAILED:
                         case INITIAL:
                             context.transitionTo(CoordinatorState.LOADING);
-                            loader.load(tp, context.coordinator).whenComplete((state, exception) -> {
+                            loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
+                                if (summary != null) {
+                                    log.debug("Finished loading tp={}, LoadSummary={}", tp, summary);

Review Comment:
   Yeah, this is not the right place to do this. It should rather be done in 
the event as you said.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##########
@@ -0,0 +1,244 @@
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public abstract class CoordinatorRuntimeMetrics implements AutoCloseable {
+    /**
+     * Metric to count the number of partitions in Initial state.
+     */
+    protected final MetricName numPartitionsInitial = getMetricName("NumPartitionsInitial");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_INITIAL = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Loading state.
+     */
+    protected final MetricName numPartitionsLoading = getMetricName("NumPartitionsLoading");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_LOADING = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Active state.
+     */
+    protected final MetricName numPartitionsActive = getMetricName("NumPartitionsActive");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_ACTIVE = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Closed state.
+     */
+    protected final MetricName numPartitionsClosed = getMetricName("NumPartitionsClosed");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_CLOSED = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Failed state.
+     */
+    protected final MetricName numPartitionsFailed = getMetricName("NumPartitionsFailed");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_FAILED = new AtomicLong(0);
+
+    protected MetricsRegistry registry;
+    protected Metrics metrics;
+    public Sensor partitionLoadSensor;
+
+    /**
+     * Add metrics to the registries.
+     */
+    protected void initializeMetrics() {
+        registry.newGauge(numPartitionsInitial, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return NUM_PARTITIONS_INITIAL.get();
+            }
+        });
+
+        registry.newGauge(numPartitionsLoading, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return NUM_PARTITIONS_LOADING.get();
+            }
+        });
+
+        registry.newGauge(numPartitionsActive, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return NUM_PARTITIONS_ACTIVE.get();
+            }
+        });
+
+        registry.newGauge(numPartitionsClosed, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return NUM_PARTITIONS_CLOSED.get();
+            }
+        });
+
+        registry.newGauge(numPartitionsFailed, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return NUM_PARTITIONS_FAILED.get();
+            }
+        });
+
+        this.partitionLoadSensor.add(
+            metrics.metricName(
+                "partition-load-time-max",
+                metricsGroup(),
+                "The max time it took to load the partitions in the last 30sec"
+            ), new Max());
+        this.partitionLoadSensor.add(
+            metrics.metricName(
+                "partition-load-time-avg",
+                metricsGroup(),
+                "The average time it took to load the partitions in the last 30sec"
+            ), new Avg());
+    }
+
+    /**
+     * Retrieve the yammer metric name.
+     * @param name The name of the metric.
+     *
+     * @return The metric name.
+     */
+    protected abstract MetricName getMetricName(String name);
+
+    /**
+     * Retrieve the yammer metric name.
+     * @param group The metric group.
+     * @param type The metric type.
+     * @param name The name of the metric.
+     *
+     * @return The metric name.
+     */
+    protected static MetricName getMetricName(String group, String type, String name) {
+        return KafkaYammerMetrics.getMetricName(group, type, name);
+    }
+
+    /**
+     * @return The Kafka metrics group name.
+     */
+    protected abstract String metricsGroup();
+
+    @Override
+    public void close() {
+        Arrays.asList(
+            numPartitionsInitial,
+            numPartitionsLoading,
+            numPartitionsActive,
+            numPartitionsClosed,
+            numPartitionsFailed
+        ).forEach(registry::removeMetric);
+
+        metrics.removeSensor(partitionLoadSensor.name());
+    }
+
+    /**
+     * Increment the Initial partitions counter.
+     */
+    public void incrementNumPartitionsInitial() {

Review Comment:
   Did you consider having something like `partitionStateChanged(old, new)`? 
The name is not so good but you get the idea.
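
As a rough illustration of the idea, a single transition method could replace the per-state increment and decrement methods. Everything below is hypothetical: the `PartitionState` enum only mirrors the state names used by the gauges above, and `PartitionStateCounters` and `recordPartitionStateChange` are placeholder names:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical mirror of the states named by the gauges in this class. */
enum PartitionState { INITIAL, LOADING, ACTIVE, CLOSED, FAILED }

/**
 * Sketch of one entry point for state transitions: the counter of the old
 * state is decremented and the counter of the new state is incremented.
 */
final class PartitionStateCounters {
    private final Map<PartitionState, AtomicLong> counters = new EnumMap<>(PartitionState.class);

    PartitionStateCounters() {
        // Populate every state up front so the map is never modified afterwards.
        for (PartitionState state : PartitionState.values()) {
            counters.put(state, new AtomicLong(0));
        }
    }

    /**
     * @param oldState previous state, or null when the partition is first tracked
     * @param newState next state, or null when the partition is no longer tracked
     */
    void recordPartitionStateChange(PartitionState oldState, PartitionState newState) {
        if (oldState != null) {
            counters.get(oldState).decrementAndGet();
        }
        if (newState != null) {
            counters.get(newState).incrementAndGet();
        }
    }

    long count(PartitionState state) {
        return counters.get(state).get();
    }
}
```

A caller would then invoke `recordPartitionStateChange(old, new)` once per transition, which keeps the decrement and the increment together.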



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##########
@@ -0,0 +1,244 @@
+package org.apache.kafka.coordinator.group.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public abstract class CoordinatorRuntimeMetrics implements AutoCloseable {
+    /**
+     * Metric to count the number of partitions in Initial state.
+     */
+    protected final MetricName numPartitionsInitial = getMetricName("NumPartitionsInitial");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_INITIAL = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Loading state.
+     */
+    protected final MetricName numPartitionsLoading = getMetricName("NumPartitionsLoading");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_LOADING = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Active state.
+     */
+    protected final MetricName numPartitionsActive = getMetricName("NumPartitionsActive");
+    /**
+     * Visible for testing.
+     */
+    public static final AtomicLong NUM_PARTITIONS_ACTIVE = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Closed state.
+     */
+    protected final MetricName numPartitionsClosed = getMetricName("NumPartitionsClosed");

Review Comment:
   I wonder if there is any value in exposing Closed and Initial states. The 
partition never remains in Closed state. We transition to it, unload and remove 
the shard context. Or we should perhaps say Unloading in the metric... What do 
you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
