This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 52e1569ef22 KAFKA-19862: Group coordinator loading may fail when there
is concurrent compaction (4.0) (#21118)
52e1569ef22 is described below
commit 52e1569ef22cdbc4420fcdc19b2b9d4f4fab7da7
Author: Izzy Harker <[email protected]>
AuthorDate: Thu Dec 11 08:18:09 2025 -0600
KAFKA-19862: Group coordinator loading may fail when there is concurrent
compaction (4.0) (#21118)
Cherry-pick of the changes from https://github.com/apache/kafka/pull/20907 to the 4.0 branch.
Conflicts:
-> Removed
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
-> Removed
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
->
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
- only added required ConsumerGroup tests and kept everything else the
same
-> Kept GroupCoordinatorMetricsShard argument in ConsumerGroup
constructor
Reviewers: Sean Quah <[email protected]>, Lucas Brutschy
<[email protected]>, David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 9 +-
.../group/modern/consumer/ConsumerGroup.java | 40 +++++---
.../group/GroupMetadataManagerTest.java | 104 +++++++++++++++++++++
.../group/classic/ClassicGroupTest.java | 2 +
.../group/modern/consumer/ConsumerGroupTest.java | 64 +++++++++----
5 files changed, 184 insertions(+), 35 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index d97479f1269..7089bb11287 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -694,10 +694,10 @@ public class GroupMetadataManager {
}
if (group == null) {
- return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ return new ConsumerGroup(logContext, snapshotRegistry, groupId,
metrics);
} else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group,
records)) {
log.info("[GroupId {}] Converted the empty classic group to a
consumer group.", groupId);
- return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ return new ConsumerGroup(logContext, snapshotRegistry, groupId,
metrics);
} else {
if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
@@ -766,7 +766,7 @@ public class GroupMetadataManager {
}
if (group == null) {
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext,
snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else if (group.type() == CONSUMER) {
@@ -776,7 +776,7 @@ public class GroupMetadataManager {
// offsets if no group existed. Simple classic groups are not
backed by any records
// in the __consumer_offsets topic hence we can safely replace it
here. Without this,
// replaying consumer group records after offset commit records
would not work.
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext,
snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else {
@@ -1118,6 +1118,7 @@ public class GroupMetadataManager {
ConsumerGroup consumerGroup;
try {
consumerGroup = ConsumerGroup.fromClassicGroup(
+ logContext,
snapshotRegistry,
metrics,
classicGroup,
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index c0f8673eae6..ec1e5a8d960 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -29,6 +29,7 @@ import
org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
@@ -50,6 +51,8 @@ import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineObject;
+import org.slf4j.Logger;
+
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -105,6 +108,11 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
}
}
+ /**
+ * The logger.
+ */
+ private final Logger log;
+
/**
* The group state.
*/
@@ -153,11 +161,13 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
private final TimelineHashMap<String, ResolvedRegularExpression>
resolvedRegularExpressions;
public ConsumerGroup(
+ LogContext logContext,
SnapshotRegistry snapshotRegistry,
String groupId,
GroupCoordinatorMetricsShard metrics
) {
super(snapshotRegistry, groupId);
+ this.log = logContext.logger(ConsumerGroup.class);
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -1040,7 +1050,6 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
- * @throws IllegalStateException if the epoch does not match the expected
one.
* package-private for testing.
*/
void removePartitionEpochs(
@@ -1051,11 +1060,12 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
- Integer prevValue =
partitionsOrNull.remove(partitionId);
- if (prevValue != expectedEpoch) {
- throw new IllegalStateException(
- String.format("Cannot remove the epoch %d from
%s-%s because the partition is " +
- "still owned at a different epoch %d",
expectedEpoch, topicId, partitionId, prevValue));
+ Integer prevValue = partitionsOrNull.get(partitionId);
+ if (prevValue != null && prevValue == expectedEpoch) {
+ partitionsOrNull.remove(partitionId);
+ } else {
+ log.debug("[GroupId {}] Cannot remove the epoch {}
from {}-{} because the partition is " +
+ "still owned at a different epoch {}",
groupId, expectedEpoch, topicId, partitionId, prevValue);
}
});
if (partitionsOrNull.isEmpty()) {
@@ -1064,9 +1074,9 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
return partitionsOrNull;
}
} else {
- throw new IllegalStateException(
- String.format("Cannot remove the epoch %d from %s
because it does not have any epoch",
- expectedEpoch, topicId));
+ log.debug("[GroupId {}] Cannot remove the epoch {} from {}
because it does not have any epoch",
+ groupId, expectedEpoch, topicId);
+ return partitionsOrNull;
}
});
});
@@ -1077,7 +1087,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
*
* @param assignment The assignment.
* @param epoch The new epoch.
- * @throws IllegalStateException if the partition already has an epoch
assigned.
+ * @throws IllegalStateException if updating a partition with a smaller or
equal epoch.
* package-private for testing.
*/
void addPartitionEpochs(
@@ -1090,8 +1100,10 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry,
assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
- Integer prevValue = partitionsOrNull.put(partitionId,
epoch);
- if (prevValue != null) {
+ Integer prevValue = partitionsOrNull.get(partitionId);
+ if (prevValue == null || prevValue < epoch) {
+ partitionsOrNull.put(partitionId, epoch);
+ } else {
throw new IllegalStateException(
String.format("Cannot set the epoch of %s-%s to %d
because the partition is " +
"still owned at epoch %d", topicId,
partitionId, epoch, prevValue));
@@ -1127,6 +1139,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
/**
* Create a new consumer group according to the given classic group.
*
+ * @param logContext The log context.
* @param snapshotRegistry The SnapshotRegistry.
* @param metrics The GroupCoordinatorMetricsShard.
* @param classicGroup The converted classic group.
@@ -1138,6 +1151,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
* @throws UnsupportedVersionException if userData from a custom assignor
would be lost.
*/
public static ConsumerGroup fromClassicGroup(
+ LogContext logContext,
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
ClassicGroup classicGroup,
@@ -1145,7 +1159,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
ClusterImage clusterImage
) {
String groupId = classicGroup.groupId();
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext,
snapshotRegistry, groupId, metrics);
consumerGroup.setGroupEpoch(classicGroup.generationId());
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6b150d09c4f..ea73b44590c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -16848,6 +16848,110 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction()
{
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid topicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by
compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is replayed, there are no issues. However, with compaction it is possible that
+ // unassignment records are removed, and we do not want loading to fail in these cases.
+ // Therefore, we allow assignments to already-owned partitions as long as the new epoch is larger.
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though
member A has just been unassigned partition 0.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void
testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-0.
+ // 2. Member A is unassigned partition foo-0 [record removed by
compaction].
+ // 3. Member B is assigned partition foo-0.
+ // 4. Member B is unassigned partition foo-0.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the
unassignment record can be skipped.
+ // This can lead to conflicts from updating an owned partition in step
3 and attempting
+ // to remove nonexistent ownership in step 5. We want to ensure
removing ownership from a
+ // completely unowned partition in step 5 is allowed.
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build()));
+
+ // foo-0's owner is replaced by member B at epoch 12.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build()));
+
+ // foo becomes unowned.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ // Member A is assigned bar-0. Replaying this also tries to remove A's stale foo-0 epoch, which is a no-op because foo is unowned.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId,
0)))
+ .build()));
+
+ // Verify foo-0 is unowned and bar-0 is owned by member A at epoch 14.
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
+ assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+ }
+
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index ba27e555b07..6705fea7d10 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1383,6 +1383,7 @@ public class ClassicGroupTest {
.build();
ConsumerGroup consumerGroup = new ConsumerGroup(
+ logContext,
new SnapshotRegistry(logContext),
groupId,
mock(GroupCoordinatorMetricsShard.class)
@@ -1536,6 +1537,7 @@ public class ClassicGroupTest {
.build();
ConsumerGroup consumerGroup = new ConsumerGroup(
+ logContext,
new SnapshotRegistry(logContext),
groupId,
mock(GroupCoordinatorMetricsShard.class)
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 8153a84d397..66525468fe9 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -87,6 +87,7 @@ public class ConsumerGroupTest {
private ConsumerGroup createConsumerGroup(String groupId) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
return new ConsumerGroup(
+ new LogContext(),
snapshotRegistry,
groupId,
mock(GroupCoordinatorMetricsShard.class)
@@ -282,14 +283,26 @@ public class ConsumerGroupTest {
consumerGroup.updateMember(m1);
ConsumerGroupMember m2 = new ConsumerGroupMember.Builder("m2")
+ .setMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)))
+ .build();
+
+ // m2 can acquire foo-1 because the epoch is larger than m1's epoch.
+ // This should not throw IllegalStateException.
+ consumerGroup.updateMember(m2);
+
+ ConsumerGroupMember m3 = new ConsumerGroupMember.Builder("m3")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1)))
.build();
- // m2 should not be able to acquire foo-1 because the partition is
- // still owned by another member.
- assertThrows(IllegalStateException.class, () ->
consumerGroup.updateMember(m2));
+ // m3 should not be able to acquire foo-1 because the epoch is smaller
+ // than the current partition epoch (11).
+ assertThrows(IllegalStateException.class, () -> {
+ consumerGroup.updateMember(m3);
+ });
}
@Test
@@ -297,13 +310,13 @@ public class ConsumerGroupTest {
Uuid fooTopicId = Uuid.randomUuid();
ConsumerGroup consumerGroup = createConsumerGroup("foo");
- // Removing should fail because there is no epoch set.
- assertThrows(IllegalStateException.class, () ->
consumerGroup.removePartitionEpochs(
+ // Removing should be a no-op when there is no epoch set.
+ consumerGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
10
- ));
+ );
ConsumerGroupMember m1 = new ConsumerGroupMember.Builder("m1")
.setMemberEpoch(10)
@@ -313,13 +326,15 @@ public class ConsumerGroupTest {
consumerGroup.updateMember(m1);
- // Removing should fail because the expected epoch is incorrect.
- assertThrows(IllegalStateException.class, () ->
consumerGroup.removePartitionEpochs(
+ // Removing with incorrect epoch should do nothing.
+ // A debug message is logged, no exception is thrown.
+ consumerGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
- ));
+ );
+ assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
}
@Test
@@ -334,14 +349,24 @@ public class ConsumerGroupTest {
10
);
- // Changing the epoch should fail because the owner of the partition
- // should remove it first.
- assertThrows(IllegalStateException.class, () ->
consumerGroup.addPartitionEpochs(
+ // Updating to a larger epoch should succeed.
+ consumerGroup.addPartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
- ));
+ );
+ assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+
+ // Updating to a smaller epoch should fail.
+ assertThrows(IllegalStateException.class, () -> {
+ consumerGroup.addPartitionEpochs(
+ mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)
+ ),
+ 10
+ );
+ });
}
@Test
@@ -904,7 +929,7 @@ public class ConsumerGroupTest {
public void testUpdateInvertedAssignment() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
GroupCoordinatorMetricsShard metricsShard =
mock(GroupCoordinatorMetricsShard.class);
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
"test-group", metricsShard);
+ ConsumerGroup consumerGroup = new ConsumerGroup(new LogContext(),
snapshotRegistry, "test-group", metricsShard);
Uuid topicId = Uuid.randomUuid();
String memberId1 = "member1";
String memberId2 = "member2";
@@ -1124,7 +1149,7 @@ public class ConsumerGroupTest {
Collections.emptyMap(),
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
);
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo",
metricsShard);
+ ConsumerGroup group = new ConsumerGroup(new LogContext(),
snapshotRegistry, "group-foo", metricsShard);
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(),
group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -1139,6 +1164,7 @@ public class ConsumerGroupTest {
public void testValidateOffsetFetch() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ConsumerGroup group = new ConsumerGroup(
+ new LogContext(),
snapshotRegistry,
"group-foo",
mock(GroupCoordinatorMetricsShard.class)
@@ -1200,7 +1226,7 @@ public class ConsumerGroupTest {
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
- ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new
LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
+ ConsumerGroup group = new ConsumerGroup(new LogContext(), new
SnapshotRegistry(new LogContext()), "group-id",
mock(GroupCoordinatorMetricsShard.class));
Optional<OffsetExpirationCondition> offsetExpirationCondition =
group.offsetExpirationCondition();
assertTrue(offsetExpirationCondition.isPresent());
@@ -1258,7 +1284,7 @@ public class ConsumerGroupTest {
@Test
public void testAsDescribedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry,
"group-id-1", mock(GroupCoordinatorMetricsShard.class));
+ ConsumerGroup group = new ConsumerGroup(new LogContext(),
snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(),
group.stateAsString(0));
@@ -1300,7 +1326,7 @@ public class ConsumerGroupTest {
Collections.emptyMap(),
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
);
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo",
metricsShard);
+ ConsumerGroup group = new ConsumerGroup(new LogContext(),
snapshotRegistry, "group-foo", metricsShard);
snapshotRegistry.idempotentCreateSnapshot(0);
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
@@ -1530,6 +1556,7 @@ public class ConsumerGroupTest {
classicGroup.add(member);
ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
+ logContext,
new SnapshotRegistry(logContext),
mock(GroupCoordinatorMetricsShard.class),
classicGroup,
@@ -1538,6 +1565,7 @@ public class ConsumerGroupTest {
);
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
+ new LogContext(),
new SnapshotRegistry(logContext),
groupId,
mock(GroupCoordinatorMetricsShard.class)