adixitconfluent commented on code in PR #20124: URL: https://github.com/apache/kafka/pull/20124#discussion_r2195598981
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3112,37 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
+        // Visible for testing.
+        synchronized void archive(String newMemberId) {
+            if (rollbackState != null) {
+                isMarkedArchived = true;
+            }
             state = RecordState.ARCHIVED;
             memberId = newMemberId;
         }
 
-        private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        // Visible for testing
+        synchronized InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+            InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+            if (updatedState != null) {
+                rollbackState = currentState;
+            }
+            return updatedState;
         }
 
-        private void completeStateTransition(boolean commit) {
-            if (commit) {
+        // Visible for testing
+        synchronized void completeStateTransition(boolean commit) {
+            if (commit || isMarkedArchived) {
                 rollbackState = null;
                 return;
             }
             state = rollbackState.state;
             deliveryCount = rollbackState.deliveryCount;
             memberId = rollbackState.memberId;
             rollbackState = null;
+            if (acquisitionLockTimeoutTask.hasExpired())
+                acquisitionLockTimeoutTask.run();

Review Comment:
Hmm, I think you're right. Let's take a scenario where a thread acquires `lock` on entering the function `acquire` and is then waiting for the intrinsic lock of an instance 'x' of `InFlightState` in order to check `hasOngoingStateTransition()`.
Now, it is possible that another thread already holds the intrinsic lock of instance 'x' of `InFlightState`, having entered `completeStateTransition`, and now wants to acquire `lock` in order to run `acquisitionLockTimeoutTask.run()`. Each thread would then be waiting for the lock the other one holds, i.e. a deadlock. This fix needs more thought then.
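
The lock-ordering scenario described in this comment can be reproduced in isolation. The following is only a minimal sketch, not the `SharePartition` code: `partitionLock` and `inFlightState` are hypothetical stand-ins for `lock` and for instance 'x' of `InFlightState`, and the second thread uses a timed `tryLock` (an assumption made so the demo terminates) where the real code path would presumably block.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

final class LockOrderInversionSketch {
    // Hypothetical stand-in for the partition-level `lock`.
    private static final ReentrantLock partitionLock = new ReentrantLock();
    // Hypothetical stand-in for instance 'x'; its monitor plays the intrinsic lock.
    private static final Object inFlightState = new Object();

    /** Returns true if the "completer" thread could not obtain partitionLock while holding the monitor. */
    static boolean secondLockTimedOut() {
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        boolean[] timedOut = {false};

        // Mirrors `acquire`: takes `lock` first, then needs the monitor.
        Thread acquirer = new Thread(() -> {
            partitionLock.lock();
            try {
                bothHoldFirstLock.countDown();
                await(bothHoldFirstLock);
                synchronized (inFlightState) {
                    // The hasOngoingStateTransition() check would run here.
                }
            } finally {
                partitionLock.unlock();
            }
        });

        // Mirrors `completeStateTransition`: holds the monitor, then needs `lock`.
        Thread completer = new Thread(() -> {
            synchronized (inFlightState) {
                bothHoldFirstLock.countDown();
                await(bothHoldFirstLock);
                try {
                    // A blocking lock() here would never return; the timeout exposes the cycle.
                    if (partitionLock.tryLock(200, TimeUnit.MILLISECONDS)) {
                        partitionLock.unlock();
                    } else {
                        timedOut[0] = true;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        acquirer.start();
        completer.start();
        try {
            acquirer.join();
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for demo threads", e);
        }
        return timedOut[0];
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("second lock timed out: " + secondLockTimedOut());
    }
}
```

The two threads take the same pair of locks in opposite order, so once each holds its first lock neither can get its second. Possible ways out include always taking `lock` before the monitor, or running the expired timeout task after leaving the synchronized method; these are general options, not necessarily what this PR should do.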