smjn commented on code in PR #18656:
URL: https://github.com/apache/kafka/pull/18656#discussion_r1923949876
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -101,67 +100,64 @@
@Timeout(1200)
@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+@ClusterTestDefaults(
+ serverProperties = {
+ @ClusterConfigProperty(key = "auto.create.topics.enable", value =
"false"),
+ @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer,share"),
+ @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+ @ClusterConfigProperty(key = "group.share.partition.max.record.locks",
value = "10000"),
+ @ClusterConfigProperty(key = "group.share.record.lock.duration.ms",
value = "15000"),
+ @ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr",
value = "1"),
+ @ClusterConfigProperty(key =
"share.coordinator.state.topic.num.partitions", value = "3"),
+ @ClusterConfigProperty(key =
"share.coordinator.state.topic.replication.factor", value = "1"),
+ @ClusterConfigProperty(key = "transaction.state.log.min.isr", value =
"1"),
+ @ClusterConfigProperty(key =
"transaction.state.log.replication.factor", value = "1"),
+ @ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true")
+ },
+ types = {Type.KRAFT}
+)
public class ShareConsumerTest {
- private KafkaClusterTestKit cluster;
+ private final ClusterInstance cluster;
private final TopicPartition tp = new TopicPartition("topic", 0);
private final TopicPartition tp2 = new TopicPartition("topic2", 0);
private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
private List<TopicPartition> sgsTopicPartitions;
- private Admin adminClient;
-
- @BeforeEach
- public void createCluster(TestInfo testInfo) throws Exception {
- cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder()
- .setNumBrokerNodes(1)
- .setNumControllerNodes(1)
- .build())
- .setConfigProp("auto.create.topics.enable", "false")
- .setConfigProp("group.coordinator.rebalance.protocols",
"classic,consumer,share")
- .setConfigProp("group.share.enable", "true")
- .setConfigProp("group.share.partition.max.record.locks", "10000")
- .setConfigProp("group.share.record.lock.duration.ms", "15000")
- .setConfigProp("offsets.topic.replication.factor", "1")
- .setConfigProp("share.coordinator.state.topic.min.isr", "1")
- .setConfigProp("share.coordinator.state.topic.num.partitions", "3")
- .setConfigProp("share.coordinator.state.topic.replication.factor",
"1")
- .setConfigProp("transaction.state.log.min.isr", "1")
- .setConfigProp("transaction.state.log.replication.factor", "1")
- .setConfigProp("unstable.api.versions.enable", "true")
- .build();
- cluster.format();
- cluster.startup();
- cluster.waitForActiveController();
- cluster.waitForReadyBrokers();
- createTopic("topic");
- createTopic("topic2");
- adminClient = createAdminClient();
- sgsTopicPartitions = IntStream.range(0, 3)
- .mapToObj(part -> new
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
- .toList();
- warmup();
- }
-
- @AfterEach
- public void destroyCluster() throws Exception {
- adminClient.close();
- cluster.close();
- }
-
- @Test
+ public ShareConsumerTest(ClusterInstance cluster) {
+ this.cluster = cluster;
+ }
+
+ private void setup() {
Review Comment:
@AndrewJSchofield I think not - since the clusterInstance is not populated
at that time. It shows up as null.
Using `@BeforeEach` - it throws an exception
```
org.opentest4j.AssertionFailedError
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:46)
at org.junit.jupiter.api.Assertions.fail(Assertions.java:161)
at kafka.test.api.ShareConsumerTest.setup(ShareConsumerTest.java:144)
...
Caused by: java.lang.NullPointerException: Cannot invoke
"org.apache.kafka.common.test.KafkaClusterTestKit.waitForReadyBrokers()"
because "this.clusterTestKit" is null
at
org.apache.kafka.common.test.api.RaftClusterInvocationContext$RaftClusterInstance.waitForReadyBrokers(RaftClusterInvocationContext.java:219)
at kafka.test.api.ShareConsumerTest.setup(ShareConsumerTest.java:136)
... 17 more
```
I checked a couple of implementations and they are using this approach to
perform pre-test initializations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]