apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1936961623
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are
acknowledged (ACCEPT or REJECT).
long firstKeyToRemove = cachedState.firstKey();
long lastKeyToRemove;
NavigableMap.Entry<Long, InFlightBatch> entry =
cachedState.floorEntry(lastOffsetAcknowledged);
+ // If the lastOffsetAcknowledged is equal to the last offset of
entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+ if (isInitialReadGapOffsetWindowActive()) {
+ // This case will arise if we have a situation where there
is an acquirable gap after the lastOffsetAcknowledged.
+ // Ex, the cachedState has following state batches -> {(0,
10), (11, 20), (31,40)} and all these batches are acked.
+ // In this case, lastOffsetAcknowledged will be 20, but we
cannot simply move the start offset to the first offset
+ // of next cachedState batch. The startOffset should be at
21, because we have an acquirable gap there.
+ startOffset =
Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+ } else {
+ // If initialReadGapOffset is null, that means the
cachedState does not have any acquirable gaps.
+ // We can simply move the start offset to the first offset
of the next cachedState batch.
+ startOffset =
cachedState.higherKey(lastOffsetAcknowledged);
+ }
lastKeyToRemove = entry.getKey();
} else {
+ // The code will reach this point only if
lastOffsetAcknowledged is in the middle of a stateBatch. In this case
+ // we can simply move the startOffset to the next offset of
lastOffsetAcknowledged.
Review Comment:
```suggestion
// we can simply move the startOffset to the next offset of
lastOffsetAcknowledged and should consider any read gap offsets.
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are
acknowledged (ACCEPT or REJECT).
long firstKeyToRemove = cachedState.firstKey();
long lastKeyToRemove;
NavigableMap.Entry<Long, InFlightBatch> entry =
cachedState.floorEntry(lastOffsetAcknowledged);
+ // If the lastOffsetAcknowledged is equal to the last offset of
entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+ if (isInitialReadGapOffsetWindowActive()) {
+ // This case will arise if we have a situation where there
is an acquirable gap after the lastOffsetAcknowledged.
+ // Ex, the cachedState has following state batches -> {(0,
10), (11, 20), (31,40)} and all these batches are acked.
+ // In this case, lastOffsetAcknowledged will be 20, but we
cannot simply move the start offset to the first offset
+ // of next cachedState batch. The startOffset should be at
21, because we have an acquirable gap there.
+ startOffset =
Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+ } else {
+ // If initialReadGapOffset is null, that means the
cachedState does not have any acquirable gaps.
+ // We can simply move the start offset to the first offset
of the next cachedState batch.
+ startOffset =
cachedState.higherKey(lastOffsetAcknowledged);
+ }
lastKeyToRemove = entry.getKey();
} else {
+ // The code will reach this point only if
lastOffsetAcknowledged is in the middle of a stateBatch. In this case
Review Comment:
```suggestion
// The code will reach this point only if
lastOffsetAcknowledged is in the middle of some stateBatch. In this case
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are
acknowledged (ACCEPT or REJECT).
long firstKeyToRemove = cachedState.firstKey();
long lastKeyToRemove;
NavigableMap.Entry<Long, InFlightBatch> entry =
cachedState.floorEntry(lastOffsetAcknowledged);
+ // If the lastOffsetAcknowledged is equal to the last offset of
entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+ if (isInitialReadGapOffsetWindowActive()) {
+ // This case will arise if we have a situation where there
is an acquirable gap after the lastOffsetAcknowledged.
+ // Ex, the cachedState has following state batches -> {(0,
10), (11, 20), (31,40)} and all these batches are acked.
+ // In this case, lastOffsetAcknowledged will be 20, but we
cannot simply move the start offset to the first offset
+ // of next cachedState batch. The startOffset should be at
21, because we have an acquirable gap there.
+ startOffset =
Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
Review Comment:
Please rewrite the example more clearly, including the gapStartOffset
value it assumes.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are
acknowledged (ACCEPT or REJECT).
long firstKeyToRemove = cachedState.firstKey();
long lastKeyToRemove;
NavigableMap.Entry<Long, InFlightBatch> entry =
cachedState.floorEntry(lastOffsetAcknowledged);
+ // If the lastOffsetAcknowledged is equal to the last offset of
entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+ if (isInitialReadGapOffsetWindowActive()) {
+ // This case will arise if we have a situation where there
is an acquirable gap after the lastOffsetAcknowledged.
+ // Ex, the cachedState has following state batches -> {(0,
10), (11, 20), (31,40)} and all these batches are acked.
+ // In this case, lastOffsetAcknowledged will be 20, but we
cannot simply move the start offset to the first offset
+ // of next cachedState batch. The startOffset should be at
21, because we have an acquirable gap there.
+ startOffset =
Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+ } else {
+ // If initialReadGapOffset is null, that means the
cachedState does not have any acquirable gaps.
+ // We can simply move the start offset to the first offset
of the next cachedState batch.
+ startOffset =
cachedState.higherKey(lastOffsetAcknowledged);
+ }
Review Comment:
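Why do you need the else block? startOffset is already assigned
cachedState.higherKey(lastOffsetAcknowledged) just before the if, so
the else branch only repeats that assignment.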
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1847,19 +1948,30 @@ private boolean isRecordStateAcknowledged(RecordState
recordState) {
return recordState == RecordState.ACKNOWLEDGED || recordState ==
RecordState.ARCHIVED;
}
- private long findLastOffsetAcknowledged() {
- lock.readLock().lock();
+ // Visible for testing
+ long findLastOffsetAcknowledged() {
long lastOffsetAcknowledged = -1;
+ lock.readLock().lock();
try {
for (NavigableMap.Entry<Long, InFlightBatch> entry :
cachedState.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
if (inFlightBatch.offsetState() == null) {
if
(!isRecordStateAcknowledged(inFlightBatch.batchState())) {
return lastOffsetAcknowledged;
}
+ // If initialReadGapOffset.gapStartOffset is less than or
equal to the last offset of the batch
+ // then we cannot identify the current inFlightBatch as
acknowledged. All the offsets between
+ // initialReadGapOffset.gapStartOffset and
initialReadGapOffset.endOffset should always be present
+ // in the cachedState
+ if (isInitialReadGapOffsetWindowActive() &&
inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
+ return lastOffsetAcknowledged;
+ }
lastOffsetAcknowledged = inFlightBatch.lastOffset();
} else {
for (Map.Entry<Long, InFlightState> offsetState :
inFlightBatch.offsetState.entrySet()) {
+ if (isInitialReadGapOffsetWindowActive() &&
offsetState.getKey() >= initialReadGapOffset.gapStartOffset()) {
+ return lastOffsetAcknowledged;
+ }
Review Comment:
As explained, please correct the checks.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1825,8 +1921,8 @@ private boolean canMoveStartOffset() {
NavigableMap.Entry<Long, InFlightBatch> entry =
cachedState.floorEntry(startOffset);
if (entry == null) {
- log.error("The start offset: {} is not found in the cached state
for share partition: {}-{}."
- + " Cannot move the start offset.", startOffset, groupId,
topicIdPartition);
+ log.info("The start offset: {} is not found in the cached state
for share partition: {}-{} " +
+ "as there is an acquirable gap at the beginning. Cannot move
the start offset.", startOffset, groupId, topicIdPartition);
return false;
Review Comment:
This seems to have been missed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]