apoorvmittal10 commented on code in PR #20815:
URL: https://github.com/apache/kafka/pull/20815#discussion_r2490988051
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2081,31 +2084,50 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
new InvalidRecordStateException("Member is not the owner of offset"));
}
- // Determine the record state for the offset. If the per offset record state is not provided
- // by the client, then use the batch record state.
- RecordState recordState =
- recordStateMap.size() > 1 ? recordStateMap.get(offsetState.getKey()) :
- recordStateDefault;
- InFlightState updateResult = offsetState.getValue().startStateTransition(
- recordState,
- DeliveryCountOps.NO_OP,
- this.maxDeliveryCount,
- EMPTY_MEMBER_ID
- );
- if (updateResult == null) {
- log.debug("Unable to acknowledge records for the offset: {} in batch: {}"
- + " for the share partition: {}-{}", offsetState.getKey(),
- inFlightBatch, groupId, topicIdPartition);
- return Optional.of(new InvalidRecordStateException(
- "Unable to acknowledge records for the batch"));
- }
- // Successfully updated the state of the offset and created a persister state batch for write to persister.
- persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(),
- offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount())));
- if (isStateTerminal(updateResult.state())) {
- deliveryCompleteCount.incrementAndGet();
+ // In case of 0 size ackTypeMap, we have already validated the batch.acknowledgeTypes.
+ byte ackType = ackTypeMap.size() > 1 ? ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
+
+ if (ackType == AcknowledgeType.RENEW.id) {
+ // If RENEW, renew the acquisition lock timer for this offset and continue without changing state.
+ // We do not care about recordState map here.
+ // Only valid for ACQUIRED offsets; the check above ensures this.
+ long key = offsetState.getKey();
+ InFlightState state = offsetState.getValue();
+ log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.",
+ groupId, topicIdPartition, key, inFlightBatch, memberId);
+ state.cancelAndClearAcquisitionLockTimeoutTask();
+ AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId, key, key);
+ state.updateAcquisitionLockTimeoutTask(renewalTask);
+ } else {
+ // Determine the record state for the offset. If the per offset record state is not provided
+ // by the client, then use the batch record state. This will always be present as it is a static
+ // mapping between bytes and record state type. All ack types have been added except for RENEW which
+ // has been handled above.
+ RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
+ Objects.requireNonNull(recordState);
Review Comment:
Hmm, after the `ackType` validation we should never get a null `recordState`.
But since the code calls `Objects.requireNonNull`, an unexpected `ackType`
would surface as a `NullPointerException`. Is that the correct way to handle
an incorrect `ackType`?
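
To make the question concrete, here is a minimal sketch of one alternative: return an error through the method's existing `Optional<Throwable>` style instead of letting `Objects.requireNonNull` throw. Everything in it is an assumption for illustration. The class `AckTypeLookupSketch`, the helper `checkAckType`, the ids in the map, and the use of `IllegalArgumentException` are hypothetical stand-ins, not the PR's code or Kafka's actual types.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, self-contained sketch; none of these names come from the PR.
class AckTypeLookupSketch {

    // Stand-in for the record states an acknowledgement can map to.
    enum RecordState { AVAILABLE, ACKNOWLEDGED, ARCHIVED }

    // Illustrative ids only (assumption). As in the diff, the renew type
    // has no entry because it is handled before this lookup.
    static final Map<Byte, RecordState> ACK_TYPE_TO_RECORD_STATE = Map.of(
        (byte) 1, RecordState.ACKNOWLEDGED,
        (byte) 2, RecordState.AVAILABLE,
        (byte) 3, RecordState.ARCHIVED);

    // Reports an unmapped ack type as a returned error rather than a thrown
    // NullPointerException, so the caller can fail only this request.
    static Optional<Throwable> checkAckType(byte ackType) {
        RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
        if (recordState == null) {
            return Optional.of(new IllegalArgumentException(
                "Unsupported acknowledge type: " + ackType));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A mapped id yields no error; an unmapped id yields a returned error.
        System.out.println(checkAckType((byte) 1).isPresent());
        System.out.println(checkAckType((byte) 9).isPresent());
    }
}
```

Whether a returned error or a hard failure is preferable depends on whether an unmapped `ackType` after validation is treated as a client error or as a server-side invariant violation.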