adixitconfluent commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2199764435
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2054,7 +2054,15 @@ void rollbackOrProcessStateUpdates(
             // Log in DEBUG to avoid flooding of logs for a faulty client.
             log.debug("Request failed for updating state, rollback any changed state"
                 + " for the share partition: {}-{}", groupId, topicIdPartition);
-            updatedStates.forEach(state -> state.completeStateTransition(false));
+            updatedStates.forEach(state -> {
+                state.completeStateTransition(false);
+                // If state transition fails in write state RPC, we will rollback to the original state if record
+                // hasn't reached a terminal state. If acquisition lock has expired by that time, the record can
+                // be stuck in ACQUIRED state unless we run the acquisition lock task again.
+                if (!state.isTerminalState() && state.acquisitionLockTimeoutTask.hasExpired()) {
+                    state.acquisitionLockTimeoutTask.run();

Review Comment:
   There is a call to the persister only if `stateBatches` is non-empty. During the first acquisition lock timeout, the record in this situation cannot be in the `ACQUIRED` state: because of the ongoing transition, it will be in the `AVAILABLE`, `ACKNOWLEDGED`, or `ARCHIVED` state. So `stateBatches` cannot contain an entry for this record, and there won't be any persister calls for it.

   Regarding "This will record the metric of timeout again.": I have added the code change below so that the timeout metric is not recorded twice.
   ```
   // Record the timeout metric only on the first expiry, so re-running the task
   // after a failed write state RPC does not count these offsets twice.
   if (!hasExpired) {
       sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1);
   }
   ```
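The reasoning in this review comment can be illustrated with a small, self-contained Java model. It is a sketch under stated assumptions, not Kafka's implementation: `RecordState`, `LockTimeoutTask`, `timeoutMetric`, and `persisterCalls` are hypothetical stand-ins for the in-flight record state, the acquisition lock timeout task, `recordAcquisitionLockTimeoutPerSec`, and persister writes. It also assumes that releasing a still-`ACQUIRED` record on timeout produces exactly one state batch (and therefore one persister write). The scenario: a record is mid-transition to `ACKNOWLEDGED` when its lock expires, then the write state RPC fails and the rollback loop from the diff re-runs the task.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the rollback path discussed in the review.
// None of these types are Kafka's real classes.
public class RollbackSketch {

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // Stand-in for recordAcquisitionLockTimeoutPerSec: counts timed-out records.
    static long timeoutMetric = 0;
    // Stand-in for write state RPCs sent to the persister.
    static int persisterCalls = 0;

    static final class RecordState {
        State state;
        State beforeTransition; // state to roll back to if the write fails
        final LockTimeoutTask acquisitionLockTimeoutTask = new LockTimeoutTask(this);

        RecordState(State initial) { this.state = initial; }

        // The in-memory state moves to the target before the write state RPC completes.
        void startStateTransition(State next) {
            beforeTransition = state;
            state = next;
        }

        void completeStateTransition(boolean success) {
            if (!success) {
                state = beforeTransition; // roll back on a failed write
            }
        }

        boolean isTerminalState() {
            return state == State.ACKNOWLEDGED || state == State.ARCHIVED;
        }
    }

    static final class LockTimeoutTask implements Runnable {
        private final RecordState record;
        private boolean hasExpired = false;

        LockTimeoutTask(RecordState record) { this.record = record; }

        boolean hasExpired() { return hasExpired; }

        @Override
        public void run() {
            // Record the metric only on the first expiry (the guard from the review).
            if (!hasExpired) {
                timeoutMetric++;
            }
            hasExpired = true;
            // Only a record still in ACQUIRED is released and queued for a write.
            List<RecordState> stateBatches = new ArrayList<>();
            if (record.state == State.ACQUIRED) {
                record.state = State.AVAILABLE;
                stateBatches.add(record);
            }
            // The persister is called only if there is something to write.
            if (!stateBatches.isEmpty()) {
                persisterCalls++;
            }
        }
    }

    // Acknowledge a record, let its lock expire mid-write, then fail the write.
    static RecordState simulate() {
        timeoutMetric = 0;
        persisterCalls = 0;
        RecordState record = new RecordState(State.ACQUIRED);
        record.startStateTransition(State.ACKNOWLEDGED); // write state RPC in flight
        record.acquisitionLockTimeoutTask.run();          // lock expires: not ACQUIRED, no write

        // The write state RPC fails: the rollback loop mirrors the diff above.
        List<RecordState> updatedStates = List.of(record);
        updatedStates.forEach(state -> {
            state.completeStateTransition(false);
            if (!state.isTerminalState() && state.acquisitionLockTimeoutTask.hasExpired()) {
                state.acquisitionLockTimeoutTask.run();
            }
        });
        return record;
    }

    public static void main(String[] args) {
        RecordState record = simulate();
        System.out.println(record.state + " metric=" + timeoutMetric
                + " persisterCalls=" + persisterCalls);
    }
}
```

Running `main` prints `AVAILABLE metric=1 persisterCalls=1`. The first timeout finds the record in `ACKNOWLEDGED`, so `stateBatches` stays empty and no persister call is made; after the rollback returns it to `ACQUIRED`, the re-run releases it to `AVAILABLE` without counting the timeout metric a second time. Without the re-run, the record would remain in `ACQUIRED` with an already-expired lock.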