chia7712 commented on code in PR #18845:
URL: https://github.com/apache/kafka/pull/18845#discussion_r1949874774
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -328,15 +314,10 @@ private ApiError updateMetadataVersion(
try {
newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
} catch (IllegalArgumentException e) {
- return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
+ return invalidMetadataVersion(newVersionLevel, "Valid versions are from "
Review Comment:
Should we enhance the error message for `fromFeatureLevel` as well?
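To illustrate what an enhanced message could look like, here is a minimal sketch. `SketchVersion`, its constants, and the message wording are hypothetical stand-ins, not the real `MetadataVersion` code; the only idea carried over from the diff is that the exception itself names the valid range.

```java
// Hypothetical demo class; placed first so the file can be run directly.
final class FeatureLevelSketch {
    public static void main(String[] args) {
        try {
            SketchVersion.fromFeatureLevel((short) 1);
        } catch (IllegalArgumentException e) {
            // The caller can surface the message unchanged.
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical stand-in for MetadataVersion; constants and levels are illustrative only.
enum SketchVersion {
    V_A((short) 7),
    V_B((short) 8),
    V_C((short) 9);

    private final short featureLevel;

    SketchVersion(short featureLevel) {
        this.featureLevel = featureLevel;
    }

    short featureLevel() {
        return featureLevel;
    }

    static SketchVersion fromFeatureLevel(short level) {
        for (SketchVersion v : values()) {
            if (v.featureLevel == level) {
                return v;
            }
        }
        // Constants are declared in ascending order, so first and last bound the range.
        SketchVersion[] all = values();
        throw new IllegalArgumentException("No version found for feature level " + level
            + ". Valid feature levels are from " + all[0].featureLevel
            + " to " + all[all.length - 1].featureLevel + ".");
    }
}
```

With the range in the exception, `updateMetadataVersion` would not need to build its own description in the catch block.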
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1652,9 +1652,7 @@ private void registerWriteNoOpRecord(long maxIdleIntervalNs) {
periodicControl.registerTask(new PeriodicTask("writeNoOpRecord",
() -> {
ArrayList<ApiMessageAndVersion> records = new ArrayList<>(1);
- if (featureControl.metadataVersion().isNoOpRecordSupported()) {
- records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
- }
+ records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
Review Comment:
Maybe we can streamline the code?
```
() -> ControllerResult.of(List.of(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)), false),
```
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1424,15 +1406,10 @@ void handleBrokerUnregistered(int brokerId, long brokerEpoch,
* @param records The record list to append to.
*/
void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
- if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
- records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
- setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
- setFenced(BrokerRegistrationFencingChange.UNFENCE.value()),
- (short) 0));
- } else {
- records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).
- setEpoch(brokerEpoch), (short) 0));
- }
+ records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
Review Comment:
ditto
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1379,16 +1367,10 @@ void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
}
generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
- if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
- records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
- setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).
- setFenced(BrokerRegistrationFencingChange.FENCE.value()),
- (short) 0));
- } else {
- records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
- setId(brokerId).setEpoch(brokerRegistration.epoch()),
- (short) 0));
- }
+ records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
Review Comment:
Could you please update the comment on `handleBrokerFenced` to reflect this
change?
##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java:
##########
@@ -126,7 +125,7 @@ public void testFeatureLevelForFeature() {
static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION = Collections.singletonList(
new ApiMessageAndVersion(new FeatureLevelRecord().
setName(FEATURE_NAME).
- setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0));
+ setFeatureLevel((short) 1), (short) 0));
Review Comment:
This feature level is no longer supported, so we should either remove or
modify the test.
##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java:
##########
@@ -128,18 +126,6 @@ public void testReassignment() throws Exception {
executeAndVerifyReassignment();
}
- @ClusterTests({
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0)
- })
- public void testReassignmentWithAlterPartitionDisabled() throws Exception {
- // Test reassignment when the IBP is on an older version which does not use
- // the `AlterPartition` API. In this case, the controller will register individual
- // watches for each reassigning partition so that the reassignment can be
- // completed as soon as the ISR is expanded.
Review Comment:
It seems to me this behavior applies to ZK mode only; in KRaft mode
`AlterPartition` is always supported. Hence, it is fine to remove the test.
BTW, the KRaft test case was unnecessary in the first place. I overlooked this
scenario when merging #15675.
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -86,12 +84,7 @@ BootstrapMetadata readFromConfiguration() {
return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap");
}
MetadataVersion version = MetadataVersion.fromVersionString(ibp.get());
- if (version.isLessThan(MINIMUM_BOOTSTRAP_VERSION)) {
Review Comment:
The `ibp` is always empty now that the config has been removed by #18566.
I have opened https://issues.apache.org/jira/browse/KAFKA-18740 to clean it
up. Maybe this PR can include the cleanup as well?
##########
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##########
@@ -293,11 +293,7 @@ public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
setFenced(fenced);
if (inControlledShutdown) {
- if (options.metadataVersion().isInControlledShutdownStateSupported()) {
- registrationRecord.setInControlledShutdown(true);
- } else {
- options.handleLoss("the inControlledShutdown state of one or more brokers");
- }
+ registrationRecord.setInControlledShutdown(true);
Review Comment:
Maybe we can set the flag at line#294, as part of the setter chain:
```
RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
setBrokerId(id).
setRack(rack.orElse(null)).
setBrokerEpoch(epoch).
setIncarnationId(incarnationId).
setFenced(fenced).
setInControlledShutdown(inControlledShutdown);
```
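The suggestion relies on the field defaulting to `false`, so passing the boolean straight through should match the old `if (inControlledShutdown)` branch. A minimal sketch of that equivalence follows; `SketchRecord` is a hypothetical stand-in for the generated `RegisterBrokerRecord`, and the `false` default is an assumption modeled here, not read from the schema.

```java
// Hypothetical demo class; placed first so the file can be run directly.
final class ShutdownFlagSketch {
    // Old shape: only touch the field when the flag is true.
    static SketchRecord conditional(boolean flag) {
        SketchRecord r = new SketchRecord();
        if (flag) {
            r.setInControlledShutdown(true);
        }
        return r;
    }

    // Suggested shape: pass the flag through in the setter chain.
    static SketchRecord chained(boolean flag) {
        return new SketchRecord().setInControlledShutdown(flag);
    }

    public static void main(String[] args) {
        for (boolean flag : new boolean[] {false, true}) {
            boolean same = conditional(flag).inControlledShutdown()
                == chained(flag).inControlledShutdown();
            System.out.println("flag=" + flag + " equivalent=" + same);
        }
    }
}

// Hypothetical stand-in for RegisterBrokerRecord; only one field is modeled.
final class SketchRecord {
    // Assumed default of false, mirroring what the suggestion relies on.
    private boolean inControlledShutdown = false;

    SketchRecord setInControlledShutdown(boolean value) {
        this.inControlledShutdown = value;
        return this;
    }

    boolean inControlledShutdown() {
        return inControlledShutdown;
    }
}
```

If the real default were anything other than `false`, the two shapes would differ for `flag == false`, so that is the one thing worth checking before applying the suggestion.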