This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 051fb66fe6f KAFKA-19938: Fix combined node log dir format in
KafkaClusterTestKit (#21017)
051fb66fe6f is described below
commit 051fb66fe6f3990293c51e996179511b4b152502
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Dec 3 17:30:13 2025 +0100
KAFKA-19938: Fix combined node log dir format in KafkaClusterTestKit
(#21017)
The previous logic did not format all log directories on combined nodes.
It was only formatting the metadata log directory, which is not the expected
result; all log directories should be formatted.
I stumbled on this issue while implementing
[KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg)
Reviewers: PoAn Yang <[email protected]>
---
.../kafka/common/test/KafkaClusterTestKit.java | 25 +++-------
.../org/apache/kafka/common/test/TestKitNodes.java | 38 +++++++++++-----
.../kafka/common/test/KafkaClusterTestKitTest.java | 53 ++++++++++++++++------
3 files changed, 73 insertions(+), 43 deletions(-)
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index a8440ff32fd..c958f144f62 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -75,7 +75,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import static
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
@@ -441,12 +440,13 @@ public class KafkaClusterTestKit implements AutoCloseable
{
List<Future<?>> futures = new ArrayList<>();
try {
for (ControllerServer controller : controllers.values()) {
- futures.add(executorService.submit(() ->
formatNode(controller.sharedServer().metaPropsEnsemble(), true)));
+ futures.add(executorService.submit(() ->
formatNode(controller.sharedServer().metaPropsEnsemble())));
}
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
BrokerServer broker = entry.getValue();
- futures.add(executorService.submit(() ->
formatNode(broker.sharedServer().metaPropsEnsemble(),
-
!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id()))));
+ if
(!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id())) {
+ futures.add(executorService.submit(() ->
formatNode(broker.sharedServer().metaPropsEnsemble())));
+ }
}
for (Future<?> future: futures) {
future.get();
@@ -460,21 +460,14 @@ public class KafkaClusterTestKit implements AutoCloseable
{
}
private void formatNode(
- MetaPropertiesEnsemble ensemble,
- boolean writeMetadataDirectory
+ MetaPropertiesEnsemble ensemble
) {
try {
final var nodeId = ensemble.nodeId().getAsInt();
Formatter formatter = new Formatter();
formatter.setNodeId(nodeId);
formatter.setClusterId(ensemble.clusterId().get());
- if (writeMetadataDirectory) {
- formatter.setDirectories(ensemble.logDirProps().keySet());
- } else {
-
formatter.setDirectories(ensemble.logDirProps().keySet().stream().
- filter(d -> !ensemble.metadataLogDir().get().equals(d)).
- collect(Collectors.toSet()));
- }
+ formatter.setDirectories(ensemble.logDirProps().keySet());
if (formatter.directories().isEmpty()) {
return;
}
@@ -482,11 +475,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
formatter.setUnstableFeatureVersionsEnabled(true);
formatter.setIgnoreFormatted(false);
formatter.setControllerListenerName(controllerListenerName);
- if (writeMetadataDirectory) {
-
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
- } else {
- formatter.setMetadataLogDirectory(Optional.empty());
- }
+ formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
StringBuilder dynamicVotersBuilder = new StringBuilder();
String prefix = "";
if (standalone) {
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 3622430f487..30c597915c1 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -196,7 +196,8 @@ public class TestKitNodes {
baseDirectory.toFile().getAbsolutePath(),
clusterId,
brokerNodeIds.contains(id),
- perServerProperties.getOrDefault(id, Map.of())
+ perServerProperties.getOrDefault(id, Map.of()),
+ numDisksPerBroker
);
controllerNodes.put(id, controllerNode);
}
@@ -346,21 +347,36 @@ public class TestKitNodes {
String baseDirectory,
String clusterId,
boolean combined,
- Map<String, String>
propertyOverrides) {
+ Map<String, String>
propertyOverrides,
+ int numDisksPerController) {
+ List<String> logDataDirectories = combined
+ ? IntStream
+ .range(0, numDisksPerController)
+ .mapToObj(i -> String.format("combined_%d_%d", id, i))
+ .map(logDir -> {
+ if (Paths.get(logDir).isAbsolute()) {
+ return logDir;
+ }
+ return new File(baseDirectory, logDir).getAbsolutePath();
+ })
+ .toList()
+ : List.of(new File(baseDirectory, String.format("controller_%d",
id)).getAbsolutePath());
String metadataDirectory = new File(baseDirectory,
combined ? String.format("combined_%d_0", id) :
String.format("controller_%d", id)).getAbsolutePath();
MetaPropertiesEnsemble.Copier copier = new
MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
copier.setMetaLogDir(Optional.of(metadataDirectory));
- copier.setLogDirProps(
- metadataDirectory,
- new MetaProperties.Builder()
- .setVersion(MetaPropertiesVersion.V1)
- .setClusterId(clusterId)
- .setNodeId(id)
- .setDirectoryId(copier.generateValidDirectoryId())
- .build()
- );
+ for (String logDir : logDataDirectories) {
+ copier.setLogDirProps(
+ logDir,
+ new MetaProperties.Builder()
+ .setVersion(MetaPropertiesVersion.V1)
+ .setClusterId(clusterId)
+ .setNodeId(id)
+ .setDirectoryId(copier.generateValidDirectoryId())
+ .build()
+ );
+ }
return new TestKitNode() {
private final MetaPropertiesEnsemble ensemble = copier.copy();
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
index fa287aaaf4a..3272ab3fa78 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
@@ -17,16 +17,22 @@
package org.apache.kafka.common.test;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -87,37 +93,56 @@ public class KafkaClusterTestKitTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined)
throws Exception {
+ @CsvSource({
+ "true,1,1,2", /* 1 combined node */
+ "true,5,7,2", /* 5 combined nodes + 2 controllers */
+ "true,7,5,2", /* 7 combined nodes */
+ "false,1,1,2", /* 1 broker + 1 controller */
+ "false,5,7,2", /* 5 brokers + 7 controllers */
+ "false,7,5,2", /* 7 brokers + 5 controllers */
+ })
+ public void testCreateClusterFormatAndCloseWithMultipleLogDirs(boolean
combined, int numBrokers, int numControllers, int numDisks) throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
- setNumBrokerNodes(5).
- setNumDisksPerBroker(2).
+ setNumBrokerNodes(numBrokers).
+ setNumDisksPerBroker(numDisks).
setCombined(combined).
- setNumControllerNodes(3).build()).build()) {
+ setNumControllerNodes(numControllers).build()).build()) {
TestKitNodes nodes = cluster.nodes();
- assertEquals(5, nodes.brokerNodes().size());
- assertEquals(3, nodes.controllerNodes().size());
+ assertEquals(numBrokers, nodes.brokerNodes().size());
+ assertEquals(numControllers, nodes.controllerNodes().size());
+ Set<String> logDirs = new HashSet<>();
nodes.brokerNodes().forEach((brokerId, node) -> {
- assertEquals(2, node.logDataDirectories().size());
- Set<String> expected = Set.of(String.format("broker_%d_data0",
brokerId), String.format("broker_%d_data1", brokerId));
- if (nodes.isCombined(node.id())) {
- expected = Set.of(String.format("combined_%d_0",
brokerId), String.format("combined_%d_1", brokerId));
- }
+ assertEquals(numDisks, node.logDataDirectories().size());
+ Set<String> expectedDisks = IntStream.range(0, numDisks)
+ .mapToObj(i -> {
+ if (nodes.isCombined(node.id())) {
+ return String.format("combined_%d_%d",
brokerId, i);
+ } else {
+ return String.format("broker_%d_data%d",
brokerId, i);
+ }
+ }).collect(Collectors.toSet());
assertEquals(
- expected,
+ expectedDisks,
node.logDataDirectories().stream()
.map(p -> Paths.get(p).getFileName().toString())
.collect(Collectors.toSet())
);
+ logDirs.addAll(node.logDataDirectories());
});
nodes.controllerNodes().forEach((controllerId, node) -> {
- String expected = combined ? String.format("combined_%d_0",
controllerId) : String.format("controller_%d", controllerId);
+ String expected = nodes.isCombined(node.id()) ?
String.format("combined_%d_0", controllerId) : String.format("controller_%d",
controllerId);
assertEquals(expected,
Paths.get(node.metadataDirectory()).getFileName().toString());
+ logDirs.addAll(node.logDataDirectories());
});
+
+ cluster.format();
+ logDirs.forEach(logDir ->
+ assertTrue(Files.exists(Paths.get(logDir,
MetaPropertiesEnsemble.META_PROPERTIES_NAME)))
+ );
}
}