adixitconfluent commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2195598981


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;
+            }
             state = RecordState.ARCHIVED;
             memberId = newMemberId;
         }
 
-        private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        // Visible for testing
+        synchronized InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+            InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+            if (updatedState != null) {
+                rollbackState = currentState;
+            }
+            return updatedState;
         }
 
-        private void completeStateTransition(boolean commit) {
-            if (commit) {
+        // Visible for testing
+        synchronized void completeStateTransition(boolean commit) {
+            if (commit || isMarkedArchived) {
                 rollbackState = null;
                 return;
             }
             state = rollbackState.state;
             deliveryCount = rollbackState.deliveryCount;
             memberId = rollbackState.memberId;
             rollbackState = null;
+            if (acquisitionLockTimeoutTask.hasExpired())
+                acquisitionLockTimeoutTask.run();

Review Comment:
   Hmm, I think you're right. Let's take a scenario where a thread has acquired
`lock` on entering the function `acquire` and is now waiting for the intrinsic
lock of an instance 'x' of `InFlightState` in order to perform the check
`hasOngoingStateTransition()`. At the same time, another thread may already hold
the intrinsic lock of instance 'x' (it took it on entering
`completeStateTransition`) and now wants to acquire `lock` in order to run
`acquisitionLockTimeoutTask.run()`. Each thread is then waiting for the lock the
other one holds, which is a deadlock.
   This fix needs more thought then.
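
   To make the lock ordering concrete, here is a minimal standalone sketch of
the scenario, not the actual `SharePartition` code. `partitionLock` and
`inFlightState` are hypothetical stand-ins for `lock` and instance 'x', and a
plain `ReentrantLock` is assumed for simplicity. The second thread uses
`tryLock` with a timeout so the sketch terminates; with a blocking `lock()` call
both threads would wait forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class LockOrderSketch {
    // Hypothetical stand-in for the partition-level `lock`.
    static final ReentrantLock partitionLock = new ReentrantLock();
    // Hypothetical stand-in for instance 'x'; its monitor plays the intrinsic lock.
    static final Object inFlightState = new Object();

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true when the second thread could not obtain partitionLock,
    // i.e. the two paths took the locks in opposite order.
    static boolean demonstrateInversion() {
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        boolean[] gotPartitionLock = new boolean[1];

        // Path 1 (like acquire): partitionLock first, then the monitor.
        Thread acquirePath = new Thread(() -> {
            partitionLock.lock();
            try {
                bothHoldFirstLock.countDown();
                await(bothHoldFirstLock);
                synchronized (inFlightState) {
                    // Stand-in for the hasOngoingStateTransition() check.
                }
            } finally {
                partitionLock.unlock();
            }
        });

        // Path 2 (like completeStateTransition): monitor first, then partitionLock.
        Thread completePath = new Thread(() -> {
            synchronized (inFlightState) {
                bothHoldFirstLock.countDown();
                await(bothHoldFirstLock);
                try {
                    // A blocking lock() here would never return; the timeout
                    // only exists so that this sketch can finish.
                    gotPartitionLock[0] = partitionLock.tryLock(200, TimeUnit.MILLISECONDS);
                    if (gotPartitionLock[0]) {
                        partitionLock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        acquirePath.start();
        completePath.start();
        try {
            acquirePath.join();
            completePath.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !gotPartitionLock[0];
    }

    public static void main(String[] args) {
        System.out.println("lock-order inversion observed: " + demonstrateInversion());
    }
}
```

   The usual ways out are to take the two locks in the same order on every
path, or to run the expired timeout task only after the intrinsic lock has been
released.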



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
