adixitconfluent commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2195635537


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;
+            }
             state = RecordState.ARCHIVED;
             memberId = newMemberId;
         }
 
-        private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        // Visible for testing
+        synchronized InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+            InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+            if (updatedState != null) {
+                rollbackState = currentState;
+            }
+            return updatedState;
         }
 
-        private void completeStateTransition(boolean commit) {
-            if (commit) {
+        // Visible for testing
+        synchronized void completeStateTransition(boolean commit) {
+            if (commit || isMarkedArchived) {
                 rollbackState = null;
                 return;
             }
             state = rollbackState.state;
             deliveryCount = rollbackState.deliveryCount;
             memberId = rollbackState.memberId;
             rollbackState = null;
+            if (acquisitionLockTimeoutTask.hasExpired())
+                acquisitionLockTimeoutTask.run();

Review Comment:
   One solution I can think of: instead of adding this code here, we should
   do it at the places where we call
   `inflightsState.completeStateTransition(false)`. That would be ideal, I think.
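
   To make the suggestion concrete, here is a minimal, self-contained sketch of
   what I mean. It is not the actual `SharePartition` code: `TimeoutTask`, the
   two-value `RecordState`, the trimmed-down `InFlightState`, and the helper
   `rollbackAndReleaseIfExpired` are all hypothetical stand-ins. The point is
   only that `completeStateTransition` stays a pure rollback and the
   expired-timer check lives with the caller.

```java
public class RollbackAtCallSiteSketch {

    // Simplified stand-in for the real RecordState enum (assumption).
    enum RecordState { AVAILABLE, ACQUIRED }

    // Hypothetical stand-in for the acquisition lock timeout task.
    static final class TimeoutTask implements Runnable {
        private final boolean expired;
        private int runCount = 0;

        TimeoutTask(boolean expired) {
            this.expired = expired;
        }

        boolean hasExpired() {
            return expired;
        }

        int runCount() {
            return runCount;
        }

        @Override
        public void run() {
            runCount++;
        }
    }

    // Trimmed-down stand-in for InFlightState: rollback has no timeout logic.
    static final class InFlightState {
        private RecordState state;
        private RecordState rollbackState;
        private final TimeoutTask acquisitionLockTimeoutTask;

        InFlightState(RecordState state, TimeoutTask task) {
            this.state = state;
            this.acquisitionLockTimeoutTask = task;
        }

        synchronized void startStateTransition(RecordState newState) {
            rollbackState = state;
            state = newState;
        }

        synchronized void completeStateTransition(boolean commit) {
            if (!commit && rollbackState != null) {
                state = rollbackState;
            }
            rollbackState = null;
        }

        synchronized RecordState state() {
            return state;
        }

        TimeoutTask acquisitionLockTimeoutTask() {
            return acquisitionLockTimeoutTask;
        }
    }

    // Hypothetical helper for call sites that currently invoke
    // completeStateTransition(false) directly.
    static void rollbackAndReleaseIfExpired(InFlightState inFlightState) {
        inFlightState.completeStateTransition(false);
        TimeoutTask task = inFlightState.acquisitionLockTimeoutTask();
        if (task != null && task.hasExpired()) {
            task.run();
        }
    }

    public static void main(String[] args) {
        TimeoutTask task = new TimeoutTask(true);
        InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, task);
        inFlightState.startStateTransition(RecordState.AVAILABLE);
        rollbackAndReleaseIfExpired(inFlightState);
        System.out.println(inFlightState.state() + " runs=" + task.runCount());
    }
}
```

   With this shape the timeout task is never run from inside the synchronized
   rollback method itself, and each call site can decide whether running the
   expired task is appropriate there.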




