This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new acd92be6eac MINOR: reduce topicCommandTest brokers from 6 to 3 (#17875)
acd92be6eac is described below
commit acd92be6eacd80f952fe5108cba72d374f901c13
Author: TaiJuWu <[email protected]>
AuthorDate: Sat Nov 23 22:22:29 2024 +0800
MINOR: reduce topicCommandTest brokers from 6 to 3 (#17875)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/tools/TopicCommandTest.java | 142 ++++++++++++++-------
1 file changed, 95 insertions(+), 47 deletions(-)
diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
index aec130ad915..b35952ee088 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -50,8 +50,10 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Exit;
@@ -373,7 +375,13 @@ public class TopicCommandTest {
);
}
- @ClusterTemplate("generate")
+ @ClusterTest(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value =
"100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value
= "1000")
+ }
+ )
public void testCreate(ClusterInstance clusterInstance) throws
InterruptedException, ExecutionException {
String testTopicName = TestUtils.randomString(10);
@@ -391,7 +399,13 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value =
"100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value
= "1000")
+ }
+ )
public void testCreateWithDefaults(ClusterInstance clusterInstance) throws
InterruptedException, ExecutionException {
String testTopicName = TestUtils.randomString(10);
@@ -418,7 +432,13 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value =
"100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value
= "1000")
+ }
+ )
public void testCreateWithDefaultReplication(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException {
String testTopicName = TestUtils.randomString(10);
@@ -436,7 +456,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testCreateWithDefaultPartitions(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException {
String testTopicName = TestUtils.randomString(10);
@@ -455,7 +475,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testCreateWithConfigs(ClusterInstance clusterInstance) throws
Exception {
String testTopicName = TestUtils.randomString(10);
@@ -474,7 +494,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testCreateWhenAlreadyExists(ClusterInstance clusterInstance)
throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -492,7 +512,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testCreateWhenAlreadyExistsWithIfNotExists(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -530,7 +550,6 @@ public class TopicCommandTest {
.get(testTopicName)
.partitions();
- adminClient.close();
assertEquals(3, partitions.size(),
"Unequal partition size: " + partitions.size());
assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions,
0),
@@ -542,7 +561,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testCreateWithInvalidReplicationFactor(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -554,7 +573,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testCreateWithNegativeReplicationFactor(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -565,7 +584,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testCreateWithNegativePartitionCount(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -575,7 +594,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testInvalidTopicLevelConfig(ClusterInstance clusterInstance) {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
@@ -588,7 +607,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testListTopics(ClusterInstance clusterInstance) throws
InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
@@ -600,7 +619,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testListTopicsWithIncludeList(ClusterInstance clusterInstance)
throws InterruptedException {
try (Admin adminClient = clusterInstance.admin()) {
String topic1 = "kafka.testTopic1";
@@ -622,7 +641,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testListTopicsWithExcludeInternal(ClusterInstance
clusterInstance) throws InterruptedException {
try (Admin adminClient = clusterInstance.admin()) {
String topic1 = "kafka.testTopic1";
@@ -638,7 +657,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testAlterPartitionCount(ClusterInstance clusterInstance)
throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -695,7 +714,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void
testAlterAssignmentWithMoreAssignmentThanPartitions(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -732,7 +751,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testAlterWithInvalidPartitionCount(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
@@ -747,7 +766,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testAlterWhenTopicDoesntExist(ClusterInstance clusterInstance)
throws Exception {
String testTopicName = TestUtils.randomString(10);
@@ -760,7 +779,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testAlterWhenTopicDoesntExistWithIfExists(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
Admin adminClient = clusterInstance.admin();
@@ -823,7 +842,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void
testConfigPreservationAcrossPartitionAlteration(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -857,7 +876,13 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value =
"100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value
= "1000")
+ }
+ )
public void testTopicDeletion(ClusterInstance clusterInstance) throws
Exception {
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
@@ -877,7 +902,13 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value =
"100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value
= "1000")
+ }
+ )
public void
testTopicWithCollidingCharDeletionAndCreateAgain(ClusterInstance
clusterInstance) throws Exception {
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
@@ -902,7 +933,13 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value =
"100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value
= "1000")
+ }
+ )
public void testDeleteInternalTopic(ClusterInstance clusterInstance)
throws Exception {
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
@@ -926,7 +963,13 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value =
"100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value
= "1000")
+ }
+ )
public void testDeleteWhenTopicDoesntExist(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -938,7 +981,13 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value =
"100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value
= "1000")
+ }
+ )
public void testDeleteWhenTopicDoesntExistWithIfExists(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin();
@@ -994,7 +1043,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testDescribeWhenTopicDoesntExist(ClusterInstance
clusterInstance) {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
@@ -1007,7 +1056,7 @@ public class TopicCommandTest {
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testDescribeWhenTopicDoesntExistWithIfExists(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
@@ -1015,17 +1064,16 @@ public class TopicCommandTest {
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance,
"--describe", "--topic", testTopicName, "--if-exists"));
- adminClient.close();
topicService.close();
}
}
- @ClusterTemplate("generate")
- public void testDescribeUnavailablePartitions(ClusterInstance
clusterInstance) throws ExecutionException, InterruptedException {
+ @ClusterTest(brokers = 3)
+ public void testDescribeUnavailablePartitions(ClusterInstance
clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
- int partitions = 6;
+ int partitions = 3;
short replicationFactor = 1;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor)));
@@ -1033,10 +1081,10 @@ public class TopicCommandTest {
// check which partition is on broker 0 which we'll kill
clusterInstance.shutdownBroker(0);
- assertEquals(5, clusterInstance.aliveBrokers().size());
+ assertEquals(2, clusterInstance.aliveBrokers().size());
// wait until the topic metadata for the test topic is propagated
to each alive broker
- clusterInstance.waitForTopic(testTopicName, 6);
+ clusterInstance.waitForTopic(testTopicName, 3);
// grab the console output and assert
String output = captureDescribeTopicStandardOut(clusterInstance,
buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic",
testTopicName, "--unavailable-partitions"));
@@ -1049,17 +1097,17 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testDescribeUnderReplicatedPartitions(ClusterInstance
clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
int partitions = 1;
- short replicationFactor = 6;
+ short replicationFactor = 3;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor)));
clusterInstance.waitForTopic(testTopicName, partitions);
clusterInstance.shutdownBroker(0);
- Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 5);
+ Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 2);
TestUtils.waitForCondition(
() ->
clusterInstance.aliveBrokers().values().stream().allMatch(
@@ -1077,23 +1125,23 @@ public class TopicCommandTest {
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testDescribeUnderMinIsrPartitions(ClusterInstance
clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
Map<String, String> topicConfig = new HashMap<>();
- topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
+ topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");
int partitions = 1;
- short replicationFactor = 6;
+ short replicationFactor = 3;
adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
clusterInstance.waitForTopic(testTopicName, partitions);
clusterInstance.shutdownBroker(0);
- assertEquals(5, clusterInstance.aliveBrokers().size());
+ assertEquals(2, clusterInstance.aliveBrokers().size());
TestUtils.waitForCondition(
- () ->
clusterInstance.aliveBrokers().values().stream().allMatch(broker ->
broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() ==
5),
+ () ->
clusterInstance.aliveBrokers().values().stream().allMatch(broker ->
broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() ==
2),
CLUSTER_WAIT_MS, String.format("Timeout waiting for
partition metadata propagating to brokers for %s topic", testTopicName)
);
@@ -1275,7 +1323,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testDescribeReportOverriddenConfigs(ClusterInstance
clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
@@ -1294,7 +1342,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void testDescribeAndListTopicsWithoutInternalTopics(ClusterInstance
clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.admin()) {
@@ -1316,7 +1364,7 @@ public class TopicCommandTest {
}
}
- @ClusterTemplate("generate")
+ @ClusterTest
public void
testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(ClusterInstance
clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
Admin adminClient = clusterInstance.admin();
@@ -1340,14 +1388,14 @@ public class TopicCommandTest {
adminClient.close();
}
- @ClusterTemplate("generate")
+ @ClusterTest(brokers = 3)
public void testCreateWithTopicNameCollision(ClusterInstance
clusterInstance) throws Exception {
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
String topic = "foo_bar";
int partitions = 1;
- short replicationFactor = 6;
+ short replicationFactor = 3;
adminClient.createTopics(Collections.singletonList(new
NewTopic(topic, partitions, replicationFactor)));
clusterInstance.waitForTopic(topic, defaultNumPartitions);