apoorvmittal10 commented on code in PR #18174:
URL: https://github.com/apache/kafka/pull/18174#discussion_r1887170307
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -317,6 +317,10 @@ private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
fut.completeExceptionally(exp);
return;
}
+ shareCoordinatorMetrics.recordPrune(
+ off,
Review Comment:
The replica manager's `deleteRecords` method responds with a
`DeleteRecordsPartitionResult`, which contains the new `lowWaterMark`. Can the
`lowWaterMark` returned by the replica manager ever differ from the requested
offset?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java:
##########
@@ -153,4 +158,31 @@ public void record(String sensorName) {
globalSensors.get(sensorName).record();
}
}
+
+ public void recordPrune(double value, TopicPartition tp) {
+ pruneMetrics.computeIfAbsent(tp, k -> new ShareGroupPruneMetrics(tp))
+ .pruneSensor.record(value);
+ }
+
+ private class ShareGroupPruneMetrics {
+ private final Sensor pruneSensor;
+
+ ShareGroupPruneMetrics(TopicPartition tp) {
+ String sensorNameSuffix = tp.toString();
+ Map<String, String> tags = Map.of(
+ "topic", tp.topic(),
+ "partition", Integer.toString(tp.partition())
+ );
+
+ pruneSensor = metrics.sensor(SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME + sensorNameSuffix);
+
+ pruneSensor.add(
+ metrics.metricName("last-pruned-offset",
Review Comment:
So essentially the metric captures the `lowWaterMark` for the topic partition.
In that case, why call it `pruned`? Also, what does this metric tell an
operator from a monitoring perspective?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java:
##########
@@ -46,6 +48,8 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME = "ShareCoordinatorWrite";
public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME = "ShareCoordinatorWriteLatency";
+ public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME = "ShareCoordinatorStateTopicPruneSensorName";
+ private Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics = new HashMap<>();
Review Comment:
Should this map be thread-safe? Can `recordPrune` be called from multiple threads?
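
To make the concern concrete, here is a minimal sketch of what a thread-safe variant might look like, assuming `recordPrune` can indeed be reached from several threads (the PR does not confirm this). `PruneMetricsSketch`, `Holder`, and `hammer` are hypothetical stand-ins and not the Kafka classes; a `String` key replaces `TopicPartition` so the block compiles on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ShareCoordinatorMetrics; only the map handling is modelled.
class PruneMetricsSketch {
    // Hypothetical stand-in for ShareGroupPruneMetrics: remembers the last recorded value.
    static final class Holder {
        volatile double lastValue;
    }

    // ConcurrentHashMap.computeIfAbsent applies the mapping function atomically,
    // at most once per key, which a plain HashMap does not guarantee under contention.
    private final Map<String, Holder> pruneMetrics = new ConcurrentHashMap<>();
    private final AtomicInteger holdersCreated = new AtomicInteger();

    void recordPrune(double value, String topicPartition) {
        pruneMetrics.computeIfAbsent(topicPartition, k -> {
            holdersCreated.incrementAndGet();
            return new Holder();
        }).lastValue = value;
    }

    int holdersCreated() {
        return holdersCreated.get();
    }

    int trackedPartitions() {
        return pruneMetrics.size();
    }

    // Drives recordPrune from several threads over a small set of keys.
    static PruneMetricsSketch hammer(int threads, int partitions) {
        PruneMetricsSketch m = new PruneMetricsSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    m.recordPrune(i, "tp-" + (i % partitions));
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return m;
    }

    public static void main(String[] args) {
        PruneMetricsSketch m = hammer(8, 4);
        System.out.println(m.trackedPartitions() + " partitions, "
                + m.holdersCreated() + " holders created");
    }
}
```

If the call site turns out to be confined to a single thread, the plain `HashMap` in the diff would be sufficient and this change would be unnecessary.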
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java:
##########
@@ -153,4 +158,31 @@ public void record(String sensorName) {
globalSensors.get(sensorName).record();
}
}
+
+ public void recordPrune(double value, TopicPartition tp) {
+ pruneMetrics.computeIfAbsent(tp, k -> new ShareGroupPruneMetrics(tp))
+ .pruneSensor.record(value);
+ }
+
+ private class ShareGroupPruneMetrics {
+ private final Sensor pruneSensor;
+
+ ShareGroupPruneMetrics(TopicPartition tp) {
+ String sensorNameSuffix = tp.toString();
+ Map<String, String> tags = Map.of(
+ "topic", tp.topic(),
+ "partition", Integer.toString(tp.partition())
+ );
+
+ pruneSensor = metrics.sensor(SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME + sensorNameSuffix);
+
+ pruneSensor.add(
+ metrics.metricName("last-pruned-offset",
+ METRICS_GROUP,
+ "The offset at which the share-group state topic was last pruned.",
+ tags),
+ new Value()
+ );
+ }
+ }
Review Comment:
Do we really need this extra class and map, or can we rely on
`metrics.getSensor(name)`, since the metrics registry already keeps a map of
sensors internally?
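
A rough sketch of the alternative, under stated assumptions: `SensorRegistrySketch` below is a hypothetical stand-in that only imitates a registry with a lookup-only `getSensor(name)` and a get-or-create `sensor(name)`. It is not the Kafka `Metrics` class, and the topic-partition key is a plain `String`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a metrics registry that already indexes sensors by name.
class SensorRegistrySketch {
    static final String PREFIX = "ShareCoordinatorStateTopicPruneSensorName";

    // Hypothetical minimal sensor: keeps only the last recorded value.
    static final class Sensor {
        final String name;
        volatile double last;

        Sensor(String name) {
            this.name = name;
        }

        void record(double value) {
            last = value;
        }
    }

    private final Map<String, Sensor> sensors = new ConcurrentHashMap<>();

    // Get-or-create: returns the existing sensor for the name or registers a new one.
    Sensor sensor(String name) {
        return sensors.computeIfAbsent(name, Sensor::new);
    }

    // Lookup only: returns null when no sensor with this name exists.
    Sensor getSensor(String name) {
        return sensors.get(name);
    }

    int size() {
        return sensors.size();
    }

    // No separate per-partition map: the registry's own index is the only state.
    void recordPrune(double value, String topicPartition) {
        String name = PREFIX + topicPartition;
        Sensor s = getSensor(name);
        if (s == null) {
            // In the real code, the tagged metric would be attached here on first creation.
            s = sensor(name);
        }
        s.record(value);
    }

    public static void main(String[] args) {
        SensorRegistrySketch registry = new SensorRegistrySketch();
        registry.recordPrune(5.0, "tp-0");
        registry.recordPrune(9.0, "tp-0");
        System.out.println(registry.getSensor(PREFIX + "tp-0").last);
    }
}
```

One caveat with this shape: the lookup followed by creation is not atomic as a pair, so if several threads can prune the same partition, the step that attaches the metric on first creation would need its own guard.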