This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new caf4a6cc5f7 KAFKA-19216: Eliminate flakiness in 
kafka.server.share.SharePartitionTest (#19639)
caf4a6cc5f7 is described below

commit caf4a6cc5f76e3058ba8048f13b75794e6e971fe
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue May 6 00:34:22 2025 +0530

    KAFKA-19216: Eliminate flakiness in kafka.server.share.SharePartitionTest 
(#19639)
    
    ### About
    11 of the test cases in `SharePartitionTest` have failed at least once
    in the past 28 days.
    
    
https://develocity.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=kafka.server.share.SharePartitionTest
    The flakiness appears to be caused by the use of `SystemTimer` in
    various acquisition lock timeout related tests. I have replaced
    `SystemTimer` with `MockTimer` and also improved the `MockTimer` API so
    that timer task entries that have already been cancelled are removed
    (`MockTimer.size()` now purges cancelled entries before reporting the size).
    This change also reduces the time taken to run `SharePartitionTest` from
    ~6 s to ~1.5 s.
    
    ### Testing
    The change was tested using the existing unit tests in
    Apache Kafka.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../common/runtime/CoordinatorRuntimeTest.java     | 10 +++----
 .../kafka/server/share/SharePartitionTest.java     | 34 +++++++++++++++++-----
 .../apache/kafka/server/util/timer/MockTimer.java  |  3 ++
 3 files changed, 33 insertions(+), 14 deletions(-)

diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 418d58376cc..b82829e1d62 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -2815,9 +2815,8 @@ public class CoordinatorRuntimeTest {
         assertTrue(write1.isDone());
         assertTrue(write2.isDone());
 
-        // All timer tasks have been cancelled. TimerTask entries are not 
removed in MockTimer.
-        assertEquals(2, timer.size());
-        timer.taskQueue().forEach(taskEntry -> 
assertTrue(taskEntry.cancelled()));
+        // All timer tasks have been cancelled. Hence,they have been removed 
in MockTimer.
+        assertEquals(0, timer.size());
     }
 
     @Test
@@ -2885,9 +2884,8 @@ public class CoordinatorRuntimeTest {
         assertEquals(1, 
runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
         assertTrue(write1.isDone());
 
-        // All timer tasks have been cancelled. TimerTask entries are not 
removed in MockTimer.
-        assertEquals(1, timer.size());
-        timer.taskQueue().forEach(taskEntry -> 
assertTrue(taskEntry.cancelled()));
+        // All timer tasks have been cancelled. Hence, they have been removed 
in MockTimer.
+        assertEquals(0, timer.size());
     }
 
     @Test
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 465bce6de6a..da35cb0f428 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -67,8 +67,7 @@ import 
org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.server.util.FutureUtils;
-import org.apache.kafka.server.util.timer.SystemTimer;
-import org.apache.kafka.server.util.timer.SystemTimerReaper;
+import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.server.util.timer.Timer;
 import org.apache.kafka.storage.internals.log.OffsetResultHolder;
 import org.apache.kafka.test.TestUtils;
@@ -117,7 +116,7 @@ public class SharePartitionTest {
     private static final Time MOCK_TIME = new MockTime();
     private static final short MAX_IN_FLIGHT_MESSAGES = 200;
     private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100;
-    private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 
300;
+    private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 
120;
     private static final int BATCH_SIZE = 500;
     private static final int DEFAULT_FETCH_OFFSET = 0;
     private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE;
@@ -129,8 +128,7 @@ public class SharePartitionTest {
     @BeforeEach
     public void setUp() {
         kafka.utils.TestUtils.clearYammerMetrics();
-        mockTimer = new 
SystemTimerReaper("share-group-lock-timeout-test-reaper",
-            new SystemTimer("share-group-lock-test-timeout"));
+        mockTimer = new MockTimer();
         sharePartitionMetrics = new SharePartitionMetrics(GROUP_ID, 
TOPIC_ID_PARTITION.topic(), TOPIC_ID_PARTITION.partition());
     }
 
@@ -2925,6 +2923,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.nextFetchOffset() == 0 &&
                         sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE &&
@@ -2951,6 +2950,7 @@ public class SharePartitionTest {
         
assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0
                         && sharePartition.nextFetchOffset() == 10
@@ -2985,6 +2985,7 @@ public class SharePartitionTest {
         assertEquals(2, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire. The acquisition lock timeout 
will cause release of records for all the acquired records.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 0 &&
@@ -3012,6 +3013,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 10 &&
@@ -3128,6 +3130,7 @@ public class SharePartitionTest {
 
         // Allowing acquisition lock to expire. The acquisition lock timeout 
will cause release of records for batch with starting offset 1.
         // Since, other records have been acknowledged.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 1 &&
@@ -3155,6 +3158,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 10 &&
@@ -3179,6 +3183,7 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire for the acquired subset batch.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> {
                     Map<Long, InFlightState> expectedOffsetStateMap = new 
HashMap<>();
@@ -3259,6 +3264,7 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire for the offsets that have not 
been acknowledged yet.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> {
                     Map<Long, InFlightState> expectedOffsetStateMap1 = new 
HashMap<>();
@@ -3321,6 +3327,7 @@ public class SharePartitionTest {
         assertEquals(2, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 0 &&
@@ -3338,6 +3345,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire to archive the records that 
reach max delivery count.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 0 &&
@@ -3363,6 +3371,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 0 &&
@@ -3386,6 +3395,7 @@ public class SharePartitionTest {
         
assertNull(sharePartition.cachedState().get(0L).offsetState().get(9L).acquisitionLockTimeoutTask());
 
         // Allowing acquisition lock to expire to archive the records that 
reach max delivery count.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> {
                     Map<Long, InFlightState> expectedOffsetStateMap = new 
HashMap<>();
@@ -3436,6 +3446,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 0 &&
@@ -3450,6 +3461,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire to archive the records that 
reach max delivery count.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         // After the second failed attempt to acknowledge the 
record batch successfully, the record batch is archived.
@@ -3473,6 +3485,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 5 &&
@@ -3531,6 +3544,7 @@ public class SharePartitionTest {
         assertEquals(1, sharePartition.timer().size());
 
         // Allowing acquisition lock to expire will only affect the offsets 
that have not been acknowledged yet.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> {
                     // Check cached state.
@@ -3576,6 +3590,7 @@ public class SharePartitionTest {
         
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
 
         // Allowing acquisition lock to expire. Even if write share group 
state RPC fails, state transition still happens.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.timer().size() == 0 &&
                         sharePartition.nextFetchOffset() == 5 &&
@@ -3616,6 +3631,7 @@ public class SharePartitionTest {
         
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
 
         // Allowing acquisition lock to expire. Even if write share group 
state RPC fails, state transition still happens.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> {
                     Map<Long, InFlightState> expectedOffsetStateMap = new 
HashMap<>();
@@ -4184,7 +4200,6 @@ public class SharePartitionTest {
     @Test
     public void 
testAcquisitionLockOnReleasingAcknowledgedMultipleSubsetRecordBatchWithGapOffsets()
 {
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withState(SharePartitionState.ACTIVE)
             .build();
         MemoryRecords records1 = memoryRecords(2, 5);
@@ -4977,6 +4992,7 @@ public class SharePartitionTest {
         assertEquals(7, sharePartition.cachedState().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
             () -> {
                 Map<Long, InFlightState> expectedOffsetStateMap1 = new 
HashMap<>();
@@ -5035,6 +5051,7 @@ public class SharePartitionTest {
         assertEquals(2, sharePartition.cachedState().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
             () -> 
sharePartition.cachedState().get(5L).batchMemberId().equals(EMPTY_MEMBER_ID) &&
                     sharePartition.cachedState().get(5L).batchState() == 
RecordState.ARCHIVED &&
@@ -5063,6 +5080,7 @@ public class SharePartitionTest {
         assertEquals(2, sharePartition.cachedState().size());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
             () -> {
                 Map<Long, InFlightState> expectedOffsetStateMap = new 
HashMap<>();
@@ -5088,7 +5106,6 @@ public class SharePartitionTest {
         
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs);
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withGroupConfigManager(groupConfigManager).build();
 
         SharePartition.AcquisitionLockTimerTask timerTask = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
@@ -5111,7 +5128,6 @@ public class SharePartitionTest {
             .thenReturn(expectedDurationMs2);
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withGroupConfigManager(groupConfigManager).build();
 
         SharePartition.AcquisitionLockTimerTask timerTask1 = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
@@ -5257,6 +5273,7 @@ public class SharePartitionTest {
         
assertNotNull(sharePartition.cachedState().get(2L).batchAcquisitionLockTimeoutTask());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.nextFetchOffset() == 7 && 
sharePartition.cachedState().isEmpty() &&
                             sharePartition.startOffset() == 7 && 
sharePartition.endOffset() == 7,
@@ -5306,6 +5323,7 @@ public class SharePartitionTest {
         
assertNotNull(sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask());
 
         // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
                 () -> sharePartition.nextFetchOffset() == 3 && 
sharePartition.cachedState().isEmpty() &&
                         sharePartition.startOffset() == 3 && 
sharePartition.endOffset() == 3,
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
index 8de1b91c32d..2bdbc9cb080 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
@@ -81,6 +81,9 @@ public class MockTimer implements Timer {
     }
 
     public int size() {
+        synchronized (taskQueue) {
+            taskQueue.removeIf(TimerTaskEntry::cancelled);
+        }
         return taskQueue.size();
     }
 

Reply via email to