kamalcph commented on code in PR #14576:
URL: https://github.com/apache/kafka/pull/14576#discussion_r1364317572
##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##########
@@ -302,22 +307,26 @@ public void addCopyInProgressSegment(RemoteLogSegmentMetadata remoteLogSegmentMe
         RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
         RemoteLogSegmentMetadata existingMetadata = idToSegmentMetadata.get(remoteLogSegmentId);
-        checkStateTransition(existingMetadata != null ? existingMetadata.state() : null,
-                remoteLogSegmentMetadata.state());
-
+        boolean isValid = checkStateTransition(existingMetadata != null ? existingMetadata.state() : null,
+                remoteLogSegmentMetadata.state(), remoteLogSegmentMetadata.remoteLogSegmentId());
+        if (!isValid) {
+            return;
+        }
         for (Integer epoch : remoteLogSegmentMetadata.segmentLeaderEpochs().keySet()) {
             leaderEpochEntries.computeIfAbsent(epoch, leaderEpoch -> new RemoteLogLeaderEpochState())
                     .handleSegmentWithCopySegmentStartedState(remoteLogSegmentId);
         }
-
         idToSegmentMetadata.put(remoteLogSegmentId, remoteLogSegmentMetadata);
     }
 
-    private void checkStateTransition(RemoteLogSegmentState existingState, RemoteLogSegmentState targetState) {
-        if (!RemoteLogSegmentState.isValidTransition(existingState, targetState)) {
-            throw new IllegalStateException(
-                    "Current state: " + existingState + " can not be transitioned to target state: " + targetState);
+    private boolean checkStateTransition(RemoteLogSegmentState existingState,
+                                         RemoteLogSegmentState targetState,
+                                         RemoteLogSegmentId segmentId) {
+        boolean isValid = RemoteLogSegmentState.isValidTransition(existingState, targetState);
+        if (!isValid) {
+            log.error("Current state: {} can not be transitioned to target state: {}, segmentId: {}. Dropping the event",

Review Comment:
Logging the error instead of throwing an exception, because an uncaught exception would stop the internal consumer that reads from the remote log metadata topic.

To clarify, the producer's `enable.idempotence` is set to `true` by default since v3.2. In our internal cluster, producer idempotence was not enabled and we observed out-of-order messages in the internal topic. Once this happens, the internal consumer stops processing messages and then fails to upload the pending segments to remote storage. The issue is not recoverable even after a broker restart.
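To make the first point concrete, here is a minimal sketch (not the actual `ConsumerTask` code in `TopicBasedRemoteLogMetadataManager`) of a poll loop in the style of the internal metadata consumer; `applyToCache` is a hypothetical stand-in for deserializing the event and updating `RemoteLogMetadataCache`:

```java
// Hedged sketch, not the real internal consumer: a simplified poll loop showing why an
// uncaught IllegalStateException is fatal to metadata consumption.
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MetadataConsumeLoopSketch {

    private volatile boolean closing = false;

    public void run(KafkaConsumer<byte[], byte[]> consumer) {
        while (!closing) {
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
                // If this call throws (e.g. on an invalid state transition caused by an
                // out-of-order event), the exception escapes run() and the loop never
                // resumes. Logging and dropping the event keeps the consumer alive.
                applyToCache(record);
            }
        }
    }

    // Hypothetical handler standing in for the cache update; elided in this sketch.
    private void applyToCache(ConsumerRecord<byte[], byte[]> record) {
    }
}
```

Once the exception propagates out of the loop, no further events from the metadata topic are applied, which is the stuck state described above.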
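And for the idempotence note, a minimal sketch of a producer with `enable.idempotence` set explicitly (not the manager's actual producer setup); the bootstrap address, serializers, and record contents are assumptions for the example, and `__remote_log_metadata` stands for the internal metadata topic:

```java
// Hedged sketch: explicitly enabling idempotence so retries cannot reorder or duplicate
// writes within a partition of the metadata topic.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class IdempotentMetadataProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Defaults to true from Kafka 3.2 onwards; older clients or overridden configs
        // need it set explicitly to keep per-partition ordering across retries.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Idempotence requires acks=all and a bounded number of in-flight requests.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Illustrative empty payload; the real manager publishes serialized metadata events.
            producer.send(new ProducerRecord<>("__remote_log_metadata", new byte[0], new byte[0]));
            producer.flush();
        }
    }
}
```

With idempotence enabled, the broker deduplicates retried batches and preserves per-partition send order, so the out-of-order writes we observed should not occur in the first place.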