AndrewJSchofield commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2195493687


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;
+            }
             state = RecordState.ARCHIVED;
             memberId = newMemberId;
         }
 
-        private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        // Visible for testing
+        synchronized InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+            InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+            if (updatedState != null) {
+                rollbackState = currentState;
+            }
+            return updatedState;
         }
 
-        private void completeStateTransition(boolean commit) {
-            if (commit) {
+        // Visible for testing
+        synchronized void completeStateTransition(boolean commit) {
+            if (commit || isMarkedArchived) {

Review Comment:
   Would we want to run the acquisition lock timeout task in this situation too?
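
To make the question concrete, here is a minimal, self-contained sketch of a cut-down `completeStateTransition` in which the expiry check also runs on the terminal-archive path instead of returning early. `FakeTimerTask`, `SketchState` and `TransitionSketch` are hypothetical stand-ins; only the `hasExpired()`/`run()` calls come from the diff. Whether the real `AcquisitionLockTimerTask` should run against a record that is already ARCHIVED is the open question, and this sketch does not settle it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AcquisitionLockTimerTask: only the two methods
// the diff calls (hasExpired, run) are modelled, and run() just counts calls.
class FakeTimerTask implements Runnable {
    private final boolean expired;
    final AtomicInteger runs = new AtomicInteger();

    FakeTimerTask(boolean expired) {
        this.expired = expired;
    }

    boolean hasExpired() {
        return expired;
    }

    @Override
    public void run() {
        runs.incrementAndGet();
    }
}

// Hypothetical subset of RecordState, enough for the sketch.
enum SketchState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

// Hypothetical, cut-down model of InFlightState (no deliveryCount/memberId).
class TransitionSketch {
    SketchState state;
    SketchState rollbackState;
    boolean isMarkedArchived;
    final FakeTimerTask acquisitionLockTimeoutTask;

    TransitionSketch(SketchState state, FakeTimerTask task) {
        this.state = state;
        this.acquisitionLockTimeoutTask = task;
    }

    synchronized void completeStateTransition(boolean commit) {
        if (commit) {
            // Committed: keep the new state and drop the rollback snapshot.
            rollbackState = null;
            return;
        }
        if (!isMarkedArchived) {
            // Not terminally archived: restore the state captured at the start.
            state = rollbackState;
        }
        rollbackState = null;
        // Reached on both the rollback path and the terminal-archive path.
        if (acquisitionLockTimeoutTask.hasExpired()) {
            acquisitionLockTimeoutTask.run();
        }
    }
}
```

Note that this variant still runs the task inside the monitor, so it inherits the deadlock question raised further down.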



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;

Review Comment:
   This does seem odd to me. I think it's the check on the rollback state which 
is peculiar. I wonder whether `archive(String newMemberId, boolean 
isTerminalState)` would be better.
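
A minimal sketch of what the suggested signature might look like, assuming a cut-down `InFlightState` that keeps only `state`, `memberId` and the flag. `ArchiveSketch` and `RecordStateSketch` are hypothetical names. The point is that the caller states whether the archive is final, instead of `archive()` inferring it from whether `rollbackState` happens to be non-null.

```java
// Hypothetical subset of RecordState, enough for the sketch.
enum RecordStateSketch { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

// Hypothetical, cut-down InFlightState illustrating
// archive(String newMemberId, boolean isTerminalState).
class ArchiveSketch {
    RecordStateSketch state;
    String memberId;
    boolean isTerminalState;

    ArchiveSketch(RecordStateSketch state, String memberId) {
        this.state = state;
        this.memberId = memberId;
    }

    synchronized void archive(String newMemberId, boolean isTerminalState) {
        // The caller decides finality explicitly. The flag is sticky: a later
        // non-terminal archive never clears an earlier terminal one.
        if (isTerminalState) {
            this.isTerminalState = true;
        }
        state = RecordStateSketch.ARCHIVED;
        memberId = newMemberId;
    }
}
```

With this shape, a caller archiving because of LSO movement (the case mentioned in the new field's comment) would presumably pass `true`, and the decision no longer depends on whether a transition happens to be in flight.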



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3016,6 +3023,9 @@ static final class InFlightState {
         private InFlightState rollbackState;
         // The timer task for the acquisition lock timeout.
         private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+        // The boolean determines if the record has achieved a final state of ARCHIVED from which it cannot transition
+        // to any other state. This could happen because of LSO movement etc.
+        private boolean isMarkedArchived = false;

Review Comment:
   If this is set to `true`, it means that the state of the record is ARCHIVED, 
and that there will be no further state transitions allowed? That seems like a 
terminal state to me, as opposed to "marked archived" which is a bit more 
nebulous in meaning.
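
A minimal sketch of the "terminal state" reading, assuming the only contract is that a terminal record refuses every later transition. `TerminalStateSketch`, `RState`, `archiveTerminally` and `tryTransition` are hypothetical; `tryTransition` returns `false` for a refused transition, in the same spirit as the diff's `updatedState != null` check on the result of `tryUpdateState`.

```java
// Hypothetical subset of RecordState, enough for the sketch.
enum RState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

// Hypothetical sketch: once terminal, the record is ARCHIVED and stays ARCHIVED.
class TerminalStateSketch {
    private RState state;
    private boolean isTerminalState;

    TerminalStateSketch(RState state) {
        this.state = state;
    }

    synchronized RState state() {
        return state;
    }

    synchronized boolean isTerminalState() {
        return isTerminalState;
    }

    // Moves the record into the terminal ARCHIVED state.
    synchronized void archiveTerminally() {
        state = RState.ARCHIVED;
        isTerminalState = true;
    }

    // Refuses (returns false, state unchanged) once the record is terminal.
    synchronized boolean tryTransition(RState newState) {
        if (isTerminalState) {
            return false;
        }
        state = newState;
        return true;
    }
}
```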



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;
+            }
             state = RecordState.ARCHIVED;
             memberId = newMemberId;
         }
 
-        private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        // Visible for testing
+        synchronized InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+            InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+            if (updatedState != null) {
+                rollbackState = currentState;
+            }
+            return updatedState;
         }
 
-        private void completeStateTransition(boolean commit) {
-            if (commit) {
+        // Visible for testing
+        synchronized void completeStateTransition(boolean commit) {
+            if (commit || isMarkedArchived) {
                 rollbackState = null;
                 return;
             }
             state = rollbackState.state;
             deliveryCount = rollbackState.deliveryCount;
             memberId = rollbackState.memberId;
             rollbackState = null;
+            if (acquisitionLockTimeoutTask.hasExpired())
+                acquisitionLockTimeoutTask.run();

Review Comment:
   I observe that you are calling the acquisition lock timeout task within the 
synchronized block in the in-flight state. Are you sure that there is no 
opportunity for deadlock introduced by this?
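
One common way to remove this risk is the "open call" pattern: perform the rollback and make the expiry decision while holding the monitor, then invoke the task only after releasing it. Below is a minimal sketch under the assumption that the real timer task may take other locks when it runs. `ExpirableTask`, `OpenCallSketch` and `beginTransition` are hypothetical, and `String` states stand in for `RecordState`.

```java
// Hypothetical timer-task stand-in. The assumption is that the real task may
// acquire other locks when it runs, which is where a deadlock could arise.
interface ExpirableTask extends Runnable {
    boolean hasExpired();
}

class OpenCallSketch {
    private String state = "ACQUIRED";
    private String rollbackState;
    private final ExpirableTask acquisitionLockTimeoutTask;

    OpenCallSketch(ExpirableTask task) {
        this.acquisitionLockTimeoutTask = task;
    }

    // Hypothetical simplification of startStateTransition.
    synchronized void beginTransition(String newState) {
        rollbackState = state;
        state = newState;
    }

    synchronized String state() {
        return state;
    }

    // Roll back under the monitor, but only *decide* there whether the task
    // must run; the task is invoked after the monitor is released, so this
    // object's lock is never held while the task takes its own locks.
    void completeStateTransition(boolean commit) {
        boolean runTimeoutTask;
        synchronized (this) {
            if (commit) {
                rollbackState = null;
                return;
            }
            state = rollbackState;
            rollbackState = null;
            runTimeoutTask = acquisitionLockTimeoutTask.hasExpired();
        }
        if (runTimeoutTask) {
            acquisitionLockTimeoutTask.run();
        }
    }
}
```

The trade-off is that the record's state can change between the decision and the call, so the task itself would need to re-check the state under whatever lock it takes before acting.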



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
