CalvinConfluent commented on code in PR #18277:
URL: https://github.com/apache/kafka/pull/18277#discussion_r1935979962


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -422,7 +425,15 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
             record.setLogDirs(request.logDirs());
         }
 
-        if (!request.incarnationId().equals(prevIncarnationId)) {
+        boolean isUncleanShutdownRegistration;
+        if (uncleanShutdownDetectionEnabled) {
+            isUncleanShutdownRegistration = storedBrokerEpoch != request.previousBrokerEpoch();
+        } else {
+            isUncleanShutdownRegistration = !request.incarnationId().equals(prevIncarnationId);
+        }
+        boolean isBrokerStartUp = !request.incarnationId().equals(prevIncarnationId);
+
+        if (isUncleanShutdownRegistration) {

Review Comment:
   Good question. I think the same principle applies to removing the broker ID 
from the ISR or ELR. The controller removes a broker from the ISR and ELR only 
for replication and data-safety reasons. If the broker can prove that its data 
is safe (via the broker epoch), the controller should not remove it from the 
ISR and ELR.
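
   To make that concrete, here is a rough sketch of the rule as I understand 
it from the hunk above. It is not the actual controller code: the class and 
method names are hypothetical, the request fields are passed as plain 
parameters, and `java.util.UUID` stands in for Kafka's own `Uuid` type so the 
snippet compiles on its own.

```java
import java.util.UUID;

// Illustrative only: this class and its methods are hypothetical, not Kafka APIs.
final class UncleanShutdownSketch {
    private UncleanShutdownSketch() {}

    // Mirrors the branch in the diff: with detection enabled, compare broker
    // epochs; otherwise fall back to comparing incarnation IDs.
    static boolean isUncleanShutdownRegistration(
            boolean uncleanShutdownDetectionEnabled,
            long storedBrokerEpoch,
            long requestPreviousBrokerEpoch,
            UUID prevIncarnationId,
            UUID requestIncarnationId) {
        if (uncleanShutdownDetectionEnabled) {
            // A matching epoch is the broker's proof that it shut down cleanly.
            return storedBrokerEpoch != requestPreviousBrokerEpoch;
        }
        // Without detection, any new incarnation is treated as unclean.
        return !requestIncarnationId.equals(prevIncarnationId);
    }

    // Assumption: ISR/ELR removal follows the same data-safety signal and
    // nothing else, which is the principle argued for in this comment.
    static boolean shouldRemoveFromIsrAndElr(
            boolean uncleanShutdownDetectionEnabled,
            long storedBrokerEpoch,
            long requestPreviousBrokerEpoch,
            UUID prevIncarnationId,
            UUID requestIncarnationId) {
        return isUncleanShutdownRegistration(
                uncleanShutdownDetectionEnabled,
                storedBrokerEpoch,
                requestPreviousBrokerEpoch,
                prevIncarnationId,
                requestIncarnationId);
    }
}
```

   With detection enabled, a broker that restarts with a new incarnation ID 
but reports the epoch the controller has stored would stay in the ISR and ELR. 
With detection disabled, the same restart would still be treated as unclean.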



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
