dengziming commented on code in PR #12195:
URL: https://github.com/apache/kafka/pull/12195#discussion_r879293533


##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -153,7 +153,10 @@ public enum MetadataVersion {
     IBP_3_2_IV0(4, "3.2", "IV0", false),
 
     // Support for metadata.version feature flag (KIP-778)
-    IBP_3_3_IV0(5, "3.3", "IV0", false);
+    IBP_3_3_IV0(5, "3.3", "IV0", false),
+
+    // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
+    IBP_3_3_IV1(6, "3.3", "IV0", false);

Review Comment:
   It's recommended to add a corresponding `MetadataVersionTest` case after adding a new IBP.
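
   As a rough illustration of the kind of check such a test could make, here is a minimal sketch. `SketchMetadataVersion` and `MetadataVersionChecks` are stand-ins invented for this example, not the real `MetadataVersion` or `MetadataVersionTest`; the constant names mirror the hunk above, while the field values are illustrative only.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the enum in the hunk above. The real
// MetadataVersion has more constants and fields; values here are illustrative.
enum SketchMetadataVersion {
    IBP_3_2_IV0(4, "3.2", "IV0"),
    IBP_3_3_IV0(5, "3.3", "IV0"),
    IBP_3_3_IV1(6, "3.3", "IV1");

    private final int featureLevel;
    private final String release;
    private final String subVersion;

    SketchMetadataVersion(int featureLevel, String release, String subVersion) {
        this.featureLevel = featureLevel;
        this.release = release;
        this.subVersion = subVersion;
    }

    int featureLevel() {
        return featureLevel;
    }

    // Full version string, e.g. "3.3-IV1".
    String version() {
        return release + "-" + subVersion;
    }
}

// Hypothetical checks a test for a newly added constant might run.
final class MetadataVersionChecks {
    // True when each constant has a higher feature level than the one before it.
    static boolean featureLevelsStrictlyIncrease() {
        int previous = Integer.MIN_VALUE;
        for (SketchMetadataVersion v : SketchMetadataVersion.values()) {
            if (v.featureLevel() <= previous) {
                return false;
            }
            previous = v.featureLevel();
        }
        return true;
    }

    // True when no two constants share the same full version string.
    static boolean versionStringsAreUnique() {
        Set<String> seen = new HashSet<>();
        for (SketchMetadataVersion v : SketchMetadataVersion.values()) {
            if (!seen.add(v.version())) {
                return false;
            }
        }
        return true;
    }
}
```

   Checks like these would flag a new constant that reuses an existing feature level or version string.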



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1087,9 +1111,15 @@ void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
         }
         generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
             brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
-        records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
-            setId(brokerId).setEpoch(brokerRegistration.epoch()),
-            FENCE_BROKER_RECORD.highestSupportedVersion()));
+        if (featureControl.metadataVersion().brokerRegistrationChangeRecordSupported()) {
+            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                    setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).setFenced((byte) 1),
+                    (short) 0));

Review Comment:
   How about using `BrokerRegistrationChangeRecord.highestSupportedVersion()` instead of a fixed value here?
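
   A minimal sketch of the pattern being suggested, assuming the version is looked up from a record-type constant the way the removed lines do with `FENCE_BROKER_RECORD`. All types below (`SketchRecordType`, `SketchMessageAndVersion`, `VersionLookupSketch`) are stand-ins invented for illustration, not Kafka classes, and the version numbers are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record-type registry; not a Kafka class.
enum SketchRecordType {
    FENCE_BROKER_RECORD((short) 0),
    BROKER_REGISTRATION_CHANGE_RECORD((short) 0);

    private final short highestSupportedVersion;

    SketchRecordType(short highestSupportedVersion) {
        this.highestSupportedVersion = highestSupportedVersion;
    }

    short highestSupportedVersion() {
        return highestSupportedVersion;
    }
}

// Hypothetical stand-in that pairs a payload with the version it is written at.
final class SketchMessageAndVersion {
    final String message;
    final short version;

    SketchMessageAndVersion(String message, short version) {
        this.message = message;
        this.version = version;
    }
}

final class VersionLookupSketch {
    // Builds the fencing record using the version declared by the record type,
    // rather than a literal such as (short) 0 at the call site.
    static List<SketchMessageAndVersion> fenceBroker(int brokerId) {
        List<SketchMessageAndVersion> records = new ArrayList<>();
        short version = SketchRecordType.BROKER_REGISTRATION_CHANGE_RECORD.highestSupportedVersion();
        records.add(new SketchMessageAndVersion("fence broker " + brokerId, version));
        return records;
    }
}
```

   With this shape, bumping the record's schema version only requires changing the declaration, and call sites pick it up automatically.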



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -211,6 +218,15 @@ ReplicationControlManager build() {
             }
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
+            if (featureControl == null) {
+                featureControl = new FeatureControlManager(logContext,
+                new QuorumFeatures(0, new ApiVersions(),
+                    Collections.singletonMap(MetadataVersion.FEATURE_NAME,

Review Comment:
   nit: use `QuorumFeatures.defaultFeatureMap()`


