jsancio commented on code in PR #12274:
URL: https://github.com/apache/kafka/pull/12274#discussion_r942583448
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -759,7 +759,7 @@ public void run() throws Exception {
                 int i = 1;
                 for (ApiMessageAndVersion message : result.records()) {
                     try {
-                        replay(message.message(), Optional.empty());
+                        replay(message.message(), Optional.empty(), writeOffset + i);

Review Comment:
   Let's write a comment explaining why the controller needs to do this. I wonder if this should be `writeOffset + result.records().size()` to make it consistent with the code in `handleCommit`. As you mentioned earlier, this is okay as long as the offset passed is equal to or larger than the actual offset. What do you think?

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -938,7 +938,7 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                     int i = 1;
                     for (ApiMessageAndVersion message : messages) {
                         try {
-                            replay(message.message(), Optional.of(reader.snapshotId()));
+                            replay(message.message(), Optional.of(reader.snapshotId()), offset);

Review Comment:
   Hmm. In the snapshot, `batch.lastOffset()` is unrelated to the offset in the log. This is the reason we added `reader.lastContainedLogOffset()`.

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1305,12 +1305,11 @@ private void handleFeatureControlChange() {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {

Review Comment:
   Let's document this `offset` argument, since it is now important and is used when replaying broker registration records.
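To illustrate the offset discussion in the comments above, here is a minimal, self-contained Java sketch. It is not Kafka's `QuorumController`; `ReplayOffsetSketch`, `replayRegistration`, `canUnfence`, `handleCommittedBatch`, and `handleSnapshot` are hypothetical names. It assumes, as the Javadoc on `registerBrokerRecordOffsets` in this PR states, that the recorded offset is only used as an unfencing threshold: a broker is unfenced once the offset it has reached is at least the recorded offset. Under that assumption any value at or above the record's real log offset is safe, so a committed batch can use its last offset for every record, while a snapshot must use the reader's last contained log offset rather than offsets internal to the snapshot.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch, not Kafka code: shows why an offset that is
 * greater than or equal to the record's real log offset is safe to pass
 * when it is only used as an unfencing threshold.
 */
public class ReplayOffsetSketch {
    // brokerId -> offset recorded for that broker's registration record
    private final Map<Integer, Long> registrationOffsets = new HashMap<>();

    /**
     * Replays a registration record for {@code brokerId}.
     *
     * @param offset an upper bound on the record's log offset: the exact
     *               offset, the last offset of its batch, or, when loading a
     *               snapshot, the snapshot's last contained log offset. It
     *               must never be smaller than the record's real offset.
     */
    public void replayRegistration(int brokerId, long offset) {
        registrationOffsets.put(brokerId, offset);
    }

    /** A broker may be unfenced only once it has reached its registration offset. */
    public boolean canUnfence(int brokerId, long brokerReachedOffset) {
        Long offset = registrationOffsets.get(brokerId);
        return offset != null && brokerReachedOffset >= offset;
    }

    /** Committed batch: every record gets the batch's last offset (an upper bound). */
    public void handleCommittedBatch(List<Integer> registeredBrokerIds, long batchLastOffset) {
        for (int brokerId : registeredBrokerIds) {
            replayRegistration(brokerId, batchLastOffset);
        }
    }

    /** Snapshot: batch offsets inside the snapshot are not log offsets, so use lastContainedLogOffset. */
    public void handleSnapshot(List<Integer> registeredBrokerIds, long lastContainedLogOffset) {
        for (int brokerId : registeredBrokerIds) {
            replayRegistration(brokerId, lastContainedLogOffset);
        }
    }
}
```

In this sketch, overestimating the offset only delays unfencing until the broker reaches the end of the batch or snapshot; underestimating could let a broker be unfenced before it has replayed its own registration, which is why the "equal to or larger than" bound matters.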
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1874,8 +1873,13 @@ public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
             @Override
             public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
+                Long offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
+                if (offsetForRegisterBrokerRecord == null) {
+                    throw new RuntimeException(
+                        String.format("Receive a heartbeat from broker %d before registration", brokerId));

Review Comment:
   I would argue that this means the client/broker sent a heartbeat request without first sending a register broker request. Unfortunately, KIP-631 only documents `NOT_CONTROLLER` as a possible error code. Looking at the broker code, the broker retries on all errors. I say we return `INVALID_REQUEST` instead of `UNKNOWN_SERVER_ERROR`.

   Also, throwing a `RuntimeException` here will force the active controller to resign (see `QuorumController#handleEventException`). We don't want that in this case. Instead, this code can throw an `InvalidRequestException`, and `ControllerApis` will convert it to an `Errors` value.

##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -217,6 +217,14 @@ boolean check() {
      */
     private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
 
+    /**
+     * Save the offset of each broker registration record, we will only unfence a
+     * broker when its high watermark has reached its broker registration record,
+     * this is not necessarily the exact offset of each broker registration record
+     * but should not be smaller than it.
+     */
+    private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;

Review Comment:
   Okay. Did you consider adding this offset to `BrokerRegistration`? `ClusterControlManager` already has a mapping of broker id to `BrokerRegistration` that gets snapshotted in memory.
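A minimal sketch combining the suggestions in the heartbeat and `ClusterControlManager` comments: the registration offset lives in the registration object itself instead of a parallel map, the lookup returns `OptionalLong` (the return type suggested for `registerBrokerRecordOffset`), and a heartbeat from an unregistered broker throws `InvalidRequestException` rather than a `RuntimeException`. All class and method names here are hypothetical. `InvalidRequestException` is a local stand-in for Kafka's `org.apache.kafka.common.errors.InvalidRequestException` so the sketch compiles on its own, and a plain `HashMap` replaces `TimelineHashMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical sketch, not Kafka code. */
public class HeartbeatSketch {
    /** Local stand-in for Kafka's InvalidRequestException (assumption, for self-containment). */
    public static class InvalidRequestException extends RuntimeException {
        public InvalidRequestException(String message) {
            super(message);
        }
    }

    /** Hypothetical registration that also carries the offset of its RegisterBrokerRecord. */
    public static final class Registration {
        private final int brokerId;
        private final long registrationOffset;

        public Registration(int brokerId, long registrationOffset) {
            this.brokerId = brokerId;
            this.registrationOffset = registrationOffset;
        }

        public int brokerId() {
            return brokerId;
        }

        public long registrationOffset() {
            return registrationOffset;
        }
    }

    // One map keyed by broker id, instead of a second brokerId -> offset map.
    private final Map<Integer, Registration> registrations = new HashMap<>();

    public void replayRegistration(int brokerId, long offset) {
        registrations.put(brokerId, new Registration(brokerId, offset));
    }

    /** Empty when the broker has not registered, instead of returning a nullable Long. */
    public OptionalLong registerBrokerRecordOffset(int brokerId) {
        Registration registration = registrations.get(brokerId);
        return registration == null
            ? OptionalLong.empty()
            : OptionalLong.of(registration.registrationOffset());
    }

    /** Returns true if the broker may be unfenced; rejects heartbeats from unregistered brokers. */
    public boolean processHeartbeat(int brokerId, long brokerReachedOffset) {
        OptionalLong registrationOffset = registerBrokerRecordOffset(brokerId);
        if (!registrationOffset.isPresent()) {
            // A client error, not a controller fault: the request layer can map this
            // to INVALID_REQUEST instead of making the active controller resign.
            throw new InvalidRequestException(String.format(
                "Received a heartbeat from broker %d before registration", brokerId));
        }
        return brokerReachedOffset >= registrationOffset.getAsLong();
    }
}
```

Keeping the offset inside the registration means it is snapshotted and reverted together with the rest of the broker's registration state, so the two cannot drift apart.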
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -366,7 +375,12 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
         return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
-    public void replay(RegisterBrokerRecord record) {
+    public Long registerBrokerRecordOffset(int brokerId) {
+        return registerBrokerRecordOffsets.get(brokerId);
+    }

Review Comment:
   Let's return an `OptionalLong` instead.

-- 
This is an automated message from the Apache Git Service.