apoorvmittal10 commented on code in PR #18959:
URL: https://github.com/apache/kafka/pull/18959#discussion_r1964540273
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -79,16 +84,29 @@ public DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
- LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
- this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
+ LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
+ ShareGroupMetrics shareGroupMetrics,
Review Comment:
Done.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -417,6 +452,19 @@ private void handleFetchException(
shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
}
+ /**
+ * The method updates the metric for the time taken to acquire the share partition locks. Also,
+ * it resets the acquireStartTimeMs to the current time, so that the metric records the time taken
+ * to acquire the locks for the re-try, if the partitions are re-acquired. The partitions can be
+ * re-acquired if the fetch request is not completed because of the minBytes or some other condition.
+ */
+ private void updateAcquireElapsedTimeMetric() {
+ shareGroupMetrics.recordTopicPartitionsAcquireTimeMs(shareFetch.groupId(), time.hiResClockMs() - acquireStartTimeMs);
Review Comment:
Done.
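
For readers following the Javadoc in the hunk above, here is a minimal, self-contained sketch of the record-then-reset pattern it describes. It is not the PR's code: the class name `AcquireTimer`, the `LongSupplier` clock, and the in-memory `recorded` list are hypothetical stand-ins for `DelayedShareFetch`, `time.hiResClockMs()`, and `ShareGroupMetrics`. The sketch also assumes the reset happens immediately after the record, which the truncated hunk does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical illustration only; names below are not taken from the Kafka code base.
public class AcquireTimer {
    // Stand-in for time.hiResClockMs() so the clock can be controlled in tests.
    private final LongSupplier clockMs;
    // Stand-in for the metrics sink; the real code records into ShareGroupMetrics.
    private final List<Long> recorded = new ArrayList<>();
    private long acquireStartTimeMs;

    public AcquireTimer(LongSupplier clockMs) {
        this.clockMs = clockMs;
        this.acquireStartTimeMs = clockMs.getAsLong();
    }

    // Record the elapsed time since the last start, then reset the start time
    // so that a later retry measures only its own acquisition time.
    public void updateAcquireElapsedTimeMetric() {
        long now = clockMs.getAsLong();
        recorded.add(now - acquireStartTimeMs);
        acquireStartTimeMs = now;
    }

    public List<Long> recorded() {
        return recorded;
    }
}
```

Without the reset, a second call would report the time since the very first acquisition attempt, so retries would inflate the metric.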