kamalcph commented on code in PR #21158:
URL: https://github.com/apache/kafka/pull/21158#discussion_r2625557728


##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java:
##########
@@ -1120,6 +1111,16 @@ void resetLagStats() {
             brokerTopicStats.recordRemoteCopyLagSegments(topic, partition, 0);
         }
 
+        private void recordLagStats(UnifiedLog log) {
+            try {
+                long bytesLag = log.onlyLocalLogSegmentsSize() - 
log.activeSegment().size();
+                long segmentsLag = log.onlyLocalLogSegmentsCount() - 1;
+                recordLagStats(bytesLag, segmentsLag);
+            } catch (Exception e) {

Review Comment:
   This change can emit negative values when there are no candidate segments to 
upload and the `highestOffsetInRemoteStorage` is higher than the base-offset of 
the active segment. This can happen after leader change:
   
   We can update this to ensure that non-negative values are not emitted: 
   ```
   long bytesLag = Math.max(0, log.onlyLocalLogSegmentsSize() - 
log.activeSegment().size());
   long segmentsLag = Math.max(0, log.onlyLocalLogSegmentsCount() - 1);
   ```
   
   This issue wasn't there previously as the metric gets reported only when 
there candidate segments are available. Could you cover this PR with unit 
tests? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to