This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 849bf6c7a18 MINOR: Remove unused type parameter T from CoordinatorTimer (#21360)
849bf6c7a18 is described below
commit 849bf6c7a18cb4969f2674150386274e18bef913
Author: David Jacot <[email protected]>
AuthorDate: Mon Jan 26 18:19:48 2026 +0100
MINOR: Remove unused type parameter T from CoordinatorTimer (#21360)
The type parameter T in CoordinatorTimer<T, U> was Void in every usage.
This change simplifies the interface to CoordinatorTimer<U> (and its nested
TimeoutOperation<U>) and updates all references across the coordinator-common,
group-coordinator, and share-coordinator modules.
Reviewers: Sean Quah <[email protected]>, Lianet Magrans <[email protected]>
---
.../common/runtime/CoordinatorShardBuilder.java | 2 +-
.../common/runtime/CoordinatorTimer.java | 16 ++--
.../common/runtime/CoordinatorTimerImpl.java | 8 +-
.../common/runtime/MockCoordinatorShard.java | 8 +-
.../runtime/MockCoordinatorShardBuilder.java | 4 +-
.../common/runtime/MockCoordinatorTimer.java | 28 +++----
.../org/apache/kafka/coordinator/group/Group.java | 2 +-
.../coordinator/group/GroupCoordinatorShard.java | 8 +-
.../coordinator/group/GroupMetadataManager.java | 8 +-
.../coordinator/group/streams/StreamsGroup.java | 2 +-
.../group/GroupCoordinatorShardTest.java | 12 +--
.../group/GroupMetadataManagerTest.java | 94 +++++++++++-----------
.../group/GroupMetadataManagerTestContext.java | 38 ++++-----
.../group/OffsetMetadataManagerTest.java | 12 +--
.../group/streams/StreamsGroupTest.java | 2 +-
.../coordinator/share/ShareCoordinatorShard.java | 2 +-
16 files changed, 124 insertions(+), 122 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardBuilder.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardBuilder.java
index 62092999c39..83583fb302c 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardBuilder.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorShardBuilder.java
@@ -72,7 +72,7 @@ public interface CoordinatorShardBuilder<S extends
CoordinatorShard<U>, U> {
* @return The builder.
*/
CoordinatorShardBuilder<S, U> withTimer(
- CoordinatorTimer<Void, U> timer
+ CoordinatorTimer<U> timer
);
/**
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
index 2f288df3026..17d9e10ecdd 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
@@ -22,8 +22,10 @@ import java.util.concurrent.TimeUnit;
/**
* An interface to schedule and cancel operations.
+ *
+ * @param <U> The record type.
*/
-public interface CoordinatorTimer<T, U> {
+public interface CoordinatorTimer<U> {
/**
* Generates the records needed to implement this timeout write operation.
In general,
* this operation should not modify the hard state of the coordinator.
That modifications
@@ -33,10 +35,10 @@ public interface CoordinatorTimer<T, U> {
* A typical example of timeout operation is the session timeout used by
the consumer
* group rebalance protocol.
*
- * @param <T> The record type.
+ * @param <U> The record type.
*/
- interface TimeoutOperation<T, U> {
- CoordinatorResult<T, U> generateRecords() throws KafkaException;
+ interface TimeoutOperation<U> {
+ CoordinatorResult<Void, U> generateRecords() throws KafkaException;
}
/**
@@ -50,7 +52,7 @@ public interface CoordinatorTimer<T, U> {
* be retried on failure.
* @param operation The operation to perform upon expiration.
*/
- void schedule(String key, long delay, TimeUnit unit, boolean retry,
TimeoutOperation<T, U> operation);
+ void schedule(String key, long delay, TimeUnit unit, boolean retry,
TimeoutOperation<U> operation);
/**
* Add an operation to the timer. If an operation with the same key
@@ -64,7 +66,7 @@ public interface CoordinatorTimer<T, U> {
* @param retryBackoff The delay when rescheduled on retry. The same unit
is used.
* @param operation The operation to perform upon expiration.
*/
- void schedule(String key, long delay, TimeUnit unit, boolean retry, long
retryBackoff, TimeoutOperation<T, U> operation);
+ void schedule(String key, long delay, TimeUnit unit, boolean retry, long
retryBackoff, TimeoutOperation<U> operation);
/**
* Add an operation to the timer if there's no operation with the same key.
@@ -76,7 +78,7 @@ public interface CoordinatorTimer<T, U> {
* be retried on failure.
* @param operation The operation to perform upon expiration.
*/
- void scheduleIfAbsent(String key, long delay, TimeUnit unit, boolean
retry, TimeoutOperation<T, U> operation);
+ void scheduleIfAbsent(String key, long delay, TimeUnit unit, boolean
retry, TimeoutOperation<U> operation);
/**
* Remove an operation corresponding to a given key.
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
index 0c66c99c25f..b99e396da77 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimerImpl.java
@@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
*
* When a timer fails with an unexpected exception, the timer is rescheduled
with a backoff.
*/
-public class CoordinatorTimerImpl<U> implements CoordinatorTimer<Void, U> {
+public class CoordinatorTimerImpl<U> implements CoordinatorTimer<U> {
private final Logger log;
private final Timer timer;
private final CoordinatorShardScheduler<U> scheduler;
@@ -60,7 +60,7 @@ public class CoordinatorTimerImpl<U> implements
CoordinatorTimer<Void, U> {
long delay,
TimeUnit unit,
boolean retry,
- TimeoutOperation<Void, U> operation
+ TimeoutOperation<U> operation
) {
schedule(key, delay, unit, retry, 500, operation);
}
@@ -72,7 +72,7 @@ public class CoordinatorTimerImpl<U> implements
CoordinatorTimer<Void, U> {
TimeUnit unit,
boolean retry,
long retryBackoff,
- TimeoutOperation<Void, U> operation
+ TimeoutOperation<U> operation
) {
// The TimerTask wraps the TimeoutOperation into a write operation.
When the TimerTask
// expires, the operation is scheduled through the scheduler to be
executed. This
@@ -141,7 +141,7 @@ public class CoordinatorTimerImpl<U> implements
CoordinatorTimer<Void, U> {
long delay,
TimeUnit unit,
boolean retry,
- TimeoutOperation<Void, U> operation
+ TimeoutOperation<U> operation
) {
if (!tasks.containsKey(key)) {
schedule(key, delay, unit, retry, 500, operation);
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
index 28167504f57..7214b869ca8 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java
@@ -53,19 +53,19 @@ public class MockCoordinatorShard implements
CoordinatorShard<String> {
private final SnapshotRegistry snapshotRegistry;
private final TimelineHashSet<RecordAndMetadata> records;
private final TimelineHashMap<Long, TimelineHashSet<RecordAndMetadata>>
pendingRecords;
- private final CoordinatorTimer<Void, String> timer;
+ private final CoordinatorTimer<String> timer;
private final CoordinatorExecutor<String> executor;
MockCoordinatorShard(
SnapshotRegistry snapshotRegistry,
- CoordinatorTimer<Void, String> timer
+ CoordinatorTimer<String> timer
) {
this(snapshotRegistry, timer, null);
}
MockCoordinatorShard(
SnapshotRegistry snapshotRegistry,
- CoordinatorTimer<Void, String> timer,
+ CoordinatorTimer<String> timer,
CoordinatorExecutor<String> executor
) {
this.snapshotRegistry = snapshotRegistry;
@@ -130,7 +130,7 @@ public class MockCoordinatorShard implements
CoordinatorShard<String> {
.toList();
}
- CoordinatorTimer<Void, String> timer() {
+ CoordinatorTimer<String> timer() {
return timer;
}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
index dea2dcc1c17..13b0c3b13d1 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShardBuilder.java
@@ -28,7 +28,7 @@ import java.util.Objects;
*/
public class MockCoordinatorShardBuilder implements
CoordinatorShardBuilder<MockCoordinatorShard, String> {
private SnapshotRegistry snapshotRegistry;
- private CoordinatorTimer<Void, String> timer;
+ private CoordinatorTimer<String> timer;
private CoordinatorExecutor<String> executor;
@Override
@@ -63,7 +63,7 @@ public class MockCoordinatorShardBuilder implements
CoordinatorShardBuilder<Mock
@Override
public CoordinatorShardBuilder<MockCoordinatorShard, String> withTimer(
- CoordinatorTimer<Void, String> timer
+ CoordinatorTimer<String> timer
) {
this.timer = timer;
return this;
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
index 69e3954a0a6..aa09d0face5 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
@@ -31,23 +31,23 @@ import java.util.concurrent.TimeUnit;
* expire timeouts. They are only expired when {@link
MockCoordinatorTimer#poll()}
* is called.
*/
-public class MockCoordinatorTimer<T, U> implements CoordinatorTimer<T, U> {
+public class MockCoordinatorTimer<U> implements CoordinatorTimer<U> {
/**
* Represents a scheduled timeout.
*/
- public record ScheduledTimeout<T, U>(String key, long deadlineMs,
TimeoutOperation<T, U> operation) {
+ public record ScheduledTimeout<U>(String key, long deadlineMs,
TimeoutOperation<U> operation) {
}
/**
* Represents an expired timeout.
*/
- public record ExpiredTimeout<T, U>(String key, CoordinatorResult<T, U>
result) {
+ public record ExpiredTimeout<U>(String key, CoordinatorResult<Void, U>
result) {
}
private final Time time;
- private final Map<String, ScheduledTimeout<T, U>> timeoutMap = new
HashMap<>();
- private final PriorityQueue<ScheduledTimeout<T, U>> timeoutQueue = new
PriorityQueue<>(
+ private final Map<String, ScheduledTimeout<U>> timeoutMap = new
HashMap<>();
+ private final PriorityQueue<ScheduledTimeout<U>> timeoutQueue = new
PriorityQueue<>(
Comparator.comparingLong(entry -> entry.deadlineMs)
);
@@ -65,12 +65,12 @@ public class MockCoordinatorTimer<T, U> implements
CoordinatorTimer<T, U> {
TimeUnit unit,
boolean retry,
long retryBackoff,
- TimeoutOperation<T, U> operation
+ TimeoutOperation<U> operation
) {
cancel(key);
long deadlineMs = time.milliseconds() + unit.toMillis(delay);
- ScheduledTimeout<T, U> timeout = new ScheduledTimeout<>(key,
deadlineMs, operation);
+ ScheduledTimeout<U> timeout = new ScheduledTimeout<>(key, deadlineMs,
operation);
timeoutQueue.add(timeout);
timeoutMap.put(key, timeout);
}
@@ -81,7 +81,7 @@ public class MockCoordinatorTimer<T, U> implements
CoordinatorTimer<T, U> {
long delay,
TimeUnit unit,
boolean retry,
- TimeoutOperation<T, U> operation
+ TimeoutOperation<U> operation
) {
schedule(key, delay, unit, retry, 500L, operation);
}
@@ -92,7 +92,7 @@ public class MockCoordinatorTimer<T, U> implements
CoordinatorTimer<T, U> {
long delay,
TimeUnit unit,
boolean retry,
- TimeoutOperation<T, U> operation
+ TimeoutOperation<U> operation
) {
if (!timeoutMap.containsKey(key)) {
schedule(key, delay, unit, retry, 500L, operation);
@@ -104,7 +104,7 @@ public class MockCoordinatorTimer<T, U> implements
CoordinatorTimer<T, U> {
*/
@Override
public void cancel(String key) {
- ScheduledTimeout<T, U> timeout = timeoutMap.remove(key);
+ ScheduledTimeout<U> timeout = timeoutMap.remove(key);
if (timeout != null) {
timeoutQueue.remove(timeout);
}
@@ -128,7 +128,7 @@ public class MockCoordinatorTimer<T, U> implements
CoordinatorTimer<T, U> {
/**
* @return The scheduled timeout for the key; null otherwise.
*/
- public ScheduledTimeout<T, U> timeout(String key) {
+ public ScheduledTimeout<U> timeout(String key) {
return timeoutMap.get(key);
}
@@ -142,10 +142,10 @@ public class MockCoordinatorTimer<T, U> implements
CoordinatorTimer<T, U> {
/**
* @return A list of expired timeouts based on the current time.
*/
- public List<ExpiredTimeout<T, U>> poll() {
- List<ExpiredTimeout<T, U>> results = new ArrayList<>();
+ public List<ExpiredTimeout<U>> poll() {
+ List<ExpiredTimeout<U>> results = new ArrayList<>();
- ScheduledTimeout<T, U> timeout = timeoutQueue.peek();
+ ScheduledTimeout<U> timeout = timeoutQueue.peek();
while (timeout != null && timeout.deadlineMs <= time.milliseconds()) {
timeoutQueue.poll();
timeoutMap.remove(timeout.key, timeout);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 501c048352f..e0bff1d51c4 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -166,7 +166,7 @@ public interface Group {
*
* @param timer The coordinator timer.
*/
- default void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer)
{}
+ default void cancelTimers(CoordinatorTimer<CoordinatorRecord> timer) {}
/**
* @return Whether the group is in Empty state.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 15eecf0fe7f..8937df57f32 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -159,7 +159,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
private LogContext logContext;
private SnapshotRegistry snapshotRegistry;
private Time time;
- private CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private CoordinatorTimer<CoordinatorRecord> timer;
private CoordinatorExecutor<CoordinatorRecord> executor;
private CoordinatorMetrics coordinatorMetrics;
private TopicPartition topicPartition;
@@ -191,7 +191,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
@Override
public CoordinatorShardBuilder<GroupCoordinatorShard,
CoordinatorRecord> withTimer(
- CoordinatorTimer<Void, CoordinatorRecord> timer
+ CoordinatorTimer<CoordinatorRecord> timer
) {
this.timer = timer;
return this;
@@ -410,7 +410,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
/**
* The coordinator timer.
*/
- private final CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private final CoordinatorTimer<CoordinatorRecord> timer;
/**
* The group coordinator config.
@@ -441,7 +441,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
GroupMetadataManager groupMetadataManager,
OffsetMetadataManager offsetMetadataManager,
Time time,
- CoordinatorTimer<Void, CoordinatorRecord> timer,
+ CoordinatorTimer<CoordinatorRecord> timer,
GroupCoordinatorConfig config,
CoordinatorMetrics coordinatorMetrics,
CoordinatorMetricsShard metricsShard
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 205c9ee2d6e..af80cbea4df 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -281,7 +281,7 @@ public class GroupMetadataManager {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
- private CoordinatorTimer<Void, CoordinatorRecord> timer = null;
+ private CoordinatorTimer<CoordinatorRecord> timer = null;
private CoordinatorExecutor<CoordinatorRecord> executor = null;
private GroupCoordinatorConfig config = null;
private GroupConfigManager groupConfigManager = null;
@@ -306,7 +306,7 @@ public class GroupMetadataManager {
return this;
}
- Builder withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+ Builder withTimer(CoordinatorTimer<CoordinatorRecord> timer) {
this.timer = timer;
return this;
}
@@ -421,7 +421,7 @@ public class GroupMetadataManager {
/**
* The system timer.
*/
- private final CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private final CoordinatorTimer<CoordinatorRecord> timer;
/**
* The executor to executor asynchronous tasks.
@@ -514,7 +514,7 @@ public class GroupMetadataManager {
SnapshotRegistry snapshotRegistry,
LogContext logContext,
Time time,
- CoordinatorTimer<Void, CoordinatorRecord> timer,
+ CoordinatorTimer<CoordinatorRecord> timer,
CoordinatorExecutor<CoordinatorRecord> executor,
GroupCoordinatorMetricsShard metrics,
CoordinatorMetadataImage metadataImage,
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 621a8f9d1bd..e97bafee55b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -835,7 +835,7 @@ public class StreamsGroup implements Group {
}
@Override
- public void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+ public void cancelTimers(CoordinatorTimer<CoordinatorRecord> timer) {
timer.cancel(initialRebalanceTimeoutKey(groupId));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 85a1cbc92ab..11367610e7e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1334,7 +1334,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
Time mockTime = new MockTime();
- MockCoordinatorTimer<Void, CoordinatorRecord> timer = new
MockCoordinatorTimer<>(mockTime);
+ MockCoordinatorTimer<CoordinatorRecord> timer = new
MockCoordinatorTimer<>(mockTime);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
@@ -1353,7 +1353,7 @@ public class GroupCoordinatorShardTest {
// Confirm that it is rescheduled after completion.
mockTime.sleep(1000L);
- List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>>
tasks = timer.poll();
+ List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> tasks =
timer.poll();
assertEquals(1, tasks.size());
assertTrue(timer.contains(GROUP_EXPIRATION_KEY));
@@ -1366,7 +1366,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
Time mockTime = new MockTime();
- MockCoordinatorTimer<Void, CoordinatorRecord> timer = new
MockCoordinatorTimer<>(mockTime);
+ MockCoordinatorTimer<CoordinatorRecord> timer = new
MockCoordinatorTimer<>(mockTime);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
@@ -1426,7 +1426,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
Time mockTime = new MockTime();
- MockCoordinatorTimer<Void, CoordinatorRecord> timer = new
MockCoordinatorTimer<>(mockTime);
+ MockCoordinatorTimer<CoordinatorRecord> timer = new
MockCoordinatorTimer<>(mockTime);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
@@ -1465,7 +1465,7 @@ public class GroupCoordinatorShardTest {
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
MockTime time = new MockTime();
- MockCoordinatorTimer<Void, CoordinatorRecord> timer = new
MockCoordinatorTimer<>(time);
+ MockCoordinatorTimer<CoordinatorRecord> timer = new
MockCoordinatorTimer<>(time);
GroupCoordinatorConfig config = mock(GroupCoordinatorConfig.class);
when(config.offsetsRetentionCheckIntervalMs()).thenReturn(60 * 60 *
1000L);
@@ -1568,7 +1568,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
Time mockTime = new MockTime();
- MockCoordinatorTimer<Void, CoordinatorRecord> timer = new
MockCoordinatorTimer<>(mockTime);
+ MockCoordinatorTimer<CoordinatorRecord> timer = new
MockCoordinatorTimer<>(mockTime);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 922521ec91b..ba2d9199d72 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -4343,11 +4343,11 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, memberId, 45000);
// Advance time past the session timeout.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000
+ 1);
// Verify the expired timeout.
assertEquals(
- List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+ List.of(new ExpiredTimeout<CoordinatorRecord>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
@@ -4407,12 +4407,12 @@ public class GroupMetadataManagerTest {
assertNotNull(context.timer.timeout(groupSessionTimeoutKey(groupId,
memberId)));
// Advance time past the session timeout.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000
+ 1);
// Verify the expired timeout.
assertEquals(
List.of(
- new ExpiredTimeout<Void, CoordinatorRecord>(
+ new ExpiredTimeout<CoordinatorRecord>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
@@ -4469,11 +4469,11 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, memberId, 45000);
// Advance time past the session timeout.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000
+ 1);
// Verify the expired timeout.
assertEquals(
- List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+ List.of(new ExpiredTimeout<CoordinatorRecord>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
@@ -4531,12 +4531,12 @@ public class GroupMetadataManagerTest {
assertNotNull(context.timer.timeout(groupSessionTimeoutKey(groupId,
memberId)));
// Advance time past the session timeout.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000
+ 1);
// Verify the expired timeout.
assertEquals(
List.of(
- new ExpiredTimeout<Void, CoordinatorRecord>(
+ new ExpiredTimeout<CoordinatorRecord>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
@@ -4612,11 +4612,11 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, memberId, 45000);
// Advance time past the session timeout. No static member joined back
as a replacement
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000
+ 1);
// Verify the expired timeout.
assertEquals(
- List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+ List.of(new ExpiredTimeout<CoordinatorRecord>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
@@ -4746,7 +4746,7 @@ public class GroupMetadataManagerTest {
// Verify that there is a revocation timeout. Keep a reference
// to the timeout for later.
- ScheduledTimeout<Void, CoordinatorRecord> scheduledTimeout =
+ ScheduledTimeout<CoordinatorRecord> scheduledTimeout =
context.assertRebalanceTimeout(groupId, memberId1, 12000);
assertEquals(
@@ -4890,11 +4890,11 @@ public class GroupMetadataManagerTest {
);
// Advance time past the revocation timeout.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(10000 + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(10000
+ 1);
// Verify the expired timeout.
assertEquals(
- List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+ List.of(new ExpiredTimeout<CoordinatorRecord>(
groupRebalanceTimeoutKey(groupId, memberId1),
new CoordinatorResult<>(
List.of(
@@ -5418,7 +5418,7 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.onLoaded();
IntStream.range(0, 2).forEach(i -> {
- ScheduledTimeout<Void, CoordinatorRecord> timeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> timeout =
context.timer.timeout(
classicGroupHeartbeatKey("group-id", "member-1"));
assertNotNull(timeout);
@@ -6304,7 +6304,7 @@ public class GroupMetadataManagerTest {
String memberId = group.leaderOrNull();
// Advance clock by new member join timeout. Member should be removed
from group as heartbeat expires.
// A group that transitions to Empty after completing join phase will
generate records.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(context.classicGroupNewMemberJoinTimeoutMs);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(context.classicGroupNewMemberJoinTimeoutMs);
assertEquals(1, timeouts.size());
timeouts.forEach(timeout -> {
@@ -7121,7 +7121,7 @@ public class GroupMetadataManagerTest {
assertEquals(1, group.numMembers());
// Member should be removed as heartbeat expires. The group is now
empty.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(5000);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(5000);
List<CoordinatorRecord> expectedRecords =
List.of(GroupMetadataManagerTestContext.newGroupMetadataRecord(
group.groupId(),
new GroupMetadataValue()
@@ -7370,7 +7370,7 @@ public class GroupMetadataManagerTest {
// Advance clock by rebalance timeout so that the join phase completes
with duplicate follower.
// Both heartbeats will expire but only the leader is kicked out.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(10000);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(10000);
assertEquals(2, timeouts.size());
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT,
timeout.result()));
@@ -8139,7 +8139,7 @@ public class GroupMetadataManagerTest {
assertTrue(group.isInState(PREPARING_REBALANCE));
// Advance clock by session timeout to kick leader out and complete
join phase.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(5000);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(5000);
// Both leader and follower heartbeat timers may expire. However, the
follower heartbeat expiration
// will not kick the follower out because it is awaiting a join
response.
assertTrue(timeouts.size() <= 2);
@@ -9002,7 +9002,7 @@ public class GroupMetadataManagerTest {
// Advance clock by session timeout to expire leader heartbeat and
prepare rebalance.
// This should complete follower's sync response. The follower's
heartbeat expiration will not kick
// the follower out because it is awaiting sync.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(10000);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(10000);
assertTrue(timeouts.size() <= 2);
timeouts.forEach(timeout ->
assertTrue(timeout.result().records().isEmpty()));
@@ -9834,9 +9834,9 @@ public class GroupMetadataManagerTest {
// Advance clock by 1/2 rebalance timeout to expire the pending sync.
Members should be removed.
// The group becomes empty, generating an empty group metadata record.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeoutMs / 2);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeoutMs / 2);
assertEquals(1, timeouts.size());
- ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+ ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(classicGroupSyncKey("group-id"), timeout.key());
assertEquals(
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group,
group.groupAssignment())),
@@ -9888,9 +9888,9 @@ public class GroupMetadataManagerTest {
context.verifyHeartbeat(group.groupId(), joinResponses.get(0),
Errors.NONE);
// Advance clock by 1/2 rebalance timeout to expire the pending sync.
Followers should be removed.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeoutMs / 2);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeoutMs / 2);
assertEquals(1, timeouts.size());
- ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+ ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(classicGroupSyncKey("group-id"), timeout.key());
assertTrue(timeout.result().records().isEmpty());
@@ -9939,9 +9939,9 @@ public class GroupMetadataManagerTest {
}).toList();
// Advance clock by 1/2 rebalance timeout to expire the pending sync.
Leader should be kicked out.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeoutMs / 2);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeoutMs / 2);
assertEquals(1, timeouts.size());
- ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+ ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(classicGroupSyncKey("group-id"), timeout.key());
assertTrue(timeout.result().records().isEmpty());
@@ -12885,12 +12885,12 @@ public class GroupMetadataManagerTest {
);
// The new classic member 1 has a heartbeat timeout.
- ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
);
assertNotNull(heartbeatTimeout);
// The new rebalance has a groupJoin timeout.
- ScheduledTimeout<Void, CoordinatorRecord> groupJoinTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> groupJoinTimeout =
context.timer.timeout(
classicGroupJoinKey(groupId)
);
assertNotNull(groupJoinTimeout);
@@ -13009,7 +13009,7 @@ public class GroupMetadataManagerTest {
// Advance time past the session timeout.
// Member 2 should be fenced from the group, thus triggering the
downgrade.
- ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(45000
+ 1).get(0);
+ ExpiredTimeout<CoordinatorRecord> timeout = context.sleep(45000 +
1).get(0);
assertEquals(groupSessionTimeoutKey(groupId, memberId2),
timeout.key());
byte[] assignment =
Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(List.of(
@@ -13069,12 +13069,12 @@ public class GroupMetadataManagerTest {
);
// The new classic member 1 has a heartbeat timeout.
- ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
);
assertNotNull(heartbeatTimeout);
// The new rebalance has a groupJoin timeout.
- ScheduledTimeout<Void, CoordinatorRecord> groupJoinTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> groupJoinTimeout =
context.timer.timeout(
classicGroupJoinKey(groupId)
);
assertNotNull(groupJoinTimeout);
@@ -13211,7 +13211,7 @@ public class GroupMetadataManagerTest {
// Advance time past the session timeout.
// Member 2 should be fenced from the group, thus triggering the
downgrade.
- ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(30000
+ 1).get(0);
+ ExpiredTimeout<CoordinatorRecord> timeout = context.sleep(30000 +
1).get(0);
assertEquals(groupRebalanceTimeoutKey(groupId, memberId2),
timeout.key());
byte[] assignment =
Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(List.of(
@@ -13271,12 +13271,12 @@ public class GroupMetadataManagerTest {
);
// The new classic member 1 has a heartbeat timeout.
- ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
);
assertNotNull(heartbeatTimeout);
// The new rebalance has a groupJoin timeout.
- ScheduledTimeout<Void, CoordinatorRecord> groupJoinTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> groupJoinTimeout =
context.timer.timeout(
classicGroupJoinKey(groupId)
);
assertNotNull(groupJoinTimeout);
@@ -13534,7 +13534,7 @@ public class GroupMetadataManagerTest {
);
// The new classic member 1 has a heartbeat timeout.
- ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
);
assertNotNull(heartbeatTimeout);
@@ -15411,11 +15411,11 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, memberId, sessionTimeout);
// Advance clock by session timeout + 1.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(sessionTimeout + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(sessionTimeout + 1);
// The member is fenced from the group.
assertEquals(1, timeouts.size());
- ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+ ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(groupSessionTimeoutKey(groupId, memberId), timeout.key());
assertRecordsEquals(
List.of(
@@ -15476,11 +15476,11 @@ public class GroupMetadataManagerTest {
context.assertJoinTimeout(groupId, memberId, rebalanceTimeout);
// Advance clock by rebalance timeout + 1.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeout + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeout + 1);
// The member is fenced from the group.
assertEquals(1, timeouts.size());
- ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
+ ExpiredTimeout<CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(consumerGroupJoinKey(groupId, memberId), timeout.key());
assertRecordsEquals(
List.of(
@@ -16075,12 +16075,12 @@ public class GroupMetadataManagerTest {
);
// The new classic member 1 has a heartbeat timeout.
- ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
);
assertNotNull(heartbeatTimeout);
// The new rebalance has a groupJoin timeout.
- ScheduledTimeout<Void, CoordinatorRecord> groupJoinTimeout =
context.timer.timeout(
+ ScheduledTimeout<CoordinatorRecord> groupJoinTimeout =
context.timer.timeout(
classicGroupJoinKey(groupId)
);
assertNotNull(groupJoinTimeout);
@@ -19340,11 +19340,11 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, memberId, 45000);
// Advance time past the session timeout.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000
+ 1);
// Verify the expired timeout.
assertEquals(
- List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+ List.of(new ExpiredTimeout<CoordinatorRecord>(
groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>(
List.of(
@@ -19486,7 +19486,7 @@ public class GroupMetadataManagerTest {
// Verify that there is a revocation timeout. Keep a reference
// to the timeout for later.
- ScheduledTimeout<Void, CoordinatorRecord> scheduledTimeout =
+ ScheduledTimeout<CoordinatorRecord> scheduledTimeout =
context.assertRebalanceTimeout(groupId, memberId1, 12000);
assertEquals(
@@ -19649,11 +19649,11 @@ public class GroupMetadataManagerTest {
);
// Advance time past the revocation timeout.
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeoutMs + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(rebalanceTimeoutMs + 1);
// Verify the expired timeout.
assertEquals(
- List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+ List.of(new ExpiredTimeout<CoordinatorRecord>(
groupRebalanceTimeoutKey(groupId, memberId1),
new CoordinatorResult<>(
List.of(
@@ -23505,11 +23505,11 @@ public class GroupMetadataManagerTest {
// Member 2 is fenced due to reaching the session timeout.
context.assertSessionTimeout(groupId, memberId2, 45000);
- List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+ List<ExpiredTimeout<CoordinatorRecord>> timeouts = context.sleep(45000
+ 1);
// Verify the expired timeout.
assertEquals(
- List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+ List.of(new ExpiredTimeout<CoordinatorRecord>(
groupSessionTimeoutKey(groupId, memberId2),
new CoordinatorResult<>(
List.of(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index b52d3edb3fe..1bbfd86f96a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -198,7 +198,7 @@ public class GroupMetadataManagerTestContext {
}
}
- public static void
assertNoOrEmptyResult(List<MockCoordinatorTimer.ExpiredTimeout<Void,
CoordinatorRecord>> timeouts) {
+ public static void
assertNoOrEmptyResult(List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>>
timeouts) {
assertTrue(timeouts.size() <= 1);
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT,
timeout.result()));
}
@@ -458,7 +458,7 @@ public class GroupMetadataManagerTestContext {
public static class Builder {
private MockTime time = new MockTime(0, 0, 0);
- private final MockCoordinatorTimer<Void, CoordinatorRecord> timer =
new MockCoordinatorTimer<>(time);
+ private final MockCoordinatorTimer<CoordinatorRecord> timer = new
MockCoordinatorTimer<>(time);
private final MockCoordinatorExecutor<CoordinatorRecord> executor =
new MockCoordinatorExecutor<>();
private final LogContext logContext = new LogContext();
private final SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
@@ -576,7 +576,7 @@ public class GroupMetadataManagerTestContext {
}
final MockTime time;
- final MockCoordinatorTimer<Void, CoordinatorRecord> timer;
+ final MockCoordinatorTimer<CoordinatorRecord> timer;
final MockCoordinatorExecutor<CoordinatorRecord> executor;
final SnapshotRegistry snapshotRegistry;
final GroupCoordinatorMetricsShard metrics;
@@ -590,7 +590,7 @@ public class GroupMetadataManagerTestContext {
public GroupMetadataManagerTestContext(
MockTime time,
- MockCoordinatorTimer<Void, CoordinatorRecord> timer,
+ MockCoordinatorTimer<CoordinatorRecord> timer,
MockCoordinatorExecutor<CoordinatorRecord> executor,
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
@@ -761,9 +761,9 @@ public class GroupMetadataManagerTestContext {
return result;
}
- public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>>
sleep(long ms) {
+ public List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>>
sleep(long ms) {
time.sleep(ms);
- List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>>
timeouts = timer.poll();
+ List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts
= timer.poll();
timeouts.forEach(timeout -> {
if (timeout.result().replayRecords()) {
timeout.result().records().forEach(this::replay);
@@ -787,7 +787,7 @@ public class GroupMetadataManagerTestContext {
String memberId,
long delayMs
) {
- MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout
=
+ MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
timer.timeout(groupSessionTimeoutKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
@@ -797,17 +797,17 @@ public class GroupMetadataManagerTestContext {
String groupId,
String memberId
) {
- MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout
=
+ MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
timer.timeout(groupSessionTimeoutKey(groupId, memberId));
assertNull(timeout);
}
- public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord>
assertRebalanceTimeout(
+ public MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord>
assertRebalanceTimeout(
String groupId,
String memberId,
long delayMs
) {
- MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout
=
+ MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
timer.timeout(groupRebalanceTimeoutKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
@@ -818,17 +818,17 @@ public class GroupMetadataManagerTestContext {
String groupId,
String memberId
) {
- MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout
=
+ MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
timer.timeout(groupRebalanceTimeoutKey(groupId, memberId));
assertNull(timeout);
}
- public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord>
assertJoinTimeout(
+ public MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord>
assertJoinTimeout(
String groupId,
String memberId,
long delayMs
) {
- MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout
=
+ MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
timer.timeout(consumerGroupJoinKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
@@ -839,17 +839,17 @@ public class GroupMetadataManagerTestContext {
String groupId,
String memberId
) {
- MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout
=
+ MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
timer.timeout(consumerGroupJoinKey(groupId, memberId));
assertNull(timeout);
}
- public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord>
assertSyncTimeout(
+ public MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord>
assertSyncTimeout(
String groupId,
String memberId,
long delayMs
) {
- MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout
=
+ MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
timer.timeout(consumerGroupSyncKey(groupId, memberId));
assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs());
@@ -860,7 +860,7 @@ public class GroupMetadataManagerTestContext {
String groupId,
String memberId
) {
- MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout
=
+ MockCoordinatorTimer.ScheduledTimeout<CoordinatorRecord> timeout =
timer.timeout(consumerGroupSyncKey(groupId, memberId));
assertNull(timeout);
}
@@ -999,7 +999,7 @@ public class GroupMetadataManagerTestContext {
);
assertTrue(secondJoinResult.records.isEmpty());
- List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>>
timeouts = sleep(classicGroupInitialRebalanceDelayMs);
+ List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts
= sleep(classicGroupInitialRebalanceDelayMs);
assertEquals(1, timeouts.size());
assertTrue(secondJoinResult.joinFuture.isDone());
assertEquals(Errors.NONE.code(),
secondJoinResult.joinFuture.get().errorCode());
@@ -1312,7 +1312,7 @@ public class GroupMetadataManagerTestContext {
.map(member ->
classicGroupHeartbeatKey(group.groupId(),
member.memberId())).collect(Collectors.toSet());
// Member should be removed as session expires.
- List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>>
timeouts = sleep(timeoutMs);
+ List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts
= sleep(timeoutMs);
List<CoordinatorRecord> expectedRecords =
List.of(newGroupMetadataRecord(
group.groupId(),
new GroupMetadataValue()
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 18e119961c9..0ef8efa158e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -108,7 +108,7 @@ public class OffsetMetadataManagerTest {
static class OffsetMetadataManagerTestContext {
public static class Builder {
private final MockTime time = new MockTime();
- private final MockCoordinatorTimer<Void, CoordinatorRecord> timer
= new MockCoordinatorTimer<>(time);
+ private final MockCoordinatorTimer<CoordinatorRecord> timer = new
MockCoordinatorTimer<>(time);
private final MockCoordinatorExecutor<CoordinatorRecord> executor
= new MockCoordinatorExecutor<>();
private final LogContext logContext = new LogContext();
private final SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
@@ -175,7 +175,7 @@ public class OffsetMetadataManagerTest {
}
final MockTime time;
- final MockCoordinatorTimer<Void, CoordinatorRecord> timer;
+ final MockCoordinatorTimer<CoordinatorRecord> timer;
final SnapshotRegistry snapshotRegistry;
final GroupCoordinatorMetricsShard metrics;
final GroupMetadataManager groupMetadataManager;
@@ -186,7 +186,7 @@ public class OffsetMetadataManagerTest {
OffsetMetadataManagerTestContext(
MockTime time,
- MockCoordinatorTimer<Void, CoordinatorRecord> timer,
+ MockCoordinatorTimer<CoordinatorRecord> timer,
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
GroupMetadataManager groupMetadataManager,
@@ -384,9 +384,9 @@ public class OffsetMetadataManagerTest {
return response.topics();
}
- public List<MockCoordinatorTimer.ExpiredTimeout<Void,
CoordinatorRecord>> sleep(long ms) {
+ public List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>>
sleep(long ms) {
time.sleep(ms);
- List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>>
timeouts = timer.poll();
+ List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>>
timeouts = timer.poll();
timeouts.forEach(timeout -> {
if (timeout.result().replayRecords()) {
timeout.result().records().forEach(this::replay);
@@ -996,7 +996,7 @@ public class OffsetMetadataManagerTest {
// Advance time by half of the session timeout again. The timeout
should
// expire and the member is removed from the group.
- List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>>
timeouts =
+ List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts =
context.sleep(5000 / 2);
assertEquals(1, timeouts.size());
assertFalse(group.hasMember(member.memberId()));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index f015441f86f..83eaa1f13b0 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -1240,7 +1240,7 @@ public class StreamsGroupTest {
@Test
public void testCancelTimers() {
StreamsGroup streamsGroup = createStreamsGroup("test-group");
- CoordinatorTimer<Void, CoordinatorRecord> timer =
mock(CoordinatorTimer.class);
+ CoordinatorTimer<CoordinatorRecord> timer =
mock(CoordinatorTimer.class);
streamsGroup.cancelTimers(timer);
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 6c54f5cff9b..970cf561498 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -123,7 +123,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
}
@Override
- public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTimer(CoordinatorTimer<CoordinatorRecord> timer) {
// method is required due to interface
return this;
}