chia7712 commented on code in PR #18845:
URL: https://github.com/apache/kafka/pull/18845#discussion_r1960653141
##########
metadata/src/test/java/org/apache/kafka/image/writer/ImageWriterOptionsTest.java:
##########
@@ -48,40 +48,33 @@
public class ImageWriterOptionsTest {
@Test
public void testDefaultLossHandler() {
- ImageWriterOptions options = new ImageWriterOptions.Builder().build();
+ ImageWriterOptions options = new
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build();
assertEquals("stuff", assertThrows(UnwritableMetadataException.class,
() -> options.handleLoss("stuff")).loss());
}
@Test
public void testSetMetadataVersion() {
Review Comment:
If we rewrite `ImageWriterOptions` as a record class, this
`testSetMetadataVersion` becomes unnecessary, since it only exercises pure setters.
##########
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java:
##########
@@ -718,30 +710,14 @@ public void
testRegistrationWithUnsupportedMetadataVersion() {
setFeatures(new
BrokerRegistrationRequestData.FeatureCollection(
Collections.singleton(new
BrokerRegistrationRequestData.Feature().
setName(MetadataVersion.FEATURE_NAME).
-
setMinSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel()).
-
setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())).
+
setMinSupportedVersion(MetadataVersion.IBP_3_4_IV0.featureLevel()).
+
setMaxSupportedVersion(MetadataVersion.IBP_3_4_IV0.featureLevel())).iterator())).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
featureControl.finalizedFeatures(Long.MAX_VALUE),
false)).getMessage());
}
- @Test
- public void testRegisterControlWithOlderMetadataVersion() {
- FeatureControlManager featureControl = new
FeatureControlManager.Builder().
- setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
Review Comment:
This test still appears necessary, as it verifies behavior with a metadata
version (MV) older than `IBP_3_7_IV0`. Perhaps we could simply change
`IBP_3_3_IV0` to `IBP_3_3_IV3` instead.
##########
metadata/src/main/java/org/apache/kafka/image/writer/ImageWriterOptions.java:
##########
@@ -29,30 +29,22 @@
public final class ImageWriterOptions {
public static class Builder {
private MetadataVersion metadataVersion;
- private MetadataVersion requestedMetadataVersion;
- private boolean isEligibleLeaderReplicasEnabled = false;
private Consumer<UnwritableMetadataException> lossHandler = e -> {
throw e;
};
+ private boolean isEligibleLeaderReplicasEnabled = false;
- public Builder() {
- this.metadataVersion = MetadataVersion.latestProduction();
+ public Builder(MetadataVersion metadataVersion) {
+ this.metadataVersion = metadataVersion;
}
public Builder(MetadataImage image) {
- this.metadataVersion = image.features().metadataVersion();
+ this.metadataVersion = image.features().metadataVersionOrThrow();
this.isEligibleLeaderReplicasEnabled =
image.features().isElrEnabled();
}
public Builder setMetadataVersion(MetadataVersion metadataVersion) {
Review Comment:
After removing those if-else branches, it seems we don't need the builder anymore.
Maybe we can rewrite `ImageWriterOptions` as a record class.
##########
metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java:
##########
@@ -30,31 +30,31 @@
@Timeout(value = 40)
public class MetadataVersionChangeTest {
- private static final MetadataVersionChange CHANGE_3_0_IV1_TO_3_3_IV0 =
- new MetadataVersionChange(IBP_3_0_IV1, IBP_3_3_IV0);
+ private static final MetadataVersionChange CHANGE_MINUMUM_TO_LATEST =
Review Comment:
typo: `MINUMUM` -> `MINIMUM`
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -92,8 +87,7 @@ object AlterPartitionManager {
scheduler = scheduler,
time = time,
brokerId = config.brokerId,
- brokerEpochSupplier = brokerEpochSupplier,
- metadataVersionSupplier = () => metadataCache.metadataVersion()
Review Comment:
We can remove `metadataCache` from the `apply` method.
##########
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##########
@@ -114,7 +107,7 @@ class DefaultAlterPartitionManager(
// and re-created, we cannot have two entries in this Map especially if we
cannot
// use an AlterPartition request version which supports topic ids in the end
because
// the two updates with the same name would be merged together.
- private[server] val unsentIsrUpdates: util.Map[TopicPartition,
AlterPartitionItem] = new ConcurrentHashMap[TopicPartition,
AlterPartitionItem]()
+ private[server] val unsentIsrUpdates = new
ConcurrentHashMap[TopicIdPartition, AlterPartitionItem]()
Review Comment:
Could you please update the docs to reflect this change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]