This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new caf4a6cc5f7 KAFKA-19216: Eliminate flakiness in
kafka.server.share.SharePartitionTest (#19639)
caf4a6cc5f7 is described below
commit caf4a6cc5f76e3058ba8048f13b75794e6e971fe
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue May 6 00:34:22 2025 +0530
KAFKA-19216: Eliminate flakiness in kafka.server.share.SharePartitionTest
(#19639)
### About
11 of the test cases in `SharePartitionTest` have failed at least once
in the past 28 days.
https://develocity.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=kafka.server.share.SharePartitionTest
The flakiness appears to be caused by the use of `SystemTimer` in
various acquisition-lock-timeout tests. I have replaced `SystemTimer`
with `MockTimer` and also changed `MockTimer.size()` to remove timer
task entries that have already been cancelled before reporting the
queue size.
This change also reduces the time taken to run `SharePartitionTest`
from ~6 sec to ~1.5 sec.
### Testing
Testing was done using the existing unit tests in
Apache Kafka.
Reviewers: Andrew Schofield <[email protected]>
---
.../common/runtime/CoordinatorRuntimeTest.java | 10 +++----
.../kafka/server/share/SharePartitionTest.java | 34 +++++++++++++++++-----
.../apache/kafka/server/util/timer/MockTimer.java | 3 ++
3 files changed, 33 insertions(+), 14 deletions(-)
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 418d58376cc..b82829e1d62 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -2815,9 +2815,8 @@ public class CoordinatorRuntimeTest {
assertTrue(write1.isDone());
assertTrue(write2.isDone());
- // All timer tasks have been cancelled. TimerTask entries are not
removed in MockTimer.
- assertEquals(2, timer.size());
- timer.taskQueue().forEach(taskEntry ->
assertTrue(taskEntry.cancelled()));
+ // All timer tasks have been cancelled. Hence,they have been removed
in MockTimer.
+ assertEquals(0, timer.size());
}
@Test
@@ -2885,9 +2884,8 @@ public class CoordinatorRuntimeTest {
assertEquals(1,
runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
assertTrue(write1.isDone());
- // All timer tasks have been cancelled. TimerTask entries are not
removed in MockTimer.
- assertEquals(1, timer.size());
- timer.taskQueue().forEach(taskEntry ->
assertTrue(taskEntry.cancelled()));
+ // All timer tasks have been cancelled. Hence, they have been removed
in MockTimer.
+ assertEquals(0, timer.size());
}
@Test
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 465bce6de6a..da35cb0f428 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -67,8 +67,7 @@ import
org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
-import org.apache.kafka.server.util.timer.SystemTimer;
-import org.apache.kafka.server.util.timer.SystemTimerReaper;
+import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
@@ -117,7 +116,7 @@ public class SharePartitionTest {
private static final Time MOCK_TIME = new MockTime();
private static final short MAX_IN_FLIGHT_MESSAGES = 200;
private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100;
- private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS =
300;
+ private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS =
120;
private static final int BATCH_SIZE = 500;
private static final int DEFAULT_FETCH_OFFSET = 0;
private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE;
@@ -129,8 +128,7 @@ public class SharePartitionTest {
@BeforeEach
public void setUp() {
kafka.utils.TestUtils.clearYammerMetrics();
- mockTimer = new
SystemTimerReaper("share-group-lock-timeout-test-reaper",
- new SystemTimer("share-group-lock-test-timeout"));
+ mockTimer = new MockTimer();
sharePartitionMetrics = new SharePartitionMetrics(GROUP_ID,
TOPIC_ID_PARTITION.topic(), TOPIC_ID_PARTITION.partition());
}
@@ -2925,6 +2923,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.nextFetchOffset() == 0 &&
sharePartition.cachedState().get(0L).batchState() ==
RecordState.AVAILABLE &&
@@ -2951,6 +2950,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0
&& sharePartition.nextFetchOffset() == 10
@@ -2985,6 +2985,7 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.timer().size());
// Allowing acquisition lock to expire. The acquisition lock timeout
will cause release of records for all the acquired records.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@@ -3012,6 +3013,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 10 &&
@@ -3128,6 +3130,7 @@ public class SharePartitionTest {
// Allowing acquisition lock to expire. The acquisition lock timeout
will cause release of records for batch with starting offset 1.
// Since, other records have been acknowledged.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 1 &&
@@ -3155,6 +3158,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 10 &&
@@ -3179,6 +3183,7 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.timer().size());
// Allowing acquisition lock to expire for the acquired subset batch.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap = new
HashMap<>();
@@ -3259,6 +3264,7 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.timer().size());
// Allowing acquisition lock to expire for the offsets that have not
been acknowledged yet.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap1 = new
HashMap<>();
@@ -3321,6 +3327,7 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.timer().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@@ -3338,6 +3345,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire to archive the records that
reach max delivery count.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@@ -3363,6 +3371,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@@ -3386,6 +3395,7 @@ public class SharePartitionTest {
assertNull(sharePartition.cachedState().get(0L).offsetState().get(9L).acquisitionLockTimeoutTask());
// Allowing acquisition lock to expire to archive the records that
reach max delivery count.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap = new
HashMap<>();
@@ -3436,6 +3446,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@@ -3450,6 +3461,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire to archive the records that
reach max delivery count.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
// After the second failed attempt to acknowledge the
record batch successfully, the record batch is archived.
@@ -3473,6 +3485,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 5 &&
@@ -3531,6 +3544,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire will only affect the offsets
that have not been acknowledged yet.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
// Check cached state.
@@ -3576,6 +3590,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire. Even if write share group
state RPC fails, state transition still happens.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 5 &&
@@ -3616,6 +3631,7 @@ public class SharePartitionTest {
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
// Allowing acquisition lock to expire. Even if write share group
state RPC fails, state transition still happens.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap = new
HashMap<>();
@@ -4184,7 +4200,6 @@ public class SharePartitionTest {
@Test
public void
testAcquisitionLockOnReleasingAcknowledgedMultipleSubsetRecordBatchWithGapOffsets()
{
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withState(SharePartitionState.ACTIVE)
.build();
MemoryRecords records1 = memoryRecords(2, 5);
@@ -4977,6 +4992,7 @@ public class SharePartitionTest {
assertEquals(7, sharePartition.cachedState().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap1 = new
HashMap<>();
@@ -5035,6 +5051,7 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.cachedState().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() ->
sharePartition.cachedState().get(5L).batchMemberId().equals(EMPTY_MEMBER_ID) &&
sharePartition.cachedState().get(5L).batchState() ==
RecordState.ARCHIVED &&
@@ -5063,6 +5080,7 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.cachedState().size());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap = new
HashMap<>();
@@ -5088,7 +5106,6 @@ public class SharePartitionTest {
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs);
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withGroupConfigManager(groupConfigManager).build();
SharePartition.AcquisitionLockTimerTask timerTask =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
@@ -5111,7 +5128,6 @@ public class SharePartitionTest {
.thenReturn(expectedDurationMs2);
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withGroupConfigManager(groupConfigManager).build();
SharePartition.AcquisitionLockTimerTask timerTask1 =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
@@ -5257,6 +5273,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(2L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.nextFetchOffset() == 7 &&
sharePartition.cachedState().isEmpty() &&
sharePartition.startOffset() == 7 &&
sharePartition.endOffset() == 7,
@@ -5306,6 +5323,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.nextFetchOffset() == 3 &&
sharePartition.cachedState().isEmpty() &&
sharePartition.startOffset() == 3 &&
sharePartition.endOffset() == 3,
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
index 8de1b91c32d..2bdbc9cb080 100644
---
a/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
+++
b/server-common/src/test/java/org/apache/kafka/server/util/timer/MockTimer.java
@@ -81,6 +81,9 @@ public class MockTimer implements Timer {
}
public int size() {
+ synchronized (taskQueue) {
+ taskQueue.removeIf(TimerTaskEntry::cancelled);
+ }
return taskQueue.size();
}