apoorvmittal10 commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2198603871


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2054,7 +2054,15 @@ void rollbackOrProcessStateUpdates(
                 // Log in DEBUG to avoid flooding of logs for a faulty client.
                 log.debug("Request failed for updating state, rollback any changed state"
                     + " for the share partition: {}-{}", groupId, topicIdPartition);
-                updatedStates.forEach(state -> state.completeStateTransition(false));
+                updatedStates.forEach(state -> {
+                    state.completeStateTransition(false);
+                    // If state transition fails in write state RPC, we will rollback to the original state if record
+                    // hasn't reached a terminal state. If acquisition lock has expired by that time, the record can
+                    // be stuck in ACQUIRED state unless we run the acquisition lock task again.
+                    if (!state.isTerminalState() && state.acquisitionLockTimeoutTask.hasExpired()) {
+                        state.acquisitionLockTimeoutTask.run();

Review Comment:
   This will record the timeout metric again. The previous run of the timeout
task must already have issued a call to the persister in the background, so is
that not a concern for us?
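
   To illustrate the metric concern, here is a minimal sketch. The class,
the counter, and `simulateRerun` are hypothetical stand-ins and not the actual
`SharePartition` or timeout task code; the sketch assumes the task records the
metric unconditionally each time `run()` is invoked.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the rollback scenario; not Kafka's actual classes. */
public class TimeoutTaskSketch {

    /** Simplified timeout task that records a metric every time it runs (assumption). */
    static class LockTimeoutTask implements Runnable {
        private final AtomicInteger timeoutMetric;
        private boolean expired = false;

        LockTimeoutTask(AtomicInteger timeoutMetric) {
            this.timeoutMetric = timeoutMetric;
        }

        boolean hasExpired() {
            return expired;
        }

        @Override
        public void run() {
            expired = true;
            // Recorded on every invocation, including a manual re-run.
            timeoutMetric.incrementAndGet();
        }
    }

    /** Runs the task once as the timer would, then again as the rollback path would. */
    static int simulateRerun() {
        AtomicInteger timeoutMetric = new AtomicInteger();
        LockTimeoutTask task = new LockTimeoutTask(timeoutMetric);
        task.run();                 // original timer expiry
        if (task.hasExpired()) {
            task.run();             // re-run triggered after the rollback
        }
        return timeoutMetric.get();
    }

    public static void main(String[] args) {
        // One logical timeout, but the metric is counted twice.
        System.out.println(simulateRerun());
    }
}
```

   Under that assumption, a single logical timeout is counted twice.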


