dengziming commented on code in PR #12274:
URL: https://github.com/apache/kafka/pull/12274#discussion_r943173917


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1305,12 +1305,11 @@ private void handleFeatureControlChange() {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {

Review Comment:
   Done



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -759,7 +759,7 @@ public void run() throws Exception {
                 int i = 1;
                 for (ApiMessageAndVersion message : result.records()) {
                     try {
-                        replay(message.message(), Optional.empty());
+                        replay(message.message(), Optional.empty(), writeOffset + i);

Review Comment:
   Thank you for pointing this out.



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -217,6 +217,14 @@ boolean check() {
      */
     private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
 
+    /**
+     * Save the offset of each broker registration record, we will only unfence a
+     * broker when its high watermark has reached its broker registration record,
+     * this is not necessarily the exact offset of each broker registration record
+     * but should not be smaller than it.
+     */
+    private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;

Review Comment:
   Currently, all fields in `BrokerRegistration` are hard state, which means
they are all persisted in `RegisterBrokerRecord`, so I prefer to keep the
offset in a separate field.
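
   To make the separate-field idea concrete, here is a minimal sketch, not the
PR's actual code. `RegistrationOffsetTracker`, `recordRegistration`, and
`canUnfence` are hypothetical names, and a plain `HashMap` stands in for
`TimelineHashMap` so the example is self-contained. It assumes a broker may be
unfenced once its high watermark is at least the stored registration offset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the bookkeeping discussed above; not Kafka's real class.
class RegistrationOffsetTracker {
    // Soft state: broker id -> an offset at or after its registration record.
    // The PR uses a TimelineHashMap; a plain HashMap keeps the sketch self-contained.
    private final Map<Integer, Long> registerBrokerRecordOffsets = new HashMap<>();

    // Called when a registration record for brokerId is replayed at the given offset.
    void recordRegistration(int brokerId, long offset) {
        registerBrokerRecordOffsets.put(brokerId, offset);
    }

    // Returns null when no registration offset is known for the broker.
    Long registerBrokerRecordOffset(int brokerId) {
        return registerBrokerRecordOffsets.get(brokerId);
    }

    // Assumption: "reached" means the high watermark is >= the stored offset.
    boolean canUnfence(int brokerId, long brokerHighWatermark) {
        Long offset = registerBrokerRecordOffsets.get(brokerId);
        return offset != null && brokerHighWatermark >= offset;
    }
}
```

   Because the stored offset is only a lower bound on where the broker must have
caught up to, it does not need to be written into `RegisterBrokerRecord` itself.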



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1874,8 +1873,13 @@ public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
 
                 @Override
                 public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
+                    Long offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
+                    if (offsetForRegisterBrokerRecord == null) {
+                        throw new RuntimeException(
+                            String.format("Receive a heartbeat from broker %d before registration", brokerId));

Review Comment:
   Good catch. I found a similar check in
`ClusterControlManager.checkBrokerEpoch`, where we return
`StaleBrokerEpochException` if the `BrokerRegistration` is null, so I now also
return a `StaleBrokerEpochException` here for consistency.
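
   A rough sketch of what the revised check could look like, under stated
assumptions: the nested `StaleBrokerEpochException` below is a local stand-in
for Kafka's exception of the same name, `HeartbeatRegistrationCheck` and
`requireRegistrationOffset` are hypothetical, and the error message is
illustrative rather than the one used in the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the null check; not the PR's actual code.
class HeartbeatRegistrationCheck {
    // Local stand-in for Kafka's StaleBrokerEpochException so the sketch compiles alone.
    static class StaleBrokerEpochException extends RuntimeException {
        StaleBrokerEpochException(String message) {
            super(message);
        }
    }

    // Returns the stored registration offset, or throws if the broker never registered.
    static long requireRegistrationOffset(Map<Integer, Long> offsets, int brokerId) {
        Long offset = offsets.get(brokerId);
        if (offset == null) {
            // Same exception type as the "registration is null" case, instead of a
            // bare RuntimeException, so callers handle both cases uniformly.
            throw new StaleBrokerEpochException(
                String.format("Received a heartbeat from broker %d before registration", brokerId));
        }
        return offset;
    }
}
```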



