AndrewJSchofield commented on code in PR #20815:
URL: https://github.com/apache/kafka/pull/20815#discussion_r2486362579
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1856,26 +1867,21 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
}
- private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(
- ShareAcknowledgementBatch batch) {
+ // Visibility for test
+ static Map<Long, Byte> fetchAckTypeMapForBatch(ShareAcknowledgementBatch batch) {
// Client can either send a single entry in acknowledgeTypes which represents the state
// of the complete batch or can send individual offsets state. Construct a map with record state
// for each offset in the batch, if single acknowledge type is sent, the map will have only one entry.
- Map<Long, RecordState> recordStateMap = new HashMap<>();
+ Map<Long, Byte> ackTypeMap = new HashMap<>();
for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
- recordStateMap.put(batch.firstOffset() + index,
- fetchRecordState(batch.acknowledgeTypes().get(index)));
+ byte ackType = batch.acknowledgeTypes().get(index);
+ // Validate
Review Comment:
So this is going to throw an exception if the ackType is invalid, right?
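
To make the question concrete, here is a minimal sketch of the behavior being asked about. It is not the PR's code: the class name, the `isValid` helper, the choice of `IllegalArgumentException`, and passing `firstOffset` and the type list directly (instead of a `ShareAcknowledgementBatch`) are all assumptions for illustration. The valid byte range follows the values visible elsewhere in this PR (0 for a gap, 4 for RENEW).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not the actual SharePartition code.
class AckTypeValidationSketch {

    // Assumption: 0 marks a gap and 1..4 are ACCEPT, RELEASE, REJECT, RENEW.
    static boolean isValid(byte ackType) {
        return ackType >= 0 && ackType <= 4;
    }

    // Builds the offset -> ack type map, failing fast on the first invalid byte.
    static Map<Long, Byte> fetchAckTypeMap(long firstOffset, List<Byte> acknowledgeTypes) {
        Map<Long, Byte> ackTypeMap = new HashMap<>();
        for (int index = 0; index < acknowledgeTypes.size(); index++) {
            byte ackType = acknowledgeTypes.get(index);
            if (!isValid(ackType)) {
                // Assumed exception type; the real code may throw something else.
                throw new IllegalArgumentException("Invalid acknowledge type: " + ackType);
            }
            ackTypeMap.put(firstOffset + index, ackType);
        }
        return ackTypeMap;
    }
}
```

If the validation does throw like this, the caller needs to translate the exception into an error response rather than let it propagate.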
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -138,6 +139,16 @@ enum SharePartitionState {
FENCED
}
+ /**
+ * To provide static mapping between acknowledgement type bytes to RecordState.
+ */
+ private static final Map<Byte, RecordState> ACK_TYPE_TO_RECORD_STATE = Map.of(
+ (byte) 0, RecordState.ARCHIVED, // Represents gap
+ AcknowledgeType.ACCEPT.id, RecordState.ACKNOWLEDGED,
+ AcknowledgeType.RELEASE.id, RecordState.AVAILABLE,
+ AcknowledgeType.REJECT.id, RecordState.ARCHIVED
Review Comment:
I wonder whether logically a fifth entry of `AcknowledgeType.RENEW.id,
RecordState.ACQUIRED` makes sense. wdyt?
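
Roughly what I have in mind, as a self-contained sketch. The two enums below are trimmed-down stand-ins for the real `RecordState` and `AcknowledgeType` so that the snippet compiles on its own; the id values for ACCEPT, RELEASE and REJECT are assumed to be 1, 2 and 3, and RENEW is 4 as stated in the comment further down in this PR.

```java
import java.util.Map;

// Hypothetical stand-ins so the sketch is self-contained.
class AckTypeMappingSketch {

    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    enum AcknowledgeType {
        ACCEPT((byte) 1), RELEASE((byte) 2), REJECT((byte) 3), RENEW((byte) 4);

        final byte id;

        AcknowledgeType(byte id) {
            this.id = id;
        }
    }

    // Same mapping as in the PR, plus a fifth entry: a renewed record stays ACQUIRED.
    static final Map<Byte, RecordState> ACK_TYPE_TO_RECORD_STATE = Map.of(
        (byte) 0, RecordState.ARCHIVED,                       // Represents gap
        AcknowledgeType.ACCEPT.id, RecordState.ACKNOWLEDGED,
        AcknowledgeType.RELEASE.id, RecordState.AVAILABLE,
        AcknowledgeType.REJECT.id, RecordState.ARCHIVED,
        AcknowledgeType.RENEW.id, RecordState.ACQUIRED
    );
}
```

With the fifth entry, a lookup for any valid ack type returns a non-null state, so callers would not need a special case for RENEW when consulting the map.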
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2131,10 +2154,35 @@ private Optional<Throwable> acknowledgeCompleteBatch(
"The batch cannot be acknowledged. The batch is not in the acquired state."));
}
+ // If the request is a full-batch RENEW acknowledgement (ack type 4), then renew the
+ // acquisition lock without changing the state or persisting anything.
+ // Before reaching this point, it should be verified that it is full batch ack and
+ // not per offset ack as well as startOffset not moved.
+ if (ackType == AcknowledgeType.RENEW.id) {
+ if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
Review Comment:
This condition is redundant because of the check on line 2150. We know the
batch state is ACQUIRED if we have reached this point.
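
A simplified sketch of the control flow, to show why the inner check cannot fire. The method signature, the stand-in enum, and the use of `IllegalStateException` are assumptions for illustration only; the real method takes the in-flight batch and returns a different exception type.

```java
import java.util.Optional;

// Hypothetical simplification of acknowledgeCompleteBatch, not the PR's code.
class RenewGuardSketch {

    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    static final byte RENEW = 4; // ack type 4, per the comment in the diff

    static Optional<Throwable> acknowledgeCompleteBatch(RecordState batchState, byte ackType) {
        // Existing guard: anything not ACQUIRED is rejected here and returns early.
        if (batchState != RecordState.ACQUIRED) {
            return Optional.of(new IllegalStateException(
                "The batch cannot be acknowledged. The batch is not in the acquired state."));
        }
        if (ackType == RENEW) {
            // batchState is necessarily ACQUIRED at this point, so no second check is needed.
            // (Renewing the acquisition lock would happen here.)
            return Optional.empty();
        }
        // Other ack types would transition the state here; omitted in this sketch.
        return Optional.empty();
    }
}
```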