apoorvmittal10 commented on code in PR #20124: URL: https://github.com/apache/kafka/pull/20124#discussion_r2198603871
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2054,7 +2054,15 @@ void rollbackOrProcessStateUpdates(
                 // Log in DEBUG to avoid flooding of logs for a faulty client.
                 log.debug("Request failed for updating state, rollback any changed state"
                     + " for the share partition: {}-{}", groupId, topicIdPartition);
-                updatedStates.forEach(state -> state.completeStateTransition(false));
+                updatedStates.forEach(state -> {
+                    state.completeStateTransition(false);
+                    // If state transition fails in write state RPC, we will rollback to the original state if record
+                    // hasn't reached a terminal state. If acquisition lock has expired by that time, the record can
+                    // be stuck in ACQUIRED state unless we run the acquisition lock task again.
+                    if (!state.isTerminalState() && state.acquisitionLockTimeoutTask.hasExpired()) {
+                        state.acquisitionLockTimeoutTask.run();

Review Comment:
   Running the task again will record the timeout metric a second time. Also, the previous run of the timeout task must already have issued a call to the persister in the background. Isn't that a concern for us?
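
To make the double-counting concern concrete, here is a minimal sketch. It assumes that the timer-driven `run()` both records the timeout metric and issues the persister write, which is what the comment above implies but which this thread does not confirm. `LockTimeoutSketch`, `timeoutMetric`, `persisterCalls`, and `rerunAfterRollback()` are hypothetical names used only for illustration; they are not Kafka's classes or methods.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the acquisition lock timeout task; not Kafka's actual class. */
final class LockTimeoutSketch {
    /** Stand-in for the timeout metric. */
    final AtomicInteger timeoutMetric = new AtomicInteger();
    /** Stand-in for the number of background persister writes issued. */
    final AtomicInteger persisterCalls = new AtomicInteger();
    private boolean expired;

    boolean hasExpired() {
        return expired;
    }

    /** Timer-driven path (assumed): counts the timeout, then releases the record. */
    void run() {
        expired = true;
        timeoutMetric.incrementAndGet();
        releaseRecord();
    }

    /** Hypothetical rollback path: releases the record again without re-counting the timeout. */
    void rerunAfterRollback() {
        if (expired) {
            releaseRecord();
        }
    }

    /** Each release issues one persister write in this sketch. */
    private void releaseRecord() {
        persisterCalls.incrementAndGet();
    }
}
```

In this sketch, calling `run()` twice leaves `timeoutMetric` at 2, whereas `run()` followed by `rerunAfterRollback()` leaves it at 1. Either way `persisterCalls` ends at 2, so splitting the paths would only address the metric; whether the second persister write is safe remains the open question.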