This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 724870acf47 KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction (4.1) (#21119)
724870acf47 is described below
commit 724870acf470274dbe7f50af001d95b9eba1b41d
Author: Izzy Harker <[email protected]>
AuthorDate: Thu Dec 11 08:19:25 2025 -0600
KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction (4.1) (#21119)
Cherry-pick changes (https://github.com/apache/kafka/pull/20907) to 4.1
Conflicts:
-> group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
- only added new tests and kept everything else the same, removed epochs from assignments in new streams tests
-> Kept GroupCoordinatorMetricsShard argument in ConsumerGroup constructor (multiple files)
Reviewers: Sean Quah <[email protected]>, Lucas Brutschy <[email protected]>, David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 9 +-
.../group/modern/consumer/ConsumerGroup.java | 40 ++--
.../coordinator/group/streams/StreamsGroup.java | 41 ++--
.../group/GroupCoordinatorShardTest.java | 4 +-
.../group/GroupMetadataManagerTest.java | 232 +++++++++++++++++++++
.../group/classic/ClassicGroupTest.java | 2 +
.../group/modern/consumer/ConsumerGroupTest.java | 64 ++++--
.../group/streams/StreamsGroupTest.java | 40 ++--
8 files changed, 357 insertions(+), 75 deletions(-)
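For context on the change below: __consumer_offsets is a compacted topic, and compaction keeps only the latest record per key. A member's current-assignment records share one key, so the record that released a partition can be compacted away while an older record that assigned it survives. When the group coordinator reloads such a log, it can replay "member A owns partition 0" followed by "member B owns partition 0" with no release in between, and the previous strict checks aborted loading with an IllegalStateException. The sketch below models that replay with plain collections; the class and method names are illustrative, not the coordinator's actual TimelineHashMap-based code in ConsumerGroup.

    import java.util.HashMap;
    import java.util.Map;

    public class CompactionReplaySketch {
        // Old rule: an already-owned partition is a fatal inconsistency.
        static void addEpochOld(Map<Integer, Integer> epochs, int partition, int epoch) {
            Integer prev = epochs.put(partition, epoch);
            if (prev != null) {
                throw new IllegalStateException("partition " + partition + " still owned at epoch " + prev);
            }
        }

        // New rule: a strictly larger epoch may take the partition over, because
        // the record releasing it may have been compacted away.
        static void addEpochNew(Map<Integer, Integer> epochs, int partition, int epoch) {
            Integer prev = epochs.get(partition);
            if (prev == null || prev < epoch) {
                epochs.put(partition, epoch);
            } else {
                throw new IllegalStateException("partition " + partition + " still owned at epoch " + prev);
            }
        }

        public static void main(String[] args) {
            // Surviving records after compaction: member A owned partition 0 at
            // epoch 11, A's release record was compacted away, then member B
            // took partition 0 over at epoch 12.
            Map<Integer, Integer> epochs = new HashMap<>();
            addEpochNew(epochs, 0, 11); // replaying A's stale assignment
            addEpochNew(epochs, 0, 12); // B takes over: allowed, since 12 > 11
            System.out.println(epochs); // {0=12}

            Map<Integer, Integer> strict = new HashMap<>();
            addEpochOld(strict, 0, 11);
            try {
                addEpochOld(strict, 0, 12); // the old check: loading failed here
            } catch (IllegalStateException e) {
                System.out.println("old behavior: " + e.getMessage());
            }
        }
    }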
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 5b20c4ba4ba..6516ef181d9 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -817,10 +817,10 @@ public class GroupMetadataManager {
}
if (group == null) {
- return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ return new ConsumerGroup(logContext, snapshotRegistry, groupId, metrics);
} else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records)) {
log.info("[GroupId {}] Converted the empty classic group to a consumer group.", groupId);
- return new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ return new ConsumerGroup(logContext, snapshotRegistry, groupId, metrics);
} else {
if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
@@ -975,7 +975,7 @@ public class GroupMetadataManager {
}
if (group == null) {
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext, snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else if (group.type() == CONSUMER) {
@@ -985,7 +985,7 @@ public class GroupMetadataManager {
// offsets if no group existed. Simple classic groups are not backed by any records
// in the __consumer_offsets topic hence we can safely replace it here. Without this,
// replaying consumer group records after offset commit records would not work.
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext, snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else {
@@ -1364,6 +1364,7 @@ public class GroupMetadataManager {
ConsumerGroup consumerGroup;
try {
consumerGroup = ConsumerGroup.fromClassicGroup(
+ logContext,
snapshotRegistry,
metrics,
classicGroup,
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 880cd49769c..04776108957 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
@@ -50,6 +51,8 @@ import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineObject;
+import org.slf4j.Logger;
+
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -105,6 +108,11 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
}
}
+ /**
+ * The logger.
+ */
+ private final Logger log;
+
/**
* The group state.
*/
@@ -155,11 +163,13 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
public ConsumerGroup(
+ LogContext logContext,
SnapshotRegistry snapshotRegistry,
String groupId,
GroupCoordinatorMetricsShard metrics
) {
super(snapshotRegistry, groupId);
+ this.log = logContext.logger(ConsumerGroup.class);
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -1043,7 +1053,6 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
- * @throws IllegalStateException if the epoch does not match the expected one.
* package-private for testing.
*/
void removePartitionEpochs(
@@ -1054,11 +1063,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
- Integer prevValue = partitionsOrNull.remove(partitionId);
- if (prevValue != expectedEpoch) {
- throw new IllegalStateException(
- String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
- "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
+ Integer prevValue = partitionsOrNull.get(partitionId);
+ if (prevValue != null && prevValue == expectedEpoch) {
+ partitionsOrNull.remove(partitionId);
+ } else {
+ log.debug("[GroupId {}] Cannot remove the epoch {} from {}-{} because the partition is " +
+ "still owned at a different epoch {}", groupId, expectedEpoch, topicId, partitionId, prevValue);
}
});
if (partitionsOrNull.isEmpty()) {
@@ -1067,9 +1077,9 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
return partitionsOrNull;
}
} else {
- throw new IllegalStateException(
- String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
- expectedEpoch, topicId));
+ log.debug("[GroupId {}] Cannot remove the epoch {} from {} because it does not have any epoch",
+ groupId, expectedEpoch, topicId);
+ return partitionsOrNull;
}
});
});
@@ -1080,7 +1090,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
*
* @param assignment The assignment.
* @param epoch The new epoch.
- * @throws IllegalStateException if the partition already has an epoch assigned.
+ * @throws IllegalStateException if updating a partition with a smaller or equal epoch.
* package-private for testing.
*/
void addPartitionEpochs(
@@ -1093,8 +1103,10 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
- Integer prevValue = partitionsOrNull.put(partitionId, epoch);
- if (prevValue != null) {
+ Integer prevValue = partitionsOrNull.get(partitionId);
+ if (prevValue == null || prevValue < epoch) {
+ partitionsOrNull.put(partitionId, epoch);
+ } else {
throw new IllegalStateException(
String.format("Cannot set the epoch of %s-%s to %d because the partition is " +
"still owned at epoch %d", topicId, partitionId, epoch, prevValue));
@@ -1130,6 +1142,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
/**
* Create a new consumer group according to the given classic group.
*
+ * @param logContext The log context.
* @param snapshotRegistry The SnapshotRegistry.
* @param metrics The GroupCoordinatorMetricsShard.
* @param classicGroup The converted classic group.
@@ -1141,6 +1154,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
* @throws UnsupportedVersionException if userData from a custom assignor would be lost.
*/
public static ConsumerGroup fromClassicGroup(
+ LogContext logContext,
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
ClassicGroup classicGroup,
@@ -1148,7 +1162,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
MetadataImage metadataImage
) {
String groupId = classicGroup.groupId();
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext, snapshotRegistry, groupId, metrics);
consumerGroup.setGroupEpoch(classicGroup.generationId());
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
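Net effect of the ConsumerGroup changes above: addPartitionEpochs now accepts a strictly larger epoch and still throws for an equal or smaller one, while removePartitionEpochs releases a partition only when the expected epoch matches exactly and otherwise degrades to a DEBUG log. A rough standalone model of the removal side (plain HashMap and made-up names, not the real TimelineHashMap-based code):

    import java.util.HashMap;
    import java.util.Map;

    public class RemoveEpochSketch {
        public static void main(String[] args) {
            Map<Integer, Integer> partitionEpochs = new HashMap<>();
            partitionEpochs.put(1, 10); // partition 1 owned at epoch 10

            remove(partitionEpochs, 1, 11); // stale expected epoch: logged no-op
            System.out.println(partitionEpochs); // {1=10} - ownership preserved

            remove(partitionEpochs, 1, 10); // exact match: ownership released
            System.out.println(partitionEpochs); // {}
        }

        static void remove(Map<Integer, Integer> partitionEpochs, int partition, int expectedEpoch) {
            Integer prev = partitionEpochs.get(partition);
            if (prev != null && prev == expectedEpoch) {
                partitionEpochs.remove(partition);
            } else {
                System.out.println("DEBUG: cannot remove epoch " + expectedEpoch
                    + " from partition " + partition + ", currently owned at " + prev);
            }
        }
    }

Keeping the throw on an equal or smaller epoch preserves a safety net: genuinely inconsistent records still fail fast, while the orderings that compaction can legitimately produce no longer abort loading.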
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index afc252a7fee..ee49c7643d0 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -915,7 +915,7 @@ public class StreamsGroup implements Group {
*
* @param assignment The assignment.
* @param expectedProcessId The expected process ID.
- * @throws IllegalStateException if the process ID does not match the expected one. package-private for testing.
+ * package-private for testing.
*/
private void removeTaskProcessIds(
Map<String, Set<Integer>> assignment,
@@ -926,11 +926,12 @@ public class StreamsGroup implements Group {
currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
- String prevValue = partitionsOrNull.remove(partitionId);
- if (!Objects.equals(prevValue, expectedProcessId)) {
- throw new IllegalStateException(
- String.format("Cannot remove the process ID %s from task %s_%s because the partition is " +
- "still owned at a different process ID %s", expectedProcessId, subtopologyId, partitionId, prevValue));
+ String prevValue = partitionsOrNull.get(partitionId);
+ if (Objects.equals(prevValue, expectedProcessId)) {
+ partitionsOrNull.remove(partitionId);
+ } else {
+ log.debug("[GroupId {}] Cannot remove the process ID {} from task {}_{} because the partition is " +
+ "still owned at a different process ID {}", groupId, expectedProcessId, subtopologyId, partitionId, prevValue);
}
});
if (partitionsOrNull.isEmpty()) {
@@ -939,9 +940,9 @@ public class StreamsGroup implements Group {
return partitionsOrNull;
}
} else {
- throw new IllegalStateException(
- String.format("Cannot remove the process ID %s from %s because it does not have any processId",
- expectedProcessId, subtopologyId));
+ log.debug("[GroupId {}] Cannot remove the process ID {} from {} because it does not have any processId",
+ groupId, expectedProcessId, subtopologyId);
+ return partitionsOrNull;
}
});
});
@@ -952,7 +953,7 @@ public class StreamsGroup implements Group {
*
* @param assignment The assignment.
* @param processIdToRemove The expected process ID.
- * @throws IllegalStateException if the process ID does not match the expected one. package-private for testing.
+ * package-private for testing.
*/
private void removeTaskProcessIdsFromSet(
Map<String, Set<Integer>> assignment,
@@ -963,10 +964,9 @@ public class StreamsGroup implements Group {
currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
- if (!partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
- throw new IllegalStateException(
- String.format("Cannot remove the process ID %s from task %s_%s because the task is " +
- "not owned by this process ID", processIdToRemove, subtopologyId, partitionId));
+ if (!partitionsOrNull.containsKey(partitionId) || !partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
+ log.debug("[GroupId {}] Cannot remove the process ID {} from task {}_{} because the task is " +
+ "not owned by this process ID", groupId, processIdToRemove, subtopologyId, partitionId);
}
});
if (partitionsOrNull.isEmpty()) {
@@ -975,9 +975,9 @@ public class StreamsGroup implements Group {
return partitionsOrNull;
}
} else {
- throw new IllegalStateException(
- String.format("Cannot remove the process ID %s from %s because it does not have any process ID",
- processIdToRemove, subtopologyId));
+ log.debug("[GroupId {}] Cannot remove the process ID {} from {} because it does not have any process ID",
+ groupId, processIdToRemove, subtopologyId);
+ return partitionsOrNull;
}
});
});
@@ -988,7 +988,7 @@ public class StreamsGroup implements Group {
*
* @param tasks The assigned tasks.
* @param processId The process ID.
- * @throws IllegalStateException if the partition already has an epoch assigned. package-private for testing.
+ * package-private for testing.
*/
void addTaskProcessId(
TasksTuple tasks,
@@ -1014,9 +1014,8 @@ public class StreamsGroup implements Group {
for (Integer partitionId : assignedTaskPartitions) {
String prevValue = partitionsOrNull.put(partitionId, processId);
if (prevValue != null) {
- throw new IllegalStateException(
- String.format("Cannot set the process ID of %s-%s to %s because the partition is " +
- "still owned by process ID %s", subtopologyId, partitionId, processId, prevValue));
+ log.debug("[GroupId {}] Setting the process ID of {}-{} to {} even though the partition is " +
+ "still owned by process ID {}", groupId, subtopologyId, partitionId, processId, prevValue);
}
}
return partitionsOrNull;
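Note the asymmetry with the consumer-group path above: partition epochs are integers, so ConsumerGroup can compare them and still reject an equal or smaller epoch, whereas streams process IDs are opaque strings with no such ordering. StreamsGroup.addTaskProcessId therefore lets the record being replayed win unconditionally and only logs the conflict at DEBUG. A rough standalone model of that rule (illustrative names, not the real TimelineHashMap-based code):

    import java.util.HashMap;
    import java.util.Map;

    public class AddTaskProcessIdSketch {
        public static void main(String[] args) {
            Map<Integer, String> taskProcessIds = new HashMap<>();
            put(taskProcessIds, 1, "processA");
            put(taskProcessIds, 1, "processB"); // conflict: logged, last writer wins
            System.out.println(taskProcessIds); // {1=processB}
        }

        static void put(Map<Integer, String> taskProcessIds, int task, String processId) {
            String prev = taskProcessIds.put(task, processId);
            if (prev != null) {
                System.out.println("DEBUG: setting process ID of task " + task + " to "
                    + processId + " even though it is still owned by " + prev);
            }
        }
    }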
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index d58c6b6b1ac..605fef803b1 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1386,8 +1386,8 @@ public class GroupCoordinatorShardTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
- ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id", metricsShard);
- ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id", metricsShard);
+ ConsumerGroup group1 = new ConsumerGroup(new LogContext(), snapshotRegistry, "group-id", metricsShard);
+ ConsumerGroup group2 = new ConsumerGroup(new LogContext(), snapshotRegistry, "other-group-id", metricsShard);
when(groupMetadataManager.groupIds()).thenReturn(Set.of("group-id", "other-group-id"));
when(groupMetadataManager.group("group-id")).thenReturn(group1);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3c876284515..e9069f12f8c 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -23465,6 +23465,238 @@ public class GroupMetadataManagerTest {
return members;
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid topicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-0.
+ // 2. Member A is unassigned partition foo-0 [record removed by compaction].
+ // 3. Member B is assigned partition foo-0.
+ // 4. Member B is unassigned partition foo-0.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // This can lead to conflicts from updating an owned partition in step 3 and attempting
+ // to remove nonexistent ownership in step 5. We want to ensure removing ownership from a
+ // completely unowned partition in step 5 is allowed.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // foo-0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // foo becomes unowned.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ // Member A is unassigned foo-0.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .build()));
+
+ // Verify foo-0 is unowned and bar-0 is owned by member A at epoch 14.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
+ assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ String processIdA = "processIdA";
+ String processIdB = "processIdB";
+ String subtopologyId = "subtopology";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+ // Initialize members with process Ids.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
+ streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setProcessId(processIdA)
+ .build()));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
+ streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setProcessId(processIdB)
+ .build()));
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned task 0.
+ // 2. Member A is unassigned task 0 [record removed by compaction].
+ // 3. Member B is assigned task 0.
+ // 4. Member A is assigned task 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned tasks as long as the epoch is larger.
+
+ // Assign task 0 to member A.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopologyId, 0)))
+ .build()));
+
+ // Task 0's owner is replaced by member B at epoch 12.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopologyId, 0)))
+ .build()));
+
+ // Task 0 must remain with member B at epoch 12 even though member A has just been unassigned task 0.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopologyId, 1)))
+ .build()));
+
+ // Verify task 1 is assigned to member A and task 0 to member B.
+ StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+ assertEquals(processIdA, group.currentActiveTaskProcessId(subtopologyId, 1));
+ assertEquals(processIdB, group.currentActiveTaskProcessId(subtopologyId, 0));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentUnownedTopologyWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ String processIdA = "processIdA";
+ String processIdB = "processIdB";
+ String subtopologyFoo = "subtopologyFoo";
+ String subtopologyBar = "subtopologyBar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+ // Initialize members with process Ids.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
+ streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setProcessId(processIdA)
+ .build()));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
+ streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setProcessId(processIdB)
+ .build()));
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned task foo-0.
+ // 2. Member A is unassigned task foo-0 [record removed by compaction].
+ // 3. Member B is assigned task foo-0.
+ // 4. Member B is unassigned task foo-0.
+ // 5. Member A is assigned task bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // This can lead to conflicts from updating an owned subtopology in step 3 and attempting to remove
+ // nonexistent ownership in step 5. We want to ensure removing ownership from a
+ // completely unowned subtopology in step 5 is allowed.
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopologyFoo, 0)))
+ .build()));
+
+ // foo-0's owner is replaced by member B at epoch 12.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopologyFoo, 0)))
+ .build()));
+
+ // foo becomes unowned
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ // Member A is unassigned foo-0.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopologyBar, 0)))
+ .build()));
+
+ // Verify foo-0 is unassigned and bar-0 is assigned to member A.
+ StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+ assertEquals(null, group.currentActiveTaskProcessId(subtopologyFoo, 0));
+ assertEquals(processIdA, group.currentActiveTaskProcessId(subtopologyBar, 0));
+ }
private static List<String> verifyClassicGroupJoinResponses(
List<GroupMetadataManagerTestContext.JoinResult> joinResults,
int expectedSuccessCount,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index dfcb415fd3e..2464324733b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1381,6 +1381,7 @@ public class ClassicGroupTest {
.build();
ConsumerGroup consumerGroup = new ConsumerGroup(
+ logContext,
new SnapshotRegistry(logContext),
groupId,
mock(GroupCoordinatorMetricsShard.class)
@@ -1534,6 +1535,7 @@ public class ClassicGroupTest {
.build();
ConsumerGroup consumerGroup = new ConsumerGroup(
+ logContext,
new SnapshotRegistry(logContext),
groupId,
mock(GroupCoordinatorMetricsShard.class)
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index f6afa3ee08a..f8bf2fe15fa 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -89,6 +89,7 @@ public class ConsumerGroupTest {
private ConsumerGroup createConsumerGroup(String groupId) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
return new ConsumerGroup(
+ new LogContext(),
snapshotRegistry,
groupId,
mock(GroupCoordinatorMetricsShard.class)
@@ -284,14 +285,26 @@ public class ConsumerGroupTest {
consumerGroup.updateMember(m1);
ConsumerGroupMember m2 = new ConsumerGroupMember.Builder("m2")
+ .setMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)))
+ .build();
+
+ // m2 can acquire foo-1 because the epoch is larger than m1's epoch.
+ // This should not throw IllegalStateException.
+ consumerGroup.updateMember(m2);
+
+ ConsumerGroupMember m3 = new ConsumerGroupMember.Builder("m3")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1)))
.build();
- // m2 should not be able to acquire foo-1 because the partition is
- // still owned by another member.
- assertThrows(IllegalStateException.class, () -> consumerGroup.updateMember(m2));
+ // m3 should not be able to acquire foo-1 because the epoch is smaller
+ // than the current partition epoch (11).
+ assertThrows(IllegalStateException.class, () -> {
+ consumerGroup.updateMember(m3);
+ });
}
@Test
@@ -299,13 +312,13 @@ public class ConsumerGroupTest {
Uuid fooTopicId = Uuid.randomUuid();
ConsumerGroup consumerGroup = createConsumerGroup("foo");
- // Removing should fail because there is no epoch set.
- assertThrows(IllegalStateException.class, () -> consumerGroup.removePartitionEpochs(
+ // Removing should be a no-op when there is no epoch set.
+ consumerGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
10
- ));
+ );
ConsumerGroupMember m1 = new ConsumerGroupMember.Builder("m1")
.setMemberEpoch(10)
@@ -315,13 +328,15 @@ public class ConsumerGroupTest {
consumerGroup.updateMember(m1);
- // Removing should fail because the expected epoch is incorrect.
- assertThrows(IllegalStateException.class, () -> consumerGroup.removePartitionEpochs(
+ // Removing with incorrect epoch should do nothing.
+ // A debug message is logged, no exception is thrown.
+ consumerGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
- ));
+ );
+ assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
}
@Test
@@ -336,14 +351,24 @@ public class ConsumerGroupTest {
10
);
- // Changing the epoch should fail because the owner of the partition
- // should remove it first.
- assertThrows(IllegalStateException.class, () -> consumerGroup.addPartitionEpochs(
+ // Updating to a larger epoch should succeed.
+ consumerGroup.addPartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
- ));
+ );
+ assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+
+ // Updating to a smaller epoch should fail.
+ assertThrows(IllegalStateException.class, () -> {
+ consumerGroup.addPartitionEpochs(
+ mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)
+ ),
+ 10
+ );
+ });
}
@Test
@@ -700,7 +725,7 @@ public class ConsumerGroupTest {
public void testUpdateInvertedAssignment() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
+ ConsumerGroup consumerGroup = new ConsumerGroup(new LogContext(), snapshotRegistry, "test-group", metricsShard);
Uuid topicId = Uuid.randomUuid();
String memberId1 = "member1";
String memberId2 = "member2";
@@ -920,7 +945,7 @@ public class ConsumerGroupTest {
Map.of(),
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
);
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo",
metricsShard);
+ ConsumerGroup group = new ConsumerGroup(new LogContext(),
snapshotRegistry, "group-foo", metricsShard);
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(),
group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -935,6 +960,7 @@ public class ConsumerGroupTest {
public void testValidateOffsetFetch() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConsumerGroup group = new ConsumerGroup(
+ new LogContext(),
snapshotRegistry,
"group-foo",
mock(GroupCoordinatorMetricsShard.class)
@@ -996,7 +1022,7 @@ public class ConsumerGroupTest {
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
- ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
+ ConsumerGroup group = new ConsumerGroup(new LogContext(), new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
assertTrue(offsetExpirationCondition.isPresent());
@@ -1033,7 +1059,7 @@ public class ConsumerGroupTest {
@Test
public void testAsDescribedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
+ ConsumerGroup group = new ConsumerGroup(new LogContext(), snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
@@ -1075,7 +1101,7 @@ public class ConsumerGroupTest {
Map.of(),
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
);
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo",
metricsShard);
+ ConsumerGroup group = new ConsumerGroup(new LogContext(),
snapshotRegistry, "group-foo", metricsShard);
snapshotRegistry.idempotentCreateSnapshot(0);
assertTrue(group.isInStates(Set.of("empty"), 0));
assertFalse(group.isInStates(Set.of("Empty"), 0));
@@ -1305,6 +1331,7 @@ public class ConsumerGroupTest {
classicGroup.add(member);
ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
+ logContext,
new SnapshotRegistry(logContext),
mock(GroupCoordinatorMetricsShard.class),
classicGroup,
@@ -1313,6 +1340,7 @@ public class ConsumerGroupTest {
);
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
+ new LogContext(),
new SnapshotRegistry(logContext),
groupId,
mock(GroupCoordinatorMetricsShard.class)
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 8966c936356..22d42a3d97b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -348,7 +348,7 @@ public class StreamsGroupTest {
StreamsGroup streamsGroup = createStreamsGroup("foo");
StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
- .setProcessId("process")
+ .setProcessId("process1")
.setAssignedTasks(
new TasksTuple(
mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
@@ -361,7 +361,7 @@ public class StreamsGroupTest {
streamsGroup.updateMember(m1);
StreamsGroupMember m2 = new StreamsGroupMember.Builder("m2")
- .setProcessId("process")
+ .setProcessId("process2")
.setAssignedTasks(
new TasksTuple(
mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
@@ -371,9 +371,10 @@ public class StreamsGroupTest {
)
.build();
- // m2 should not be able to acquire foo-1 because the partition is
- // still owned by another member.
- assertThrows(IllegalStateException.class, () -> streamsGroup.updateMember(m2));
+ // We allow m2 to acquire foo-1 despite the fact that m1 has ownership because the processId is different.
+ streamsGroup.updateMember(m2);
+
+ assertEquals("process2", streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
}
@@ -383,11 +384,11 @@ public class StreamsGroupTest {
String fooSubtopologyId = "foo-sub";
StreamsGroup streamsGroup = createStreamsGroup("foo");
- // Removing should fail because there is no epoch set.
- assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
+ // Removing should be a no-op when there is no process id set.
+ streamsGroup.removeTaskProcessIds(
mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
"process"
- ));
+ );
StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
.setProcessId("process")
@@ -396,11 +397,15 @@ public class StreamsGroupTest {
streamsGroup.updateMember(m1);
- // Removing should fail because the expected epoch is incorrect.
- assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
- mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
+ // Removing with incorrect process id should do nothing.
+ // A debug message is logged, no exception is thrown.
+ streamsGroup.removeTaskProcessIds(
+ TaskAssignmentTestUtil.mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
"process1"
- ));
+ );
+ if (taskRole == TaskRole.ACTIVE) {
+ assertEquals("process",
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+ }
}
@Test
@@ -417,16 +422,17 @@ public class StreamsGroupTest {
"process"
);
- // Changing the epoch should fail because the owner of the partition
- // should remove it first.
- assertThrows(IllegalStateException.class, () -> streamsGroup.addTaskProcessId(
+ // We allow replacing with a different process id.
+ streamsGroup.addTaskProcessId(
new TasksTuple(
mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
),
- "process"
- ));
+ "process2"
+ );
+
+ assertEquals("process2",
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
}
@Test