adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1818159160
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1525,6 +1537,24 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}
+ protected void updateLatestFetchOffsetMetadata(LogOffsetMetadata
fetchOffsetMetadata) {
+ lock.writeLock().lock();
+ try {
+ latestFetchOffsetMetadata = fetchOffsetMetadata;
Review Comment:
Hi @junrao , agreed with the simpler approach. I have made the following
changes in my latest commit-
> In the current approach, (1) if any call moves endOffset, we reset the
latestFetchOffsetMetadata to Optional.empty()
I reset the `latestFetchOffsetMetadata` to Optional.empty() if -
1. `acquire()` results in non-empty acquired records in `ShareFetchUtils`.
2. acquisition lock timeout is called.
3. release acquired records on session close is called.
I haven't made the change in `acknowledge()` method of `SharePartition`,
since in the common case all the `ACQUIRED` records will moved to
`ACKNOWLEDGED` state and endOffset doesn't change then.
> In tryComplete, if latestFetchOffsetMetadata is empty, we call readFromLog
and update latestFetchOffsetMetadata
This functionality has been added in previous commits.
Please review my PR whenever you can. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]