AndrewJSchofield commented on code in PR #20124: URL: https://github.com/apache/kafka/pull/20124#discussion_r2195493687
########## core/src/main/java/kafka/server/share/SharePartition.java: ##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;
+            }
             state = RecordState.ARCHIVED;
             memberId = newMemberId;
         }
 
-        private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        // Visible for testing
+        synchronized InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+            InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+            if (updatedState != null) {
+                rollbackState = currentState;
+            }
+            return updatedState;
         }
 
-        private void completeStateTransition(boolean commit) {
-            if (commit) {
+        // Visible for testing
+        synchronized void completeStateTransition(boolean commit) {
+            if (commit || isMarkedArchived) {

Review Comment:
Would we want to run the acquisition lock timeout task in this situation too?

########## core/src/main/java/kafka/server/share/SharePartition.java: ##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;

Review Comment:
This does seem odd to me. I think it's the check on the rollback state which is peculiar. I wonder whether `archive(String newMemberId, boolean isTerminalState)` would be better.
########## core/src/main/java/kafka/server/share/SharePartition.java: ##########
@@ -3016,6 +3023,9 @@ static final class InFlightState {
         private InFlightState rollbackState;
         // The timer task for the acquisition lock timeout.
         private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+        // The boolean determines if the record has achieved a final state of ARCHIVED from which it cannot transition
+        // to any other state. This could happen because of LSO movement etc.
+        private boolean isMarkedArchived = false;

Review Comment:
If this is set to `true`, it means that the state of the record is ARCHIVED, and that there will be no further state transitions allowed? That seems like a terminal state to me, as opposed to "marked archived" which is a bit more nebulous in meaning.

########## core/src/main/java/kafka/server/share/SharePartition.java: ##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;
+            }
             state = RecordState.ARCHIVED;
             memberId = newMemberId;
         }
 
-        private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        // Visible for testing
+        synchronized InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+            InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+            if (updatedState != null) {
+                rollbackState = currentState;
+            }
+            return updatedState;
         }
 
-        private void completeStateTransition(boolean commit) {
-            if (commit) {
+        // Visible for testing
+        synchronized void completeStateTransition(boolean commit) {
+            if (commit || isMarkedArchived) {
                 rollbackState = null;
                 return;
             }
             state = rollbackState.state;
             deliveryCount = rollbackState.deliveryCount;
             memberId = rollbackState.memberId;
             rollbackState = null;
+            if (acquisitionLockTimeoutTask.hasExpired())
+                acquisitionLockTimeoutTask.run();

Review Comment:
I observe that you are calling the acquisition lock timeout task within the synchronized block in the in-flight state. Are you sure that there is no opportunity for deadlock introduced by this?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org