dengziming commented on code in PR #12274:
URL: https://github.com/apache/kafka/pull/12274#discussion_r943173917
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1305,12 +1305,11 @@ private void handleFeatureControlChange() {
         }
     }

-    @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {

Review Comment:
   Done

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -759,7 +759,7 @@ public void run() throws Exception {
             int i = 1;
             for (ApiMessageAndVersion message : result.records()) {
                 try {
-                    replay(message.message(), Optional.empty());
+                    replay(message.message(), Optional.empty(), writeOffset + i);

Review Comment:
   Thank you for pointing this out.

##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -217,6 +217,14 @@ boolean check() {
      */
     private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;

+    /**
+     * Save the offset of each broker registration record, we will only unfence a
+     * broker when its high watermark has reached its broker registration record,
+     * this is not necessarily the exact offset of each broker registration record
+     * but should not be smaller than it.
+     */
+    private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;

Review Comment:
   Currently, all fields in `BrokerRegistration` are hard state, meaning they are all persisted in `RegisterBrokerRecord`, so I prefer to keep this offset in a separate field.

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1874,8 +1873,13 @@ public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
                 @Override
                 public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
+                    Long offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
+                    if (offsetForRegisterBrokerRecord == null) {
+                        throw new RuntimeException(
+                            String.format("Receive a heartbeat from broker %d before registration", brokerId));

Review Comment:
   Good catch. There is a similar check in `ClusterControlManager.checkBrokerEpoch`, which throws a `StaleBrokerEpochException` if the `BrokerRegistration` is null, so I now throw a `StaleBrokerEpochException` here as well for consistency.
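
For readers following the thread, here is a minimal, dependency-free sketch of the bookkeeping discussed in the comments above: record an offset for each broker registration record, and only allow unfencing once the broker has caught up to it. This is not the PR's code. The class `RegistrationOffsetSketch`, its methods, the `StaleBrokerSketchException` stand-in, the plain `HashMap` (in place of `TimelineHashMap`), and the `>=` comparison are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the controller-side bookkeeping; not Kafka code. */
class RegistrationOffsetSketch {

    /** Stand-in for Kafka's StaleBrokerEpochException so the sketch has no dependencies. */
    static class StaleBrokerSketchException extends RuntimeException {
        StaleBrokerSketchException(String message) {
            super(message);
        }
    }

    // brokerId -> an offset that is not smaller than the broker's registration record offset.
    private final Map<Integer, Long> registerBrokerRecordOffsets = new HashMap<>();

    /** Called when a registration record for brokerId is replayed at the given log offset. */
    void replayRegistration(int brokerId, long offset) {
        registerBrokerRecordOffsets.put(brokerId, offset);
    }

    /**
     * Returns true once the offset reported by the broker has reached the recorded
     * registration offset. Using ">=" here is an assumption of this sketch.
     */
    boolean canUnfence(int brokerId, long brokerMetadataOffset) {
        Long registrationOffset = registerBrokerRecordOffsets.get(brokerId);
        if (registrationOffset == null) {
            // Mirrors the idea in the thread: a heartbeat before registration is rejected.
            throw new StaleBrokerSketchException(
                String.format("Received a heartbeat from broker %d before registration", brokerId));
        }
        return brokerMetadataOffset >= registrationOffset;
    }

    public static void main(String[] args) {
        RegistrationOffsetSketch sketch = new RegistrationOffsetSketch();
        sketch.replayRegistration(1, 100L);
        System.out.println(sketch.canUnfence(1, 99L));   // false: broker is still behind
        System.out.println(sketch.canUnfence(1, 100L));  // true: broker has caught up
    }
}
```

Keeping the offset in its own map, separate from the registration object, matches the preference stated in the thread: the registration holds only persisted state, while this offset is derived during replay.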