This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 423a6353a8a KAFKA-18025: Rework acquisition lock timeout test (#17985)
423a6353a8a is described below

commit 423a6353a8a645725dcbce6403d5c00653d51f37
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Dec 4 15:28:55 2024 +0000

    KAFKA-18025: Rework acquisition lock timeout test (#17985)
    
    Additional work on ShareConsumerTest.testAcquisitionLockTimeoutOnConsumer. 
First, mark the test as flaky since it fails occasionally and it would be best 
to get a decent number of passes before assuming it's no longer flaky. Then, 
change the assertions so that the test checks which records were received 
before it counts them (we might get too many records because the wrong records 
are being returned, or because records are actually being duplicated). The rare 
failures appear to be rel [...]
    
    There are a couple of other improvements too. Reducing the number of 
share-group state partitions so we don't have the overhead of creating 50 
partitions when a few will do. Changing the warm-up logic since it has very 
occasionally been observed to fail an assertion.
    
    Reviewers: ShivsundarR <[email protected]>, Manikumar Reddy 
<[email protected]>
---
 .../java/kafka/test/api/ShareConsumerTest.java     | 70 +++++++++++++---------
 1 file changed, 43 insertions(+), 27 deletions(-)

diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index c3e97d16d24..d31ca18e8b7 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -129,6 +129,7 @@ public class ShareConsumerTest {
             .setConfigProp("group.share.record.lock.duration.ms", "15000")
             .setConfigProp("offsets.topic.replication.factor", "1")
             .setConfigProp("share.coordinator.state.topic.min.isr", "1")
+            .setConfigProp("share.coordinator.state.topic.num.partitions", "3")
             .setConfigProp("share.coordinator.state.topic.replication.factor", 
"1")
             .setConfigProp("transaction.state.log.min.isr", "1")
             .setConfigProp("transaction.state.log.replication.factor", "1")
@@ -1278,51 +1279,64 @@ public class ShareConsumerTest {
         }
     }
 
+    @Flaky("KAFKA-18025")
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
     public void testAcquisitionLockTimeoutOnConsumer(String persister) throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
 
             ProducerRecord<byte[], byte[]> producerRecord1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null,
                 "key_1".getBytes(), "value_1".getBytes());
             ProducerRecord<byte[], byte[]> producerRecord2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null,
                 "key_2".getBytes(), "value_2".getBytes());
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
+            // Produce a first record which is consumed and acknowledged 
normally.
             producer.send(producerRecord1);
             producer.flush();
 
-            // Poll two times to receive records. The first poll puts the 
acquisition lock and fetches the record.
-            // Since, we are only sending one record and acquisition lock 
hasn't timed out, the second poll only acknowledges the
-            // record from the first poll and no more fetch.
-            ConsumerRecords<byte[], byte[]> records1 = 
shareConsumer1.poll(Duration.ofMillis(5000));
-            assertEquals(1, records1.count());
-            assertEquals("key_1", new 
String(records1.iterator().next().key()));
-            assertEquals("value_1", new 
String(records1.iterator().next().value()));
-            ConsumerRecords<byte[], byte[]> records2 = 
shareConsumer1.poll(Duration.ofMillis(500));
-            assertEquals(0, records2.count());
+            // Poll twice to receive records. The first poll fetches the 
record and starts the acquisition lock timer.
+            // Since, we are only sending one record and the acquisition lock 
hasn't timed out, the second poll only
+            // acknowledges the record from the first poll and does not fetch 
any more records.
+            ConsumerRecords<byte[], byte[]> consumerRecords = 
shareConsumer.poll(Duration.ofMillis(5000));
+            ConsumerRecord<byte[], byte[]> consumerRecord = 
consumerRecords.records(tp).get(0);
+            assertEquals("key_1", new String(consumerRecord.key()));
+            assertEquals("value_1", new String(consumerRecord.value()));
+            assertEquals(1, consumerRecords.count());
 
-            producer.send(producerRecord2);
+            consumerRecords = shareConsumer.poll(Duration.ofMillis(1000));
+            assertEquals(0, consumerRecords.count());
 
-            // Poll three times. The first poll puts the acquisition lock and 
fetches the record. Before the second poll,
-            // acquisition lock times out and hence the consumer needs to 
fetch the record again. Since, the acquisition lock
-            // hasn't timed out before the third poll, the third poll only 
acknowledges the record from the second poll and no more fetch.
-            records1 = shareConsumer1.poll(Duration.ofMillis(5000));
-            assertEquals(1, records1.count());
-            assertEquals("key_2", new 
String(records1.iterator().next().key()));
-            assertEquals("value_2", new 
String(records1.iterator().next().value()));
+            // Produce a second record which is fetched, but not acknowledged 
before it times out. The record will
+            // be released automatically by the broker. It is then fetched 
again and acknowledged normally.
+            producer.send(producerRecord2);
+            producer.flush();
 
-            // Allowing acquisition lock to expire.
+            // Poll three more times. The first poll fetches the second record 
and starts the acquisition lock timer.
+            // Before the second poll, acquisition lock times out and hence 
the consumer needs to fetch the record again.
+            // The acquisition lock doesn't time out between the second and 
third polls, so the third poll only acknowledges
+            // the record from the second poll and does not fetch any more 
records.
+            consumerRecords = shareConsumer.poll(Duration.ofMillis(5000));
+            consumerRecord = consumerRecords.records(tp).get(0);
+            assertEquals("key_2", new String(consumerRecord.key()));
+            assertEquals("value_2", new String(consumerRecord.value()));
+            assertEquals(1, consumerRecords.count());
+
+            // Allow the acquisition lock to time out.
             Thread.sleep(20000);
 
-            records2 = shareConsumer1.poll(Duration.ofMillis(5000));
-            assertEquals(1, records2.count());
-            assertEquals("key_2", new 
String(records2.iterator().next().key()));
-            assertEquals("value_2", new 
String(records2.iterator().next().value()));
-            ConsumerRecords<byte[], byte[]> records3 = 
shareConsumer1.poll(Duration.ofMillis(500));
-            assertEquals(0, records3.count());
+            consumerRecords = shareConsumer.poll(Duration.ofMillis(5000));
+            consumerRecord = consumerRecords.records(tp).get(0);
+            // By checking the key and value before the count, we get a bit 
more information if too many records are returned.
+            // This test has been observed to fail very occasionally because 
of this.
+            assertEquals("key_2", new String(consumerRecord.key()));
+            assertEquals("value_2", new String(consumerRecord.value()));
+            assertEquals(1, consumerRecords.count());
+
+            consumerRecords = shareConsumer.poll(Duration.ofMillis(1000));
+            assertEquals(0, consumerRecords.count());
         }
     }
 
@@ -1978,7 +1992,9 @@ public class ShareConsumerTest {
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"warmupgroup1")) {
 
-            producer.send(record).get(15000, TimeUnit.MILLISECONDS);
+            producer.send(record);
+            producer.flush();
+
             shareConsumer.subscribe(subscription);
             TestUtils.waitForCondition(
                 () -> shareConsumer.poll(Duration.ofMillis(5000)).count() == 
1, 30000, 200L, () -> "warmup record not received");

Reply via email to