dengziming commented on code in PR #12195:
URL: https://github.com/apache/kafka/pull/12195#discussion_r879293533

##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -153,7 +153,10 @@ public enum MetadataVersion {
     IBP_3_2_IV0(4, "3.2", "IV0", false),
     // Support for metadata.version feature flag (KIP-778)
-    IBP_3_3_IV0(5, "3.3", "IV0", false);
+    IBP_3_3_IV0(5, "3.3", "IV0", false),
+
+    // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
+    IBP_3_3_IV1(6, "3.3", "IV0", false);

Review Comment:
   It's recommended to add a corresponding test to `MetadataVersionTest` after adding a new IBP.

##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1087,9 +1111,15 @@ void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
         }
         generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
-        records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
-            setId(brokerId).setEpoch(brokerRegistration.epoch()),
-            FENCE_BROKER_RECORD.highestSupportedVersion()));
+        if (featureControl.metadataVersion().brokerRegistrationChangeRecordSupported()) {
+            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).setFenced((byte) 1),
+                (short) 0));

Review Comment:
   How about using `BrokerRegistrationChangeRecord.highestSupportedVersion()` here instead of a fixed value?
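
   To illustrate the point about the fixed version, here is a minimal, self-contained sketch. It does not use the Kafka classes: `RecordType` and `VersionedRecord` are hypothetical stand-ins for the generated record-type enum and `ApiMessageAndVersion`, and the version numbers are placeholders. The idea is only that the call site asks the record type for its highest supported version, so a later schema bump does not leave a stale `(short) 0` behind.

```java
// Sketch only: hypothetical stand-ins, not the real Kafka types.
final class RecordVersionSketch {

    // Stand-in for a generated record-type enum that knows its own schema versions.
    // The version values here are placeholders chosen for illustration.
    enum RecordType {
        FENCE_BROKER_RECORD((short) 0),
        BROKER_REGISTRATION_CHANGE_RECORD((short) 0);

        private final short highestSupportedVersion;

        RecordType(short highestSupportedVersion) {
            this.highestSupportedVersion = highestSupportedVersion;
        }

        short highestSupportedVersion() {
            return highestSupportedVersion;
        }
    }

    // Stand-in for a record paired with the schema version it is written with.
    static final class VersionedRecord {
        final RecordType type;
        final int brokerId;
        final long brokerEpoch;
        final boolean fenced;
        final short version;

        VersionedRecord(RecordType type, int brokerId, long brokerEpoch,
                        boolean fenced, short version) {
            this.type = type;
            this.brokerId = brokerId;
            this.brokerEpoch = brokerEpoch;
            this.fenced = fenced;
            this.version = version;
        }
    }

    // Builds a "broker fenced" change record. The version comes from the
    // record type rather than from a literal at the call site.
    static VersionedRecord fenceBroker(int brokerId, long brokerEpoch) {
        RecordType type = RecordType.BROKER_REGISTRATION_CHANGE_RECORD;
        return new VersionedRecord(type, brokerId, brokerEpoch, true,
                type.highestSupportedVersion());
    }

    public static void main(String[] args) {
        VersionedRecord record = fenceBroker(1, 42L);
        System.out.println(record.type + " written at version " + record.version);
    }
}
```

   In the real code the removed lines already followed this pattern with `FENCE_BROKER_RECORD.highestSupportedVersion()`, so the new branch could mirror it.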

##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -211,6 +218,15 @@ ReplicationControlManager build() {
             }
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
+            if (featureControl == null) {
+                featureControl = new FeatureControlManager(logContext,
+                    new QuorumFeatures(0, new ApiVersions(),
+                        Collections.singletonMap(MetadataVersion.FEATURE_NAME,

Review Comment:
   nit: use `QuorumFeatures.defaultFeatureMap()`