apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1930741313
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -629,15 +657,42 @@ public ShareAcquiredRecords acquire(
if (subMap.isEmpty()) {
log.trace("No cached data exists for the share partition for
requested fetch batch: {}-{}",
groupId, topicIdPartition);
- return acquireNewBatchRecords(memberId,
fetchPartitionData.records.batches(),
+ ShareAcquiredRecords acquiredRecords =
acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
firstBatch.baseOffset(), lastBatch.lastOffset(),
batchSize, maxFetchRecords);
+ // Since new records have been acquired, the window tracking
the gap in cachedState might need to be reduced
+ maybeUpdateReadGapFetchOffset(lastBatch.lastOffset() + 1);
Review Comment:
This is incorrect. As the changes I made, you can see
[here](https://github.com/apache/kafka/compare/trunk...apoorvmittal10:kafka:KAFKA-18494),
that change goes inside the `acquireNewBatchRecords` because if
`maxFetchRecords` is configured then not neccessarily all fetch records will be
acquired.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2205,6 +2310,40 @@ Timer timer() {
return timer;
}
+ // Visible for testing
+ Optional<InitialReadGapOffset> initialReadGapOffset() {
+ return Optional.ofNullable(initialReadGapOffset);
+ }
Review Comment:
I am not getting point of wrapping it in Optional for tests? What benefit do
you get?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -444,6 +450,8 @@ public CompletableFuture<Void> maybeInitialize() {
stateEpoch = partitionData.stateEpoch();
List<PersisterStateBatch> stateBatches =
partitionData.stateBatches();
+ boolean isGapPresentInStateBatches = false;
+ long previousOffset = startOffset;
Review Comment:
I think the variable name `previousOffset` is confusing here. Probably you
can name as `previousBatchLastOffset` and track that. Change if condition to
`stateBatch.firstOffset() > previousBatchLastOffset + 1`. wdyt?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -629,15 +657,42 @@ public ShareAcquiredRecords acquire(
if (subMap.isEmpty()) {
log.trace("No cached data exists for the share partition for
requested fetch batch: {}-{}",
groupId, topicIdPartition);
- return acquireNewBatchRecords(memberId,
fetchPartitionData.records.batches(),
+ ShareAcquiredRecords acquiredRecords =
acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
firstBatch.baseOffset(), lastBatch.lastOffset(),
batchSize, maxFetchRecords);
+ // Since new records have been acquired, the window tracking
the gap in cachedState might need to be reduced
+ maybeUpdateReadGapFetchOffset(lastBatch.lastOffset() + 1);
Review Comment:
Also you should write a test case for this as well.
cc: @adixitconfluent
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]