This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2fe447a8a39 MINOR: Cleanups in Test Common Module (#19775)
2fe447a8a39 is described below
commit 2fe447a8a3923c0ed1b59c62529e22e011aacbd3
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Mon May 26 00:37:30 2025 +0530
MINOR: Cleanups in Test Common Module (#19775)
Now that Kafka Brokers support Java 17, this PR makes some changes in
test-common module. The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/common/test/api/ClusterConfig.java | 8 +--
.../kafka/common/test/api/ClusterTemplate.java | 2 +-
.../org/apache/kafka/common/test/api/README.md | 4 +-
.../kafka/common/test/api/ClusterConfigTest.java | 11 ++--
.../apache/kafka/common/test/ClusterInstance.java | 3 +-
.../kafka/common/test/KafkaClusterTestKit.java | 7 ++-
.../apache/kafka/common/test/MockController.java | 3 +-
.../common/test/PreboundSocketFactoryManager.java | 3 +-
.../org/apache/kafka/common/test/TestKitNodes.java | 6 +--
.../org/apache/kafka/common/test/TestUtils.java | 4 +-
.../common/test/junit/ClusterTestExtensions.java | 5 +-
.../test/junit/RaftClusterInvocationContext.java | 5 +-
.../kafka/common/test/KafkaClusterTestKitTest.java | 60 +++++++++++-----------
.../apache/kafka/common/test/TestKitNodeTest.java | 18 +++----
.../test/junit/ClusterTestExtensionsTest.java | 36 ++++++-------
.../test/junit/ClusterTestExtensionsUnitTest.java | 27 +++++-----
.../common/test/junit/CatalogTestFilterTest.java | 3 +-
17 files changed, 94 insertions(+), 111 deletions(-)
diff --git
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
index b9090b9ce40..f26c99a0872 100644
---
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
+++
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
@@ -207,10 +207,10 @@ public class ClusterConfig {
private ListenerName controllerListenerName;
private File trustStoreFile;
private MetadataVersion metadataVersion;
- private Map<String, String> serverProperties = Collections.emptyMap();
- private Map<Integer, Map<String, String>> perServerProperties =
Collections.emptyMap();
- private List<String> tags = Collections.emptyList();
- private Map<Feature, Short> features = Collections.emptyMap();
+ private Map<String, String> serverProperties = Map.of();
+ private Map<Integer, Map<String, String>> perServerProperties =
Map.of();
+ private List<String> tags = List.of();
+ private Map<Feature, Short> features = Map.of();
private Builder() {}
diff --git
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTemplate.java
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTemplate.java
index 94b71c97d4a..f0f203a432c 100644
---
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTemplate.java
+++
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTemplate.java
@@ -44,7 +44,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
* Usage looks something like this:
* <pre>{@code
* private static List<ClusterConfig> generator() {
- * return
Collections.singletonList(ClusterConfig.defaultBuilder().build());
+ * return List.of(ClusterConfig.defaultBuilder().build());
* }
*
* @ClusterTemplate("generator")
diff --git
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/README.md
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/README.md
index 3c4efda30cc..e69a3811616 100644
---
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/README.md
+++
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/README.md
@@ -79,7 +79,7 @@ references a static method on the test class. This method is
used to produce any
number of test configurations using a fluent builder style API.
```java
-import java.util.Arrays;
+import java.util.List;
@ClusterTemplate("generateConfigs")
void testSomething() { ... }
@@ -99,7 +99,7 @@ static List<ClusterConfig> generateConfigs() {
.name("Generated Test 3")
.serverProperties(props3)
.build();
- return Arrays.asList(config1, config2, config3);
+ return List.of(config1, config2, config3);
}
```
diff --git
a/test-common/test-common-internal-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
b/test-common/test-common-internal-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
index 4e49ef63e40..ce5ac8b5158 100644
---
a/test-common/test-common-internal-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
+++
b/test-common/test-common-internal-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
@@ -29,7 +29,6 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -55,20 +54,20 @@ public class ClusterConfigTest {
trustStoreFile.deleteOnExit();
ClusterConfig clusterConfig = ClusterConfig.builder()
- .setTypes(Collections.singleton(Type.KRAFT))
+ .setTypes(Set.of(Type.KRAFT))
.setBrokers(3)
.setControllers(2)
.setDisksPerBroker(1)
.setAutoStart(true)
- .setTags(Arrays.asList("name", "Generated Test"))
+ .setTags(List.of("name", "Generated Test"))
.setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.setBrokerListenerName(ListenerName.normalised("EXTERNAL"))
.setControllerSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
.setControllerListenerName(ListenerName.normalised("CONTROLLER"))
.setTrustStoreFile(trustStoreFile)
.setMetadataVersion(MetadataVersion.MINIMUM_VERSION)
- .setServerProperties(Collections.singletonMap("broker",
"broker_value"))
- .setPerServerProperties(Collections.singletonMap(0,
Collections.singletonMap("broker_0", "broker_0_value")))
+ .setServerProperties(Map.of("broker", "broker_value"))
+ .setPerServerProperties(Map.of(0, Map.of("broker_0",
"broker_0_value")))
.build();
Map<String, Object> clusterConfigFields = fields(clusterConfig);
@@ -105,7 +104,7 @@ public class ClusterConfigTest {
@Test
public void testDisplayTags() {
- List<String> tags = Arrays.asList("tag 1", "tag 2", "tag 3");
+ List<String> tags = List.of("tag 1", "tag 2", "tag 3");
ClusterConfig clusterConfig =
ClusterConfig.defaultBuilder().setTags(tags).build();
Set<String> expectedDisplayTags = clusterConfig.displayTags();
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index e6597e69cfd..0b6f6b23b97 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -57,7 +57,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -242,7 +241,7 @@ public interface ClusterInstance {
if (brokers().values().stream().allMatch(b ->
b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
return Set.of(CLASSIC, CONSUMER);
} else {
- return Collections.singleton(CLASSIC);
+ return Set.of(CLASSIC);
}
}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 21ab67f9bab..3ff43a3a2ce 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -60,7 +60,6 @@ import java.nio.file.Paths;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -251,7 +250,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
socketFactoryManager.getOrCreatePortForListener(node.id(),
brokerListenerName);
}
for (TestKitNode node : nodes.controllerNodes().values()) {
- setupNodeDirectories(baseDirectory,
node.metadataDirectory(), Collections.emptyList());
+ setupNodeDirectories(baseDirectory,
node.metadataDirectory(), List.of());
KafkaConfig config = createNodeConfig(node);
SharedServer sharedServer = new SharedServer(
config,
@@ -259,7 +258,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM,
new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
- Collections.emptyList(),
+ List.of(),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
);
@@ -287,7 +286,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM,
new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
- Collections.emptyList(),
+ List.of(),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
);
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
index 2ccb4c87e26..371228ca8c9 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
@@ -70,7 +70,6 @@ import org.apache.kafka.server.authorizer.AclDeleteResult;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -388,7 +387,7 @@ public class MockController implements Controller {
configs.computeIfAbsent(resource, __ -> new
HashMap<>()).put(key, value);
break;
case DELETE:
- configs.getOrDefault(resource,
Collections.emptyMap()).remove(key);
+ configs.getOrDefault(resource, Map.of()).remove(key);
break;
default:
break;
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
index fa082001d64..4beadd89083 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
@@ -23,7 +23,6 @@ import org.apache.kafka.server.ServerSocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -174,7 +173,7 @@ public class PreboundSocketFactoryManager implements
AutoCloseable {
// SocketServer.)
for (Entry<Integer, Map<String, ServerSocketChannel>> socketsEntry :
sockets.entrySet()) {
Set<String> usedListeners = usedSockets.getOrDefault(
- socketsEntry.getKey(), Collections.emptySet());
+ socketsEntry.getKey(), Set.of());
for (Entry<String, ServerSocketChannel> entry :
socketsEntry.getValue().entrySet()) {
if (!usedListeners.contains(entry.getKey())) {
Utils.closeQuietly(entry.getValue(),
"serverSocketChannel");
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index b8c68ac7fa4..a9667dbd631 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -52,7 +52,7 @@ public class TestKitNodes {
private int numControllerNodes;
private int numBrokerNodes;
private int numDisksPerBroker = 1;
- private Map<Integer, Map<String, String>> perServerProperties =
Collections.emptyMap();
+ private Map<Integer, Map<String, String>> perServerProperties =
Map.of();
private BootstrapMetadata bootstrapMetadata;
public Builder() {
@@ -201,7 +201,7 @@ public class TestKitNodes {
baseDirectory.toFile().getAbsolutePath(),
clusterId,
brokerNodeIds.contains(id),
- perServerProperties.getOrDefault(id,
Collections.emptyMap())
+ perServerProperties.getOrDefault(id, Map.of())
);
controllerNodes.put(id, controllerNode);
}
@@ -213,7 +213,7 @@ public class TestKitNodes {
baseDirectory.toFile().getAbsolutePath(),
clusterId,
controllerNodeIds.contains(id),
- perServerProperties.getOrDefault(id,
Collections.emptyMap()),
+ perServerProperties.getOrDefault(id, Map.of()),
numDisksPerBroker
);
brokerNodes.put(id, brokerNode);
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index e2361749318..158a93b8a77 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
-import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.BiFunction;
@@ -180,7 +180,7 @@ public class TestUtils {
}
private static Integer getLeaderFromAdmin(Admin admin, String topic, int
partition) throws Exception {
- TopicDescription topicDescription =
admin.describeTopics(Collections.singletonList(topic)).allTopicNames().get().get(topic);
+ TopicDescription topicDescription =
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
return topicDescription.partitions().stream()
.filter(partitionInfo -> partitionInfo.partition() == partition)
.findFirst()
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterTestExtensions.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterTestExtensions.java
index 5b115793337..de81a6b82bd 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterTestExtensions.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterTestExtensions.java
@@ -42,7 +42,6 @@ import org.junit.platform.commons.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -268,7 +267,7 @@ public class ClusterTestExtensions implements
TestTemplateInvocationContextProvi
.collect(Collectors.toMap(ClusterFeature::feature,
ClusterFeature::version));
ClusterConfig config = ClusterConfig.builder()
- .setTypes(new HashSet<>(Arrays.asList(types)))
+ .setTypes(Set.of(types))
.setBrokers(clusterTest.brokers() == 0 ? defaults.brokers() :
clusterTest.brokers())
.setControllers(clusterTest.controllers() == 0 ?
defaults.controllers() : clusterTest.controllers())
.setDisksPerBroker(clusterTest.disksPerBroker() == 0 ?
defaults.disksPerBroker() : clusterTest.disksPerBroker())
@@ -280,7 +279,7 @@ public class ClusterTestExtensions implements
TestTemplateInvocationContextProvi
.setServerProperties(serverProperties)
.setPerServerProperties(perServerProperties)
.setMetadataVersion(clusterTest.metadataVersion())
- .setTags(Arrays.asList(clusterTest.tags()))
+ .setTags(List.of(clusterTest.tags()))
.setFeatures(features)
.build();
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
index 0b291159858..70ee6d2d624 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
@@ -42,7 +42,6 @@ import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -55,8 +54,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
-
/**
* Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test
invocation. Each instance of this
* class is provided with a configuration for the cluster.
@@ -88,7 +85,7 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
@Override
public List<Extension> getAdditionalExtensions() {
RaftClusterInstance clusterInstance = new
RaftClusterInstance(clusterConfig, isCombined);
- return Arrays.asList(
+ return List.of(
(BeforeEachCallback) context -> {
clusterInstance.format();
if (clusterConfig.isAutoStart()) {
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
index 683de7e930f..fa287aaaf4a 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
@@ -23,10 +23,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,11 +39,11 @@ public class KafkaClusterTestKitTest {
@ValueSource(ints = {0, -1})
public void testCreateClusterWithBadNumDisksThrows(int disks) {
IllegalArgumentException e =
assertThrowsExactly(IllegalArgumentException.class, () -> new
KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder()
- .setNumBrokerNodes(1)
- .setNumDisksPerBroker(disks)
- .setNumControllerNodes(1)
- .build())
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumDisksPerBroker(disks)
+ .setNumControllerNodes(1)
+ .build())
);
assertEquals("Invalid value for numDisksPerBroker", e.getMessage());
}
@@ -76,15 +73,15 @@ public class KafkaClusterTestKitTest {
@Test
public void testCreateClusterWithBadPerServerProperties() {
Map<Integer, Map<String, String>> perServerProperties = new
HashMap<>();
- perServerProperties.put(100, Collections.singletonMap("foo", "foo1"));
- perServerProperties.put(200, Collections.singletonMap("bar", "bar1"));
+ perServerProperties.put(100, Map.of("foo", "foo1"));
+ perServerProperties.put(200, Map.of("bar", "bar1"));
IllegalArgumentException e =
assertThrowsExactly(IllegalArgumentException.class, () -> new
KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder()
- .setNumBrokerNodes(1)
- .setNumControllerNodes(1)
- .setPerServerProperties(perServerProperties)
- .build())
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .setPerServerProperties(perServerProperties)
+ .build())
);
assertEquals("Unknown server id 100, 200 in perServerProperties, the
existent server ids are 0, 3000", e.getMessage());
}
@@ -93,11 +90,11 @@ public class KafkaClusterTestKitTest {
@ValueSource(booleans = {true, false})
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined)
throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(5).
- setNumDisksPerBroker(2).
- setCombined(combined).
- setNumControllerNodes(3).build()).build()) {
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(5).
+ setNumDisksPerBroker(2).
+ setCombined(combined).
+ setNumControllerNodes(3).build()).build()) {
TestKitNodes nodes = cluster.nodes();
assertEquals(5, nodes.brokerNodes().size());
@@ -105,15 +102,15 @@ public class KafkaClusterTestKitTest {
nodes.brokerNodes().forEach((brokerId, node) -> {
assertEquals(2, node.logDataDirectories().size());
- Set<String> expected = new
HashSet<>(Arrays.asList(String.format("broker_%d_data0", brokerId),
String.format("broker_%d_data1", brokerId)));
+ Set<String> expected = Set.of(String.format("broker_%d_data0",
brokerId), String.format("broker_%d_data1", brokerId));
if (nodes.isCombined(node.id())) {
- expected = new
HashSet<>(Arrays.asList(String.format("combined_%d_0", brokerId),
String.format("combined_%d_1", brokerId)));
+ expected = Set.of(String.format("combined_%d_0",
brokerId), String.format("combined_%d_1", brokerId));
}
assertEquals(
- expected,
- node.logDataDirectories().stream()
- .map(p ->
Paths.get(p).getFileName().toString())
- .collect(Collectors.toSet())
+ expected,
+ node.logDataDirectories().stream()
+ .map(p -> Paths.get(p).getFileName().toString())
+ .collect(Collectors.toSet())
);
});
@@ -128,11 +125,11 @@ public class KafkaClusterTestKitTest {
public void testCreateClusterWithSpecificBaseDir() throws Exception {
Path baseDirectory = TestUtils.tempDirectory().toPath();
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setBaseDirectory(baseDirectory).
- setNumBrokerNodes(1).
- setCombined(true).
- setNumControllerNodes(1).build()).build()) {
+ new TestKitNodes.Builder().
+ setBaseDirectory(baseDirectory).
+ setNumBrokerNodes(1).
+ setCombined(true).
+ setNumControllerNodes(1).build()).build()) {
assertEquals(cluster.nodes().baseDirectory(),
baseDirectory.toFile().getAbsolutePath());
cluster.nodes().controllerNodes().values().forEach(controller ->
assertTrue(Paths.get(controller.metadataDirectory()).startsWith(baseDirectory)));
@@ -140,6 +137,7 @@ public class KafkaClusterTestKitTest {
assertTrue(Paths.get(broker.metadataDirectory()).startsWith(baseDirectory)));
}
}
+
@Test
public void testExposedFaultHandlers() {
TestKitNodes nodes = new TestKitNodes.Builder()
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
index b0bb8afa22c..7b1caf3383a 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
@@ -34,8 +34,8 @@ public class TestKitNodeTest {
public void testSecurityProtocol(SecurityProtocol securityProtocol) {
if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol
!= SecurityProtocol.SASL_PLAINTEXT) {
assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT
security protocol",
- assertThrows(IllegalArgumentException.class,
- () -> new
TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
+ assertThrows(IllegalArgumentException.class,
+ () -> new
TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT
security protocol",
assertThrows(IllegalArgumentException.class,
() -> new
TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
@@ -47,13 +47,13 @@ public class TestKitNodeTest {
ListenerName brokerListenerName = ListenerName.normalised("FOOBAR");
ListenerName controllerListenerName =
ListenerName.normalised("BAZQUX");
TestKitNodes testKitNodes = new TestKitNodes.Builder()
- .setNumBrokerNodes(1)
- .setNumControllerNodes(1)
- .setBrokerListenerName(brokerListenerName)
- .setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
- .setControllerListenerName(controllerListenerName)
- .setControllerSecurityProtocol(SecurityProtocol.PLAINTEXT)
- .build();
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .setBrokerListenerName(brokerListenerName)
+ .setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ .setControllerListenerName(controllerListenerName)
+ .setControllerSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ .build();
assertEquals(brokerListenerName, testKitNodes.brokerListenerName());
assertEquals(controllerListenerName,
testKitNodes.controllerListenerName());
}
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index 4ad3ff0dfc1..728eee67639 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -62,8 +62,6 @@ import org.junit.jupiter.api.Assertions;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -73,8 +71,6 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
@@ -105,10 +101,10 @@ public class ClusterTestExtensionsTest {
static List<ClusterConfig> generate1() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("foo", "bar");
- return singletonList(ClusterConfig.defaultBuilder()
- .setTypes(singleton(Type.KRAFT))
+ return List.of(ClusterConfig.defaultBuilder()
+ .setTypes(Set.of(Type.KRAFT))
.setServerProperties(serverProperties)
- .setTags(singletonList("Generated Test"))
+ .setTags(List.of("Generated Test"))
.build());
}
@@ -126,7 +122,7 @@ public class ClusterTestExtensionsTest {
assertEquals(Type.KRAFT, clusterInstance.type(),
"generate1 provided a KRAFT cluster, so we should see that here");
assertEquals("bar",
clusterInstance.config().serverProperties().get("foo"));
- assertEquals(singletonList("Generated Test"),
clusterInstance.config().tags());
+ assertEquals(List.of("Generated Test"),
clusterInstance.config().tags());
}
// Multiple @ClusterTest can be used with @ClusterTests
@@ -157,22 +153,22 @@ public class ClusterTestExtensionsTest {
assertEquals("baz",
clusterInstance.config().serverProperties().get("foo"));
assertEquals("eggs",
clusterInstance.config().serverProperties().get("spam"));
assertEquals("overwrite.value",
clusterInstance.config().serverProperties().get("default.key"));
- assertEquals(Arrays.asList("default.display.key1",
"default.display.key2"), clusterInstance.config().tags());
+ assertEquals(List.of("default.display.key1", "default.display.key2"),
clusterInstance.config().tags());
// assert broker server 0 contains property queued.max.requests 200
from ClusterTest which overrides
// the value 100 in server property in ClusterTestDefaults
try (Admin admin = clusterInstance.admin()) {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, "0");
- Map<ConfigResource, Config> configs =
admin.describeConfigs(singletonList(configResource)).all().get();
+ Map<ConfigResource, Config> configs =
admin.describeConfigs(List.of(configResource)).all().get();
assertEquals(1, configs.size());
assertEquals("200",
configs.get(configResource).get("queued.max.requests").value());
}
// In KRaft cluster non-combined mode, assert the controller server
3000 contains the property queued.max.requests 300
if (clusterInstance.type() == Type.KRAFT) {
- try (Admin admin = Admin.create(Collections.singletonMap(
+ try (Admin admin = Admin.create(Map.of(
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
clusterInstance.bootstrapControllers()))) {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, "3000");
- Map<ConfigResource, Config> configs =
admin.describeConfigs(singletonList(configResource)).all().get();
+ Map<ConfigResource, Config> configs =
admin.describeConfigs(List.of(configResource)).all().get();
assertEquals(1, configs.size());
assertEquals("300",
configs.get(configResource).get("queued.max.requests").value());
}
@@ -217,7 +213,7 @@ public class ClusterTestExtensionsTest {
})
})
public void testNotSupportedNewGroupProtocols(ClusterInstance
clusterInstance) {
- assertEquals(singleton(CLASSIC),
clusterInstance.supportedGroupProtocols());
+ assertEquals(Set.of(CLASSIC),
clusterInstance.supportedGroupProtocols());
}
@@ -231,7 +227,7 @@ public class ClusterTestExtensionsTest {
try (Admin admin = clusterInstance.admin()) {
Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s
-> s.name().equals(topicName)));
- List<TopicPartitionInfo> partitions =
admin.describeTopics(singleton(topicName)).allTopicNames().get()
+ List<TopicPartitionInfo> partitions =
admin.describeTopics(Set.of(topicName)).allTopicNames().get()
.get(topicName).partitions();
assertEquals(numPartition, partitions.size());
Assertions.assertTrue(partitions.stream().allMatch(partition ->
partition.replicas().size() == numReplicas));
@@ -275,9 +271,9 @@ public class ClusterTestExtensionsTest {
public void testVerifyTopicDeletion(ClusterInstance clusterInstance)
throws Exception {
try (Admin admin = clusterInstance.admin()) {
String testTopic = "testTopic";
- admin.createTopics(singletonList(new NewTopic(testTopic, 1,
(short) 1)));
+ admin.createTopics(List.of(new NewTopic(testTopic, 1, (short) 1)));
clusterInstance.waitForTopic(testTopic, 1);
- admin.deleteTopics(singletonList(testTopic));
+ admin.deleteTopics(List.of(testTopic));
clusterInstance.waitTopicDeletion(testTopic);
Assertions.assertTrue(admin.listTopics().listings().get().stream().noneMatch(
topic -> topic.name().equals(testTopic)
@@ -299,12 +295,12 @@ public class ClusterTestExtensionsTest {
KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName(),
VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName()))
) {
- adminClient.createTopics(singleton(new NewTopic(topic, 1, (short)
1)));
+ adminClient.createTopics(Set.of(new NewTopic(topic, 1, (short)
1)));
assertNotNull(producer);
assertNotNull(consumer);
producer.send(new ProducerRecord<>(topic, key, value));
producer.flush();
- consumer.subscribe(singletonList(topic));
+ consumer.subscribe(List.of(topic));
List<ConsumerRecord<String, String>> records = new ArrayList<>();
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
@@ -327,12 +323,12 @@ public class ClusterTestExtensionsTest {
Producer<byte[], byte[]> producer = cluster.producer();
Consumer<byte[], byte[]> consumer = cluster.consumer()
) {
- adminClient.createTopics(singleton(new NewTopic(topic, 1, (short)
1)));
+ adminClient.createTopics(Set.of(new NewTopic(topic, 1, (short)
1)));
assertNotNull(producer);
assertNotNull(consumer);
producer.send(new ProducerRecord<>(topic, key, value));
producer.flush();
- consumer.subscribe(singletonList(topic));
+ consumer.subscribe(List.of(topic));
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsUnitTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsUnitTest.java
index 819006612c4..29374dfab78 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsUnitTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsUnitTest.java
@@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import java.lang.reflect.Method;
-import java.util.Collections;
import java.util.List;
import static org.mockito.Mockito.mock;
@@ -34,7 +33,7 @@ import static org.mockito.Mockito.when;
public class ClusterTestExtensionsUnitTest {
static List<ClusterConfig> cfgEmpty() {
- return Collections.emptyList();
+ return List.of();
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -56,25 +55,25 @@ public class ClusterTestExtensionsUnitTest {
when(annot.value()).thenReturn("").thenReturn("
").thenReturn("cfgEmpty");
Assertions.assertEquals(
- "ClusterTemplate value can't be empty string.",
- Assertions.assertThrows(IllegalStateException.class, () ->
- ext.processClusterTemplate(context, annot)
- ).getMessage()
+ "ClusterTemplate value can't be empty string.",
+ Assertions.assertThrows(IllegalStateException.class, () ->
+ ext.processClusterTemplate(context, annot)
+ ).getMessage()
);
Assertions.assertEquals(
- "ClusterTemplate value can't be empty string.",
- Assertions.assertThrows(IllegalStateException.class, () ->
- ext.processClusterTemplate(context, annot)
- ).getMessage()
+ "ClusterTemplate value can't be empty string.",
+ Assertions.assertThrows(IllegalStateException.class, () ->
+ ext.processClusterTemplate(context, annot)
+ ).getMessage()
);
Assertions.assertEquals(
- "ClusterConfig generator method should provide at least one
config",
- Assertions.assertThrows(IllegalStateException.class, () ->
- ext.processClusterTemplate(context, annot)
- ).getMessage()
+ "ClusterConfig generator method should provide at least one
config",
+ Assertions.assertThrows(IllegalStateException.class, () ->
+ ext.processClusterTemplate(context, annot)
+ ).getMessage()
);
}
}
diff --git
a/test-common/test-common-util/src/test/java/org/apache/kafka/common/test/junit/CatalogTestFilterTest.java
b/test-common/test-common-util/src/test/java/org/apache/kafka/common/test/junit/CatalogTestFilterTest.java
index 22669a44f08..59857d4885c 100644
---
a/test-common/test-common-util/src/test/java/org/apache/kafka/common/test/junit/CatalogTestFilterTest.java
+++
b/test-common/test-common-util/src/test/java/org/apache/kafka/common/test/junit/CatalogTestFilterTest.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -56,7 +55,7 @@ public class CatalogTestFilterTest {
@Test
public void testEmptyCatalog(@TempDir Path tempDir) throws IOException {
Path catalog = tempDir.resolve("catalog.txt");
- Files.write(catalog, Collections.emptyList());
+ Files.write(catalog, List.of());
Filter<TestDescriptor> filter =
CatalogTestFilter.create(catalog.toString());
assertTrue(filter.apply(descriptor("o.a.k.Foo",
"testBar1")).excluded());