AndrewJSchofield commented on code in PR #20815:
URL: https://github.com/apache/kafka/pull/20815#discussion_r2486362579


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1856,26 +1867,21 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
         return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
     }
 
-    private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(
-        ShareAcknowledgementBatch batch) {
+    // Visibility for test
+    static Map<Long, Byte> fetchAckTypeMapForBatch(ShareAcknowledgementBatch batch) {
         // Client can either send a single entry in acknowledgeTypes which represents the state
         // of the complete batch or can send individual offsets state. Construct a map with record state
         // for each offset in the batch, if single acknowledge type is sent, the map will have only one entry.
-        Map<Long, RecordState> recordStateMap = new HashMap<>();
+        Map<Long, Byte> ackTypeMap = new HashMap<>();
         for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
-            recordStateMap.put(batch.firstOffset() + index,
-                fetchRecordState(batch.acknowledgeTypes().get(index)));
+            byte ackType = batch.acknowledgeTypes().get(index);
+            // Validate

Review Comment:
   So this is going to throw an exception if the ackType is invalid, right?
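
   For reference, a minimal sketch of the behaviour this question assumes: the map is built per offset and an out-of-range ack type fails fast. The class name, the `MAX_ACK_TYPE` bound, and the use of `IllegalArgumentException` are assumptions for illustration only, not what the PR does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AckTypeValidationSketch {
    // Assumption: valid ids run from 0 (gap) through 4 (RENEW).
    static final byte MAX_ACK_TYPE = 4;

    // Hypothetical stand-in for fetchAckTypeMapForBatch: takes the batch's
    // first offset and its acknowledge types instead of the real batch object.
    static Map<Long, Byte> ackTypeMap(long firstOffset, List<Byte> ackTypes) {
        Map<Long, Byte> result = new HashMap<>();
        for (int index = 0; index < ackTypes.size(); index++) {
            byte ackType = ackTypes.get(index);
            // Validate before storing, so an invalid type never reaches the map.
            if (ackType < 0 || ackType > MAX_ACK_TYPE) {
                throw new IllegalArgumentException("Invalid acknowledge type: " + ackType);
            }
            result.put(firstOffset + index, ackType);
        }
        return result;
    }
}
```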



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -138,6 +139,16 @@ enum SharePartitionState {
         FENCED
     }
 
+    /**
+     * To provide static mapping between acknowledgement type bytes to RecordState.
+     */
+    private static final Map<Byte, RecordState> ACK_TYPE_TO_RECORD_STATE = Map.of(
+        (byte) 0, RecordState.ARCHIVED,                             // Represents gap
+        AcknowledgeType.ACCEPT.id, RecordState.ACKNOWLEDGED,
+        AcknowledgeType.RELEASE.id, RecordState.AVAILABLE,
+        AcknowledgeType.REJECT.id, RecordState.ARCHIVED

Review Comment:
   I wonder whether logically a fifth entry of `AcknowledgeType.RENEW.id, 
RecordState.ACQUIRED` makes sense. wdyt?
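
   A rough sketch of what the five-entry map could look like. The local `RecordState` enum and the byte constants are stand-ins for the real `RecordState` and `AcknowledgeType.<X>.id`; the numeric values are assumptions (only "ack type 4" for RENEW appears in the diff).

```java
import java.util.Map;

public class AckTypeMappingSketch {
    // Stand-in for the real RecordState enum.
    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // Stand-ins for AcknowledgeType.<X>.id; values are assumed for illustration.
    static final byte GAP = 0;
    static final byte ACCEPT = 1;
    static final byte RELEASE = 2;
    static final byte REJECT = 3;
    static final byte RENEW = 4;

    static final Map<Byte, RecordState> ACK_TYPE_TO_RECORD_STATE = Map.of(
        GAP, RecordState.ARCHIVED,          // Represents gap
        ACCEPT, RecordState.ACKNOWLEDGED,
        RELEASE, RecordState.AVAILABLE,
        REJECT, RecordState.ARCHIVED,
        RENEW, RecordState.ACQUIRED         // Suggested fifth entry: state stays ACQUIRED
    );
}
```

   With this entry, a lookup for RENEW returns the state the record is already in, so callers would not need to special-case a missing key.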



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2131,10 +2154,35 @@ private Optional<Throwable> acknowledgeCompleteBatch(
                     "The batch cannot be acknowledged. The batch is not in the acquired state."));
             }
 
+            // If the request is a full-batch RENEW acknowledgement (ack type 4), then renew the
+            // acquisition lock without changing the state or persisting anything.
+            // Before reaching this point, it should be verified that it is full batch ack and
+            // not per offset ack as well as startOffset not moved.
+            if (ackType == AcknowledgeType.RENEW.id) {
+                if (inFlightBatch.batchState() != RecordState.ACQUIRED) {

Review Comment:
   This condition is redundant because of the check on line 2150. We know the
batch state is ACQUIRED if we have reached this point.
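
   To illustrate the control flow, a simplified sketch in which the ACQUIRED guard runs once and the RENEW branch relies on it. The method, its string results, and the `RENEW` constant are hypothetical and only model the shape of `acknowledgeCompleteBatch`.

```java
public class RenewGuardSketch {
    // Stand-in for the real RecordState enum.
    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // Assumed id, taken from the "ack type 4" remark in the diff comment.
    static final byte RENEW = 4;

    // Hypothetical model: returns a label for the action taken.
    static String acknowledge(RecordState batchState, byte ackType) {
        // Single guard: anything other than ACQUIRED is rejected here.
        if (batchState != RecordState.ACQUIRED) {
            return "error: batch is not in the acquired state";
        }
        // From this point the state is known to be ACQUIRED, so the RENEW
        // branch does not need to check it again.
        if (ackType == RENEW) {
            return "renew acquisition lock";
        }
        return "transition state";
    }
}
```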


