kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2557267208
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -120,9 +120,13 @@ object StorageTool extends Logging {
       throw new TerseFailure("The kafka configuration file appears to be for " +
         "a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
     }
+    val writeBootstrapSnapshot = config.processRoles.contains(ProcessRole.ControllerRole) ||
+      !config.processRoles.contains(ProcessRole.BrokerRole)
Review Comment:
We can inline this since it is only read once. We can also drop
`!config.processRoles.contains(ProcessRole.BrokerRole)` from the predicate.
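In other words, a sketch of the inlined form (the same change as my suggestion on the `Formatter` call further down; dropping the `BrokerRole` check is safe because the `TerseFailure` above already rejects configs with no process roles, so a non-broker node must be a controller):

```scala
// Sketch only: inline the single-use predicate at the call site.
val formatter = new Formatter().
  setPrintStream(printStream).
  setNodeId(config.nodeId).
  setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).
```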
##########
metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java:
##########
@@ -122,6 +124,17 @@ private BatchAndType nextControlBatch(FileChannelRecordBatch input) {
                     messages.add(new ApiMessageAndVersion(message, (short) 0));
                     break;
                 }
+                case KRAFT_VERSION: {
+                    KRaftVersionRecord message = new KRaftVersionRecord();
+                    message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                    messages.add(new ApiMessageAndVersion(message, (short) 0));
+                    break;
+                }
+                case KRAFT_VOTERS:
+                    VotersRecord message = new VotersRecord();
+                    message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                    messages.add(new ApiMessageAndVersion(message, (short) 0));
+                    break;
Review Comment:
Hmmmm... this whole method is a bit weird. Control records do not matter
outside of the raft module, so it's odd that we even provide functionality in
the metadata module to read and deserialize KRaft control records into memory.
I'm okay with this change for now, since I presume it is needed to pass the
`StorageTool` tests, but let me think about how we can clean this up.
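If it does stay here for now, one possible shape for the cleanup (purely a sketch, written in Scala for brevity even though `BatchFileReader` is Java; `readControlRecord` is a hypothetical helper, not an existing API) would be to factor out the deserialize-and-wrap sequence that every control-record case repeats:

```scala
import java.nio.ByteBuffer
import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor}
import org.apache.kafka.server.common.ApiMessageAndVersion

// Hypothetical helper: the cases differ only in the concrete record type,
// so the read-then-wrap steps can be shared.
def readControlRecord(message: ApiMessage, value: ByteBuffer): ApiMessageAndVersion = {
  message.read(new ByteBufferAccessor(value), 0.toShort)
  new ApiMessageAndVersion(message, 0.toShort)
}

// e.g. the KRAFT_VERSION case above would reduce to:
//   messages.add(readControlRecord(new KRaftVersionRecord(), record.value()))
```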
##########
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala:
##########
@@ -48,8 +48,15 @@ import scala.jdk.CollectionConverters._
 class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(
+    metadataVersion = MetadataVersion.IBP_4_1_IV1,
     features = Array(
       new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0)
+    ),
+    serverProperties = Array(
+      new ClusterConfigProperty(
+        key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
+        value = "classic"
+      )
Review Comment:
See my other comment regarding test code changes for unrelated tests.
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -120,9 +120,13 @@ object StorageTool extends Logging {
       throw new TerseFailure("The kafka configuration file appears to be for " +
         "a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
     }
+    val writeBootstrapSnapshot = config.processRoles.contains(ProcessRole.ControllerRole) ||
+      !config.processRoles.contains(ProcessRole.BrokerRole)
+
     val formatter = new Formatter().
       setPrintStream(printStream).
       setNodeId(config.nodeId).
+      setWriteBootstrapSnapshot(writeBootstrapSnapshot).
Review Comment:
```suggestion
setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).
```
##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1405,7 +1405,12 @@ class KRaftClusterTest {
         build()
     try {
       cluster.format()
+      val controllerServer = cluster.controllers().values().iterator().next()
+      val dynamicConfigProps = new Properties()
+      dynamicConfigProps.setProperty(ServerConfigs.NUM_IO_THREADS_CONFIG, "9")
+      controllerServer.config.dynamicConfig.initialize(None)
       cluster.startup()
+      controllerServer.config.dynamicConfig.updateDefaultConfig(dynamicConfigProps)
Review Comment:
Please remove these changes. There are several other unrelated tests where you
changed the test code to make them pass; after your changes to
`KafkaClusterTestKit`, these files should not need to change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]