kamalcph commented on code in PR #14576:
URL: https://github.com/apache/kafka/pull/14576#discussion_r1364317572


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##########
@@ -302,22 +307,26 @@ public void addCopyInProgressSegment(RemoteLogSegmentMetadata remoteLogSegmentMe
 
         RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
         RemoteLogSegmentMetadata existingMetadata = idToSegmentMetadata.get(remoteLogSegmentId);
-        checkStateTransition(existingMetadata != null ? existingMetadata.state() : null,
-                remoteLogSegmentMetadata.state());
-
+        boolean isValid = checkStateTransition(existingMetadata != null ? existingMetadata.state() : null,
+                remoteLogSegmentMetadata.state(), remoteLogSegmentMetadata.remoteLogSegmentId());
+        if (!isValid) {
+            return;
+        }
         for (Integer epoch : remoteLogSegmentMetadata.segmentLeaderEpochs().keySet()) {
             leaderEpochEntries.computeIfAbsent(epoch, leaderEpoch -> new RemoteLogLeaderEpochState())
                     .handleSegmentWithCopySegmentStartedState(remoteLogSegmentId);
         }
-
         idToSegmentMetadata.put(remoteLogSegmentId, remoteLogSegmentMetadata);
     }
 
-    private void checkStateTransition(RemoteLogSegmentState existingState, RemoteLogSegmentState targetState) {
-        if (!RemoteLogSegmentState.isValidTransition(existingState, targetState)) {
-            throw new IllegalStateException(
-                    "Current state: " + existingState + " can not be transitioned to target state: " + targetState);
+    private boolean checkStateTransition(RemoteLogSegmentState existingState,
+                                         RemoteLogSegmentState targetState,
+                                         RemoteLogSegmentId segmentId) {
+        boolean isValid = RemoteLogSegmentState.isValidTransition(existingState, targetState);
+        if (!isValid) {
+            log.error("Current state: {} can not be transitioned to target state: {}, segmentId: {}. Dropping the event",

Review Comment:
   Logging the error instead of throwing the exception, because an exception here stops the internal consumer that consumes from the remote log metadata topic.
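To illustrate the log-and-drop idea, here is a minimal, self-contained sketch under a simplified model. `SegmentStateCacheSketch`, its `SegmentState` enum, and the transition table are hypothetical stand-ins, not Kafka's `RemoteLogMetadataCache` or the exact rules of `RemoteLogSegmentState.isValidTransition`, and it uses `java.util.logging` in place of the SLF4J logger to stay dependency-free. It replays one out-of-order event followed by valid ones to show that processing continues.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for a metadata cache; NOT Kafka's RemoteLogMetadataCache.
class SegmentStateCacheSketch {

    // Assumed segment lifecycle, loosely modeled on the four RemoteLogSegmentState values.
    enum SegmentState {
        COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
    }

    private static final Logger LOG = Logger.getLogger(SegmentStateCacheSketch.class.getName());

    private final Map<String, SegmentState> states = new HashMap<>();
    private final List<String> dropped = new ArrayList<>();

    // Assumed transition table; a null source state means the segment has not been seen yet.
    static boolean isValidTransition(SegmentState from, SegmentState to) {
        if (from == null) {
            return to == SegmentState.COPY_SEGMENT_STARTED;
        }
        if (from == to) {
            return true; // tolerate redelivered (duplicate) events
        }
        switch (from) {
            case COPY_SEGMENT_STARTED:
                return to == SegmentState.COPY_SEGMENT_FINISHED || to == SegmentState.DELETE_SEGMENT_STARTED;
            case COPY_SEGMENT_FINISHED:
                return to == SegmentState.DELETE_SEGMENT_STARTED;
            case DELETE_SEGMENT_STARTED:
                return to == SegmentState.DELETE_SEGMENT_FINISHED;
            default:
                return false; // DELETE_SEGMENT_FINISHED is terminal
        }
    }

    // Log-and-drop: an invalid event is logged, recorded, and skipped instead of
    // throwing, so the caller's consume loop keeps running.
    boolean apply(String segmentId, SegmentState target) {
        SegmentState current = states.get(segmentId);
        if (!isValidTransition(current, target)) {
            LOG.severe("Current state: " + current + " can not be transitioned to target state: "
                    + target + ", segmentId: " + segmentId + ". Dropping the event");
            dropped.add(segmentId + ":" + target);
            return false;
        }
        states.put(segmentId, target);
        return true;
    }

    SegmentState stateOf(String segmentId) {
        return states.get(segmentId);
    }

    List<String> droppedEvents() {
        return dropped;
    }

    public static void main(String[] args) {
        SegmentStateCacheSketch cache = new SegmentStateCacheSketch();
        // seg-1's FINISHED event arrives before its STARTED event, i.e. out of order.
        String[][] events = {
            {"seg-1", "COPY_SEGMENT_FINISHED"},
            {"seg-1", "COPY_SEGMENT_STARTED"},
            {"seg-2", "COPY_SEGMENT_STARTED"},
            {"seg-2", "COPY_SEGMENT_FINISHED"},
        };
        for (String[] event : events) {
            cache.apply(event[0], SegmentState.valueOf(event[1]));
        }
        // prints: COPY_SEGMENT_STARTED COPY_SEGMENT_FINISHED [seg-1:COPY_SEGMENT_FINISHED]
        System.out.println(cache.stateOf("seg-1") + " " + cache.stateOf("seg-2") + " " + cache.droppedEvents());
    }
}
```

With a throw-based check, the first event would raise an `IllegalStateException` and `seg-2` would never be processed. The trade-off of log-and-drop is that the dropped event is lost, so in this sketch `seg-1` stays in `COPY_SEGMENT_STARTED`.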
   
   To clarify: the producer config `enable.idempotence` is true by default from v3.2. In our internal cluster, producer idempotence was not enabled, and we saw out-of-order messages in the internal topic. Once this happens, the internal consumer stops processing messages, so the pending segments are never uploaded to remote storage. The issue is not recoverable, even after a broker restart.
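Since the failure hinges on producer idempotence being off, here is a sketch of the standard producer settings that turn it on. The keys are documented Kafka producer config names (they match `ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG`, `ProducerConfig.ACKS_CONFIG`, and `ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION`); the class, the helper method, and `localhost:9092` are placeholders, and how such settings reach the internal producer of the remote log metadata manager is an assumption not covered here.

```java
import java.util.Properties;

// Hypothetical helper; only the config keys and values are standard Kafka producer settings.
class IdempotentProducerPropsSketch {

    static Properties idempotentProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The broker uses the producer id and per-partition sequence numbers to
        // de-duplicate retried batches and keep them in order within a partition.
        props.setProperty("enable.idempotence", "true");
        // Idempotence requires acks=all ...
        props.setProperty("acks", "all");
        // ... and at most 5 in-flight requests per connection.
        props.setProperty("max.in.flight.requests.per.connection", "5");
        // "retries" is left at its default, which is > 0 as idempotence requires.
        return props;
    }

    public static void main(String[] args) {
        Properties props = idempotentProducerProps("localhost:9092"); // placeholder address
        System.out.println(props.getProperty("enable.idempotence"));
    }
}
```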



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

