JimmyWang6 commented on code in PR #20973:
URL: https://github.com/apache/kafka/pull/20973#discussion_r2565760210


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1827,13 +1827,13 @@ private List<AcquiredRecords> createBatches(
                         sharePartitionMetrics);
                     int delayMs = recordLockDurationMsOrDefault(groupConfigManager, groupId, defaultRecordLockDurationMs);
                     long lastOffset = acquiredRecords.firstOffset() + maxFetchRecords - 1;
-                    acquiredRecords.setLastOffset(lastOffset);
                     inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset, delayMs);
                     updateFindNextFetchOffset(true);
 
                     cachedState.put(acquiredRecords.firstOffset(), inFlightBatch);
                     sharePartitionMetrics.recordInFlightBatchMessageCount(
                         acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
+                    acquiredRecords.setLastOffset(lastOffset);
                     return List.of(acquiredRecords);

Review Comment:
   As @chirag-wadhwa5 noted in a comment on
[#20246](https://github.com/apache/kafka/pull/20246#discussion_r2556752233),
`InFlightMessageCount` currently reports only the total number of in-flight
messages for the share partition; it does not indicate how many records within
that partition are in each state. For example, records in `ACKNOWLEDGED` or
`ARCHIVED` should not be considered in-flight. Perhaps we could add a Gauge
metric to track the number of records in each state.
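
A rough sketch of what per-state tracking could look like, to make the suggestion concrete. Everything here is hypothetical: `RecordStateCounts`, its `State` enum, and the `gauge`/`inFlight` helpers are illustrative names, not existing `SharePartition` or `SharePartitionMetrics` APIs, and the `AVAILABLE`/`ACQUIRED` states are assumed alongside the two named above. Each `Supplier<Long>` would be what a Gauge reads when polled.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical helper: keeps one counter per record state so that a gauge
// can be exposed for each state instead of a single in-flight total.
final class RecordStateCounts {
    // Assumed stand-in for the share-partition record states.
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    private final Map<State, AtomicLong> counts = new EnumMap<>(State.class);

    RecordStateCounts() {
        for (State s : State.values()) {
            counts.put(s, new AtomicLong());
        }
    }

    // Record n new records entering the given state.
    void add(State state, long n) {
        counts.get(state).addAndGet(n);
    }

    // Move n records from one state to another.
    void transition(State from, State to, long n) {
        counts.get(from).addAndGet(-n);
        counts.get(to).addAndGet(n);
    }

    // Value supplier that a per-state gauge could poll.
    Supplier<Long> gauge(State state) {
        return () -> counts.get(state).get();
    }

    // Records that still count as in-flight: everything except
    // ACKNOWLEDGED and ARCHIVED, following the reasoning above.
    long inFlight() {
        long total = 0;
        for (Map.Entry<State, AtomicLong> e : counts.entrySet()) {
            if (e.getKey() != State.ACKNOWLEDGED && e.getKey() != State.ARCHIVED) {
                total += e.getValue().get();
            }
        }
        return total;
    }
}
```

With counters like these, the existing total could stay as it is while the per-state gauges show how much of it is actually acquired versus already acknowledged or archived.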
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
