adixitconfluent commented on code in PR #20815:
URL: https://github.com/apache/kafka/pull/20815#discussion_r2486660841
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2081,31 +2084,50 @@ private Optional<Throwable>
acknowledgePerOffsetBatchRecords(
new InvalidRecordStateException("Member is not the
owner of offset"));
}
- // Determine the record state for the offset. If the per
offset record state is not provided
- // by the client, then use the batch record state.
- RecordState recordState =
- recordStateMap.size() > 1 ?
recordStateMap.get(offsetState.getKey()) :
- recordStateDefault;
- InFlightState updateResult =
offsetState.getValue().startStateTransition(
- recordState,
- DeliveryCountOps.NO_OP,
- this.maxDeliveryCount,
- EMPTY_MEMBER_ID
- );
- if (updateResult == null) {
- log.debug("Unable to acknowledge records for the offset:
{} in batch: {}"
- + " for the share partition: {}-{}",
offsetState.getKey(),
- inFlightBatch, groupId, topicIdPartition);
- return Optional.of(new InvalidRecordStateException(
- "Unable to acknowledge records for the batch"));
- }
- // Successfully updated the state of the offset and created a
persister state batch for write to persister.
- persisterBatches.add(new PersisterBatch(updateResult, new
PersisterStateBatch(offsetState.getKey(),
- offsetState.getKey(), updateResult.state().id(), (short)
updateResult.deliveryCount())));
- if (isStateTerminal(updateResult.state())) {
- inFlightTerminalRecords.incrementAndGet();
+ // In case of 0 size ackTypeMap, we have already validated the
batch.acknowledgeTypes.
+ byte ackType = ackTypeMap.size() > 1 ?
ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
+
+ if (ackType == AcknowledgeType.RENEW.id) {
+ // If RENEW, renew the acquisition lock timer for this
offset and continue without changing state.
+ // We do not care about recordState map here.
+ // Only valid for ACQUIRED offsets; the check above
ensures this.
+ long key = offsetState.getKey();
+ InFlightState state = offsetState.getValue();
+ log.debug("Renewing acq lock for {}-{} with offsets {}-{}
for member {}.",
Review Comment:
nit: I think it is better if we log something like `log.debug("Renewing acq
lock for {}-{} with offsets {} for member {}.", groupId, topicIdPartition, key,
memberId)` and when we are logging it for batch, it should be `
log.debug("Renewing acq lock for {}-{} with batch {}-{} for member {}.",
groupId, topicIdPartition, inFlightBatch.firstOffset(),
inFlightBatch.lastOffset(), memberId);`
I think that is more explicit for debugging purpose.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]