adixitconfluent commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2199764435


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2054,7 +2054,15 @@ void rollbackOrProcessStateUpdates(
                 // Log in DEBUG to avoid flooding of logs for a faulty client.
                 log.debug("Request failed for updating state, rollback any 
changed state"
                     + " for the share partition: {}-{}", groupId, 
topicIdPartition);
-                updatedStates.forEach(state -> 
state.completeStateTransition(false));
+                updatedStates.forEach(state -> {
+                    state.completeStateTransition(false);
+                    // If state transition fails in write state RPC, we will 
rollback to the original state if record
+                    // hasn't reached a terminal state. If acquisition lock 
has expired by that time, the record can
+                    // be stuck in ACQUIRED state unless we run the 
acquisition lock task again.
+                    if (!state.isTerminalState() && 
state.acquisitionLockTimeoutTask.hasExpired()) {
+                        state.acquisitionLockTimeoutTask.run();

Review Comment:
   there is a call to persister only if `stateBatches` is non-empty. Since the 
state of the record in this situation cannot be `ACQUIRED` (it will be in 
`AVAILABLE/ACKNOWLEDGED/ARCHIVED` state because of an ongoing transition) 
during the first acquisition lock timeout, state batches cannot have an entry 
for this record state. Thus, there won't be any persister calls for this record.
   
   Regarding "This will record the metric of timeout again.", I have a added a 
code change as described below to not record the metric of timeout twice.
   ```
   if (!hasExpired) {
                   
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - 
firstOffset + 1);
   }  
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to