adixitconfluent commented on code in PR #20815:
URL: https://github.com/apache/kafka/pull/20815#discussion_r2486660841


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2081,31 +2084,50 @@ private Optional<Throwable> 
acknowledgePerOffsetBatchRecords(
                         new InvalidRecordStateException("Member is not the 
owner of offset"));
                 }
 
-                // Determine the record state for the offset. If the per 
offset record state is not provided
-                // by the client, then use the batch record state.
-                RecordState recordState =
-                    recordStateMap.size() > 1 ? 
recordStateMap.get(offsetState.getKey()) :
-                        recordStateDefault;
-                InFlightState updateResult = 
offsetState.getValue().startStateTransition(
-                    recordState,
-                    DeliveryCountOps.NO_OP,
-                    this.maxDeliveryCount,
-                    EMPTY_MEMBER_ID
-                );
-                if (updateResult == null) {
-                    log.debug("Unable to acknowledge records for the offset: 
{} in batch: {}"
-                            + " for the share partition: {}-{}", 
offsetState.getKey(),
-                        inFlightBatch, groupId, topicIdPartition);
-                    return Optional.of(new InvalidRecordStateException(
-                        "Unable to acknowledge records for the batch"));
-                }
-                // Successfully updated the state of the offset and created a 
persister state batch for write to persister.
-                persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(offsetState.getKey(),
-                    offsetState.getKey(), updateResult.state().id(), (short) 
updateResult.deliveryCount())));
-                if (isStateTerminal(updateResult.state())) {
-                    inFlightTerminalRecords.incrementAndGet();
+                // In case of 0 size ackTypeMap, we have already validated the 
batch.acknowledgeTypes.
+                byte ackType = ackTypeMap.size() > 1 ? 
ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
+
+                if (ackType == AcknowledgeType.RENEW.id) {
+                    // If RENEW, renew the acquisition lock timer for this 
offset and continue without changing state.
+                    // We do not care about recordState map here.
+                    // Only valid for ACQUIRED offsets; the check above 
ensures this.
+                    long key = offsetState.getKey();
+                    InFlightState state = offsetState.getValue();
+                    log.debug("Renewing acq lock for {}-{} with offsets {}-{} 
for member {}.",

Review Comment:
   nit: I think it is better if we log something like `log.debug("Renewing acq 
lock for {}-{} with offset {} for member {}.", groupId, topicIdPartition, key, 
memberId)` and when we are logging it for batch, it should be `
   log.debug("Renewing acq lock for {}-{} with batch {}-{} for member {}.",
                       groupId, topicIdPartition, inFlightBatch.firstOffset(), 
inFlightBatch.lastOffset(), memberId);`
                       
   I think that is more explicit for debugging purpose.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to