chia7712 commented on code in PR #19154:
URL: https://github.com/apache/kafka/pull/19154#discussion_r2005990845
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java:
##########
@@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance
clusterInstance) {
// Since the brokers cached during the bootstrap are offline, the
admin client needs to wait the default timeout for other threads.
admin.close(Duration.ZERO);
}
+
+ public void consumerRebootstrap(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var partitions = List.of(new TopicPartition(TOPIC, part));
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ producer.flush();
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer =
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name))) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+
+ // Bring back the server 0 and shut down 1.
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 1".getBytes(), "value 1".getBytes())).get();
+ assertEquals(1, recordMetadata.offset());
+ producer.flush();
+ }
+
+ // The server 1 originally cached during the bootstrap, is offline.
+ // However, the server 0 from the bootstrap list is online.
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+ }
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ })
+ public void testClassicConsumerRebootstrap(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException {
+ consumerRebootstrap(clusterInstance, GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ })
+ public void testConsumerRebootstrap(ClusterInstance clusterInstance)
throws InterruptedException, ExecutionException {
+ consumerRebootstrap(clusterInstance, GroupProtocol.CONSUMER);
+ }
+
+ public void consumerRebootstrapDisabled(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var tp = new TopicPartition(TOPIC, part);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
Review Comment:
There is only one partition, so we can streamline the code:
```
var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, "value
0".getBytes())).get();
```
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java:
##########
@@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance
clusterInstance) {
// Since the brokers cached during the bootstrap are offline, the
admin client needs to wait the default timeout for other threads.
admin.close(Duration.ZERO);
}
+
+ public void consumerRebootstrap(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var partitions = List.of(new TopicPartition(TOPIC, part));
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ producer.flush();
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer =
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name))) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+
+ // Bring back the server 0 and shut down 1.
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 1".getBytes(), "value 1".getBytes())).get();
+ assertEquals(1, recordMetadata.offset());
+ producer.flush();
+ }
+
+ // The server 1 originally cached during the bootstrap, is offline.
+ // However, the server 0 from the bootstrap list is online.
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+ }
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ })
+ public void testClassicConsumerRebootstrap(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException {
+ consumerRebootstrap(clusterInstance, GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ })
+ public void testConsumerRebootstrap(ClusterInstance clusterInstance)
throws InterruptedException, ExecutionException {
+ consumerRebootstrap(clusterInstance, GroupProtocol.CONSUMER);
+ }
+
+ public void consumerRebootstrapDisabled(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var tp = new TopicPartition(TOPIC, part);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ producer.flush();
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer = clusterInstance.consumer(Map.of(
+ CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none",
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name)
+ )) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(List.of(tp));
+ consumer.seekToBeginning(List.of(tp));
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+
+ // Bring back the server 0 and shut down 1.
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 1".getBytes(), "value 1".getBytes())).get();
Review Comment:
ditto
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java:
##########
@@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance
clusterInstance) {
// Since the brokers cached during the bootstrap are offline, the
admin client needs to wait the default timeout for other threads.
admin.close(Duration.ZERO);
}
+
+ public void consumerRebootstrap(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var partitions = List.of(new TopicPartition(TOPIC, part));
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ producer.flush();
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer =
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name))) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+
+ // Bring back the server 0 and shut down 1.
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 1".getBytes(), "value 1".getBytes())).get();
+ assertEquals(1, recordMetadata.offset());
+ producer.flush();
+ }
+
+ // The server 1 originally cached during the bootstrap, is offline.
+ // However, the server 0 from the bootstrap list is online.
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+ }
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ })
+ public void testClassicConsumerRebootstrap(ClusterInstance
clusterInstance) throws InterruptedException, ExecutionException {
+ consumerRebootstrap(clusterInstance, GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest(
+ brokers = REPLICAS,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ })
+ public void testConsumerRebootstrap(ClusterInstance clusterInstance)
throws InterruptedException, ExecutionException {
+ consumerRebootstrap(clusterInstance, GroupProtocol.CONSUMER);
+ }
+
+ public void consumerRebootstrapDisabled(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var tp = new TopicPartition(TOPIC, part);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ producer.flush();
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer = clusterInstance.consumer(Map.of(
+ CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none",
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name)
+ )) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(List.of(tp));
+ consumer.seekToBeginning(List.of(tp));
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+
+ // Bring back the server 0 and shut down 1.
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 1".getBytes(), "value 1".getBytes())).get();
+ assertEquals(1, recordMetadata.offset());
+ producer.flush();
Review Comment:
We don't need to call `flush` after calling `get` — `get` already blocks until the send completes.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java:
##########
@@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance
clusterInstance) {
// Since the brokers cached during the bootstrap are offline, the
admin client needs to wait the default timeout for other threads.
admin.close(Duration.ZERO);
}
+
+ public void consumerRebootstrap(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var partitions = List.of(new TopicPartition(TOPIC, part));
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
Review Comment:
ditto
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java:
##########
@@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance
clusterInstance) {
// Since the brokers cached during the bootstrap are offline, the
admin client needs to wait the default timeout for other threads.
admin.close(Duration.ZERO);
}
+
+ public void consumerRebootstrap(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var partitions = List.of(new TopicPartition(TOPIC, part));
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ producer.flush();
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer =
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name))) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
Review Comment:
We can't assume the records are returned by a single poll. Could you please
add a polling loop to avoid flakiness?
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java:
##########
@@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance
clusterInstance) {
// Since the brokers cached during the bootstrap are offline, the
admin client needs to wait the default timeout for other threads.
admin.close(Duration.ZERO);
}
+
+ public void consumerRebootstrap(ClusterInstance clusterInstance,
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+ clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+ var part = 0;
+ var broker0 = 0;
+ var broker1 = 1;
+ var partitions = List.of(new TopicPartition(TOPIC, part));
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 0".getBytes(), "value 0".getBytes())).get();
+ assertEquals(0, recordMetadata.offset());
+ producer.flush();
+ }
+
+ clusterInstance.shutdownBroker(broker0);
+
+ try (var consumer =
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name))) {
+ // Only the server 1 is available for the consumer during the
bootstrap.
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+
+ // Bring back the server 0 and shut down 1.
+ clusterInstance.shutdownBroker(broker1);
+ clusterInstance.startBroker(broker0);
+
+ try (var producer = clusterInstance.producer()) {
+ var recordMetadata = producer.send(new ProducerRecord<>(TOPIC,
part, "key 1".getBytes(), "value 1".getBytes())).get();
+ assertEquals(1, recordMetadata.offset());
+ producer.flush();
+ }
+
+ // The server 1 originally cached during the bootstrap, is offline.
+ // However, the server 0 from the bootstrap list is online.
+ assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]