jsancio commented on code in PR #12274:
URL: https://github.com/apache/kafka/pull/12274#discussion_r942583448


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -759,7 +759,7 @@ public void run() throws Exception {
                 int i = 1;
                 for (ApiMessageAndVersion message : result.records()) {
                     try {
-                        replay(message.message(), Optional.empty());
+                        replay(message.message(), Optional.empty(), writeOffset + i);

Review Comment:
   Let's write a comment explaining why the controller needs to do this. I 
wonder if this should be `writeOffset + result.records().size()` to make it 
consistent with the code in `handleCommit`. As you mentioned earlier, this is 
okay as long as the offset passed is equal to or larger than the actual offset.
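   
   Something like this is what I had in mind (just a sketch; `writeOffset` and 
`result` are the locals already in scope here):
   ```java
   // Sketch only: pass the batch's last offset, as handleCommit does. This is
   // safe because replay only needs an offset >= the record's actual offset.
   long lastOffset = writeOffset + result.records().size();
   for (ApiMessageAndVersion message : result.records()) {
       replay(message.message(), Optional.empty(), lastOffset);
   }
   ```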
   
   What do you think?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -938,7 +938,7 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         int i = 1;
                         for (ApiMessageAndVersion message : messages) {
                             try {
-                                replay(message.message(), Optional.of(reader.snapshotId()));
+                                replay(message.message(), Optional.of(reader.snapshotId()), offset);

Review Comment:
   Hmm. In a snapshot, `batch.lastOffset()` is unrelated to the offset in the 
log. That is the reason we added `reader.lastContainedLogOffset()`.
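   
   In other words, something along these lines (a sketch, assuming the 
surrounding loop in `handleSnapshot`):
   ```java
   // Sketch: use the last log offset the snapshot covers rather than an
   // offset derived from batch.lastOffset(), which is snapshot-relative.
   replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
   ```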



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1305,12 +1305,11 @@ private void handleFeatureControlChange() {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {

Review Comment:
   Let's document this `offset` argument since it is now important and is used 
when replaying broker registration records.
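   
   For example, a javadoc along these lines (wording is just a suggestion):
   ```java
   /**
    * Replay a metadata record against the controller's in-memory state.
    *
    * @param message    the metadata record to replay
    * @param snapshotId the snapshot id, if the record comes from a snapshot
    * @param offset     the log offset of the record, or an upper bound on it; used
    *                   when replaying broker registration records so that a broker
    *                   is only unfenced once its high watermark reaches this offset
    */
   ```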



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1874,8 +1873,13 @@ public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
 
                 @Override
                 public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
+                    Long offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
+                    if (offsetForRegisterBrokerRecord == null) {
+                        throw new RuntimeException(
+                            String.format("Receive a heartbeat from broker %d before registration", brokerId));

Review Comment:
   I would argue that this means that the client/broker sent a heartbeat 
request without sending a register broker request first. Unfortunately, KIP-631 
only documents `NOT_CONTROLLER` as a possible error code. Looking at the broker 
code, the broker retries on all errors. I say we return an `INVALID_REQUEST` 
instead of `UNKNOWN_SERVER_ERROR`.
   
   Also, by throwing a `RuntimeException`, this will force the active 
controller to `resign`; see `QuorumController#handleEventException`. We don't 
want that in this case. For example, this code can instead throw an 
`InvalidRequestException`, and `ControllerApis` will convert that to an 
`Errors` code.
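   
   Roughly (a sketch; `InvalidRequestException` here is 
`org.apache.kafka.common.errors.InvalidRequestException`, which 
`Errors.forException` maps to `INVALID_REQUEST`):
   ```java
   Long offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
   if (offsetForRegisterBrokerRecord == null) {
       // Reply with INVALID_REQUEST instead of resigning the active controller.
       throw new InvalidRequestException(String.format(
           "Received a heartbeat from broker %d before registration", brokerId));
   }
   ```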



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -217,6 +217,14 @@ boolean check() {
      */
     private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
 
+    /**
+     * Save the offset of each broker registration record. We will only unfence a
+     * broker when its high watermark has reached its broker registration record.
+     * This is not necessarily the exact offset of each broker registration record,
+     * but it should not be smaller than it.
+     */
+    private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;

Review Comment:
   Okay. Did you consider adding this offset to `BrokerRegistration`? 
`ClusterControlManager` already has a mapping of broker id to 
`BrokerRegistration` that gets snapshotted in memory.
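   
   Roughly something like this (a sketch only; the field and accessor names 
are placeholders, not existing code):
   ```java
   public class BrokerRegistration {
       // ... existing fields ...
       // Hypothetical field: the offset of the RegisterBrokerRecord that created
       // this registration, instead of keeping a separate TimelineHashMap.
       private final long registrationOffset;

       public long registrationOffset() {
           return registrationOffset;
       }
   }
   ```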



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -366,7 +375,12 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
         return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
-    public void replay(RegisterBrokerRecord record) {
+    public Long registerBrokerRecordOffset(int brokerId) {
+        return registerBrokerRecordOffsets.get(brokerId);
+    }

Review Comment:
   Let's return an `OptionalLong` instead.
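   
   For example (a sketch; uses `java.util.OptionalLong`):
   ```java
   public OptionalLong registerBrokerRecordOffset(int brokerId) {
       // Avoid a nullable Long in the public API; absence means the broker
       // has no registration record replayed yet.
       Long offset = registerBrokerRecordOffsets.get(brokerId);
       return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
   }
   ```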



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
