This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 423a6353a8a KAFKA-18025: Rework acquisition lock timeout test (#17985)
423a6353a8a is described below
commit 423a6353a8a645725dcbce6403d5c00653d51f37
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Dec 4 15:28:55 2024 +0000
KAFKA-18025: Rework acquisition lock timeout test (#17985)
Additional work on ShareConsumerTest.testAcquisitionLockTimeoutOnConsumer.
First, mark the test as flaky since it fails occasionally and it would be best
to get a decent number of passes before assuming it's no longer flaky. Then,
change the assertions so that the test checks which records were received
before it counts them (we might get too many records because the wrong records
are being returned, or because records are actually being duplicated). The rare
failures appear to be rel [...]
There are a couple of other improvements too: reducing the number of
share-group state partitions, so we don't have the overhead of creating 50
partitions when a few will do, and changing the warm-up logic, since it has
very occasionally been observed to fail an assertion.
Reviewers: ShivsundarR <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../java/kafka/test/api/ShareConsumerTest.java | 70 +++++++++++++---------
1 file changed, 43 insertions(+), 27 deletions(-)
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index c3e97d16d24..d31ca18e8b7 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -129,6 +129,7 @@ public class ShareConsumerTest {
.setConfigProp("group.share.record.lock.duration.ms", "15000")
.setConfigProp("offsets.topic.replication.factor", "1")
.setConfigProp("share.coordinator.state.topic.min.isr", "1")
+ .setConfigProp("share.coordinator.state.topic.num.partitions", "3")
.setConfigProp("share.coordinator.state.topic.replication.factor",
"1")
.setConfigProp("transaction.state.log.min.isr", "1")
.setConfigProp("transaction.state.log.replication.factor", "1")
@@ -1278,51 +1279,64 @@ public class ShareConsumerTest {
}
}
+ @Flaky("KAFKA-18025")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testAcquisitionLockTimeoutOnConsumer(String persister) throws
InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
- KafkaShareConsumer<byte[], byte[]> shareConsumer1 =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
ProducerRecord<byte[], byte[]> producerRecord1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null,
"key_1".getBytes(), "value_1".getBytes());
ProducerRecord<byte[], byte[]> producerRecord2 = new
ProducerRecord<>(tp.topic(), tp.partition(), null,
"key_2".getBytes(), "value_2".getBytes());
- shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+ shareConsumer.subscribe(Set.of(tp.topic()));
+ // Produce a first record which is consumed and acknowledged
normally.
producer.send(producerRecord1);
producer.flush();
- // Poll two times to receive records. The first poll puts the
acquisition lock and fetches the record.
- // Since, we are only sending one record and acquisition lock
hasn't timed out, the second poll only acknowledges the
- // record from the first poll and no more fetch.
- ConsumerRecords<byte[], byte[]> records1 =
shareConsumer1.poll(Duration.ofMillis(5000));
- assertEquals(1, records1.count());
- assertEquals("key_1", new
String(records1.iterator().next().key()));
- assertEquals("value_1", new
String(records1.iterator().next().value()));
- ConsumerRecords<byte[], byte[]> records2 =
shareConsumer1.poll(Duration.ofMillis(500));
- assertEquals(0, records2.count());
+ // Poll twice to receive records. The first poll fetches the
record and starts the acquisition lock timer.
+ // Since, we are only sending one record and the acquisition lock
hasn't timed out, the second poll only
+ // acknowledges the record from the first poll and does not fetch
any more records.
+ ConsumerRecords<byte[], byte[]> consumerRecords =
shareConsumer.poll(Duration.ofMillis(5000));
+ ConsumerRecord<byte[], byte[]> consumerRecord =
consumerRecords.records(tp).get(0);
+ assertEquals("key_1", new String(consumerRecord.key()));
+ assertEquals("value_1", new String(consumerRecord.value()));
+ assertEquals(1, consumerRecords.count());
- producer.send(producerRecord2);
+ consumerRecords = shareConsumer.poll(Duration.ofMillis(1000));
+ assertEquals(0, consumerRecords.count());
- // Poll three times. The first poll puts the acquisition lock and
fetches the record. Before the second poll,
- // acquisition lock times out and hence the consumer needs to
fetch the record again. Since, the acquisition lock
- // hasn't timed out before the third poll, the third poll only
acknowledges the record from the second poll and no more fetch.
- records1 = shareConsumer1.poll(Duration.ofMillis(5000));
- assertEquals(1, records1.count());
- assertEquals("key_2", new
String(records1.iterator().next().key()));
- assertEquals("value_2", new
String(records1.iterator().next().value()));
+ // Produce a second record which is fetched, but not acknowledged
before it times out. The record will
+ // be released automatically by the broker. It is then fetched
again and acknowledged normally.
+ producer.send(producerRecord2);
+ producer.flush();
- // Allowing acquisition lock to expire.
+ // Poll three more times. The first poll fetches the second record
and starts the acquisition lock timer.
+ // Before the second poll, acquisition lock times out and hence
the consumer needs to fetch the record again.
+ // The acquisition lock doesn't time out between the second and
third polls, so the third poll only acknowledges
+ // the record from the second poll and does not fetch any more
records.
+ consumerRecords = shareConsumer.poll(Duration.ofMillis(5000));
+ consumerRecord = consumerRecords.records(tp).get(0);
+ assertEquals("key_2", new String(consumerRecord.key()));
+ assertEquals("value_2", new String(consumerRecord.value()));
+ assertEquals(1, consumerRecords.count());
+
+ // Allow the acquisition lock to time out.
Thread.sleep(20000);
- records2 = shareConsumer1.poll(Duration.ofMillis(5000));
- assertEquals(1, records2.count());
- assertEquals("key_2", new
String(records2.iterator().next().key()));
- assertEquals("value_2", new
String(records2.iterator().next().value()));
- ConsumerRecords<byte[], byte[]> records3 =
shareConsumer1.poll(Duration.ofMillis(500));
- assertEquals(0, records3.count());
+ consumerRecords = shareConsumer.poll(Duration.ofMillis(5000));
+ consumerRecord = consumerRecords.records(tp).get(0);
+ // By checking the key and value before the count, we get a bit
more information if too many records are returned.
+ // This test has been observed to fail very occasionally because
of this.
+ assertEquals("key_2", new String(consumerRecord.key()));
+ assertEquals("value_2", new String(consumerRecord.value()));
+ assertEquals(1, consumerRecords.count());
+
+ consumerRecords = shareConsumer.poll(Duration.ofMillis(1000));
+ assertEquals(0, consumerRecords.count());
}
}
@@ -1978,7 +1992,9 @@ public class ShareConsumerTest {
try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"warmupgroup1")) {
- producer.send(record).get(15000, TimeUnit.MILLISECONDS);
+ producer.send(record);
+ producer.flush();
+
shareConsumer.subscribe(subscription);
TestUtils.waitForCondition(
() -> shareConsumer.poll(Duration.ofMillis(5000)).count() ==
1, 30000, 200L, () -> "warmup record not received");