apoorvmittal10 commented on code in PR #20815:
URL: https://github.com/apache/kafka/pull/20815#discussion_r2490988051


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2081,31 +2084,50 @@ private Optional<Throwable> 
acknowledgePerOffsetBatchRecords(
                         new InvalidRecordStateException("Member is not the 
owner of offset"));
                 }
 
-                // Determine the record state for the offset. If the per 
offset record state is not provided
-                // by the client, then use the batch record state.
-                RecordState recordState =
-                    recordStateMap.size() > 1 ? 
recordStateMap.get(offsetState.getKey()) :
-                        recordStateDefault;
-                InFlightState updateResult = 
offsetState.getValue().startStateTransition(
-                    recordState,
-                    DeliveryCountOps.NO_OP,
-                    this.maxDeliveryCount,
-                    EMPTY_MEMBER_ID
-                );
-                if (updateResult == null) {
-                    log.debug("Unable to acknowledge records for the offset: 
{} in batch: {}"
-                            + " for the share partition: {}-{}", 
offsetState.getKey(),
-                        inFlightBatch, groupId, topicIdPartition);
-                    return Optional.of(new InvalidRecordStateException(
-                        "Unable to acknowledge records for the batch"));
-                }
-                // Successfully updated the state of the offset and created a 
persister state batch for write to persister.
-                persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(offsetState.getKey(),
-                    offsetState.getKey(), updateResult.state().id(), (short) 
updateResult.deliveryCount())));
-                if (isStateTerminal(updateResult.state())) {
-                    deliveryCompleteCount.incrementAndGet();
+                // In case of 0 size ackTypeMap, we have already validated the 
batch.acknowledgeTypes.
+                byte ackType = ackTypeMap.size() > 1 ? 
ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
+
+                if (ackType == AcknowledgeType.RENEW.id) {
+                    // If RENEW, renew the acquisition lock timer for this 
offset and continue without changing state.
+                    // We do not care about recordState map here.
+                    // Only valid for ACQUIRED offsets; the check above 
ensures this.
+                    long key = offsetState.getKey();
+                    InFlightState state = offsetState.getValue();
+                    log.debug("Renewing acq lock for {}-{} with offset {} in 
batch {} for member {}.",
+                        groupId, topicIdPartition, key, inFlightBatch, 
memberId);
+                    state.cancelAndClearAcquisitionLockTimeoutTask();
+                    AcquisitionLockTimerTask renewalTask = 
scheduleAcquisitionLockTimeout(memberId, key, key);
+                    state.updateAcquisitionLockTimeoutTask(renewalTask);
+                } else {
+                    // Determine the record state for the offset. If the per 
offset record state is not provided
+                    // by the client, then use the batch record state. This 
will always be present as it is a static
+                    // mapping between bytes and record state type. All ack 
types have been added except for RENEW which
+                    // has been handled above.
+                    RecordState recordState = 
ACK_TYPE_TO_RECORD_STATE.get(ackType);
+                    Objects.requireNonNull(recordState);

Review Comment:
   Hmmm, though post validation of ackType we should not get null recordState 
though but as we have check for `Objects.requireNonNull` in code then it can 
throw null pointer exception, is it the correct way to handle the incorrect 
ackType?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to