This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 5240f8a3123 KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction (#20907)
5240f8a3123 is described below
commit 5240f8a31239cad48c0470de812ba1859eec54c6
Author: Izzy Harker <[email protected]>
AuthorDate: Wed Dec 10 01:51:31 2025 -0600
KAFKA-19862: Group coordinator loading may fail when there is concurrent compaction (#20907)
When the group coordinator loads while compaction runs concurrently, the
record unassigning a partition/task can be missed, which leads to an
IllegalStateException being thrown; however, the records self-resolve by
the time loading is finished. This PR changes the coordinator to log a
warning and proceed with loading instead of throwing an IllegalStateException.
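In essence, the fix relaxes the replay-time ownership bookkeeping. A minimal
standalone sketch of the new consumer-group rules (hypothetical class and
method names, a plain HashMap standing in for Kafka's TimelineHashMap; see
the ConsumerGroup diff below for the actual code):

import java.util.HashMap;
import java.util.Map;

// Sketch only, not the coordinator code: lenient epoch bookkeeping.
class PartitionEpochSketch {
    private final Map<Integer, Integer> partitionEpoch = new HashMap<>();

    // Removal is now conditional: drop the entry only when the stored epoch
    // matches. A mismatch (e.g. a compacted-away unassignment record) is
    // logged and skipped instead of throwing.
    void removePartitionEpoch(int partitionId, int expectedEpoch) {
        Integer prev = partitionEpoch.get(partitionId);
        if (prev != null && prev == expectedEpoch) {
            partitionEpoch.remove(partitionId);
        } else {
            System.out.printf("Skipping removal of epoch %d for partition %d; currently %s%n",
                expectedEpoch, partitionId, prev);
        }
    }

    // Assignment may overwrite an owned partition as long as the epoch
    // strictly grows; a smaller or equal epoch still fails fast.
    void addPartitionEpoch(int partitionId, int epoch) {
        Integer prev = partitionEpoch.get(partitionId);
        if (prev == null || prev < epoch) {
            partitionEpoch.put(partitionId, epoch);
        } else {
            throw new IllegalStateException("Partition " + partitionId
                + " still owned at epoch " + prev);
        }
    }
}

The streams-group path applies the same idea to process IDs instead of epochs.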
Testing: Added unit tests for Consumer and Streams groups to ensure this
change does not impact loading success.
Reviewers: Sean Quah <[email protected]>, Lucas Brutschy <[email protected]>, David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 9 +-
.../group/modern/consumer/ConsumerGroup.java | 40 ++--
.../coordinator/group/streams/StreamsGroup.java | 41 ++--
.../group/GroupCoordinatorShardTest.java | 4 +-
.../group/GroupMetadataManagerTest.java | 232 +++++++++++++++++++++
.../group/classic/ClassicGroupTest.java | 2 +
.../group/modern/consumer/ConsumerGroupTest.java | 64 ++++--
.../group/streams/StreamsGroupTest.java | 42 ++--
8 files changed, 358 insertions(+), 76 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 129d6794d69..6bf963c968e 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -815,10 +815,10 @@ public class GroupMetadataManager {
}
if (group == null) {
- return new ConsumerGroup(snapshotRegistry, groupId);
+ return new ConsumerGroup(logContext, snapshotRegistry, groupId);
} else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records)) {
log.info("[GroupId {}] Converted the empty classic group to a consumer group.", groupId);
- return new ConsumerGroup(snapshotRegistry, groupId);
+ return new ConsumerGroup(logContext, snapshotRegistry, groupId);
} else {
if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
@@ -982,7 +982,7 @@ public class GroupMetadataManager {
}
if (group == null) {
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext, snapshotRegistry, groupId);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else if (group.type() == CONSUMER) {
@@ -992,7 +992,7 @@ public class GroupMetadataManager {
// offsets if no group existed. Simple classic groups are not backed by any records
// in the __consumer_offsets topic hence we can safely replace it here. Without this,
// replaying consumer group records after offset commit records would not work.
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext, snapshotRegistry, groupId);
groups.put(groupId, consumerGroup);
return consumerGroup;
} else {
@@ -1371,6 +1371,7 @@ public class GroupMetadataManager {
ConsumerGroup consumerGroup;
try {
consumerGroup = ConsumerGroup.fromClassicGroup(
+ logContext,
snapshotRegistry,
classicGroup,
topicHashCache,
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index a32efe9e027..8aa858ced02 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
@@ -49,6 +50,8 @@ import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineObject;
+import org.slf4j.Logger;
+
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -104,6 +107,11 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
}
}
+ /**
+ * The logger.
+ */
+ private final Logger log;
+
/**
* The group state.
*/
@@ -149,10 +157,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
public ConsumerGroup(
+ LogContext logContext,
SnapshotRegistry snapshotRegistry,
String groupId
) {
super(snapshotRegistry, groupId);
+ this.log = logContext.logger(ConsumerGroup.class);
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -1037,7 +1047,6 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
- * @throws IllegalStateException if the epoch does not match the expected one.
* package-private for testing.
*/
void removePartitionEpochs(
@@ -1048,11 +1057,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
- Integer prevValue = partitionsOrNull.remove(partitionId);
- if (prevValue != expectedEpoch) {
- throw new IllegalStateException(
- String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
- "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
+ Integer prevValue = partitionsOrNull.get(partitionId);
+ if (prevValue != null && prevValue == expectedEpoch) {
+ partitionsOrNull.remove(partitionId);
+ } else {
+ log.debug("[GroupId {}] Cannot remove the epoch {}
from {}-{} because the partition is " +
+ "still owned at a different epoch {}",
groupId, expectedEpoch, topicId, partitionId, prevValue);
}
});
if (partitionsOrNull.isEmpty()) {
@@ -1061,9 +1071,9 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
return partitionsOrNull;
}
} else {
- throw new IllegalStateException(
- String.format("Cannot remove the epoch %d from %s
because it does not have any epoch",
- expectedEpoch, topicId));
+ log.debug("[GroupId {}] Cannot remove the epoch {} from {}
because it does not have any epoch",
+ groupId, expectedEpoch, topicId);
+ return partitionsOrNull;
}
});
});
@@ -1074,7 +1084,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
*
* @param assignment The assignment.
* @param epoch The new epoch.
- * @throws IllegalStateException if the partition already has an epoch assigned.
+ * @throws IllegalStateException if updating a partition with a smaller or equal epoch.
* package-private for testing.
*/
void addPartitionEpochs(
@@ -1087,8 +1097,10 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
- Integer prevValue = partitionsOrNull.put(partitionId, epoch);
- if (prevValue != null) {
+ Integer prevValue = partitionsOrNull.get(partitionId);
+ if (prevValue == null || prevValue < epoch) {
+ partitionsOrNull.put(partitionId, epoch);
+ } else {
throw new IllegalStateException(
String.format("Cannot set the epoch of %s-%s to %d
because the partition is " +
"still owned at epoch %d", topicId,
partitionId, epoch, prevValue));
@@ -1124,6 +1136,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
/**
* Create a new consumer group according to the given classic group.
*
+ * @param logContext The log context.
* @param snapshotRegistry The SnapshotRegistry.
* @param classicGroup The converted classic group.
* @param topicHashCache The cache for topic hashes.
@@ -1134,13 +1147,14 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
* @throws UnsupportedVersionException if userData from a custom assignor would be lost.
*/
public static ConsumerGroup fromClassicGroup(
+ LogContext logContext,
SnapshotRegistry snapshotRegistry,
ClassicGroup classicGroup,
Map<String, Long> topicHashCache,
CoordinatorMetadataImage metadataImage
) {
String groupId = classicGroup.groupId();
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
+ ConsumerGroup consumerGroup = new ConsumerGroup(logContext, snapshotRegistry, groupId);
consumerGroup.setGroupEpoch(classicGroup.generationId());
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 82dd871b3cd..621a8f9d1bd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -947,7 +947,7 @@ public class StreamsGroup implements Group {
*
* @param assignment The assignment.
* @param expectedProcessId The expected process ID.
- * @throws IllegalStateException if the process ID does not match the expected one. package-private for testing.
+ * package-private for testing.
*/
private void removeTaskProcessIds(
Map<String, Map<Integer, Integer>> assignment,
@@ -958,11 +958,12 @@ public class StreamsGroup implements Group {
currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.keySet().forEach(partitionId -> {
- String prevValue = partitionsOrNull.remove(partitionId);
- if (!Objects.equals(prevValue, expectedProcessId)) {
- throw new IllegalStateException(
- String.format("Cannot remove the process ID %s from task %s_%s because the partition is " +
- "still owned at a different process ID %s", expectedProcessId, subtopologyId, partitionId, prevValue));
+ String prevValue = partitionsOrNull.get(partitionId);
+ if (Objects.equals(prevValue, expectedProcessId)) {
+ partitionsOrNull.remove(partitionId);
+ } else {
+ log.debug("[GroupId {}] Cannot remove the process
ID {} from task {}_{} because the partition is " +
+ "still owned at a different process ID
{}", groupId, expectedProcessId, subtopologyId, partitionId, prevValue);
}
});
if (partitionsOrNull.isEmpty()) {
@@ -971,9 +972,9 @@ public class StreamsGroup implements Group {
return partitionsOrNull;
}
} else {
- throw new IllegalStateException(
- String.format("Cannot remove the process ID %s from %s
because it does not have any processId",
- expectedProcessId, subtopologyId));
+ log.debug("[GroupId {}] Cannot remove the process ID {}
from {} because it does not have any processId",
+ groupId, expectedProcessId, subtopologyId);
+ return partitionsOrNull;
}
});
});
@@ -984,7 +985,7 @@ public class StreamsGroup implements Group {
*
* @param assignment The assignment.
* @param processIdToRemove The expected process ID.
- * @throws IllegalStateException if the process ID does not match the expected one. package-private for testing.
+ * package-private for testing.
*/
private void removeTaskProcessIdsFromSet(
Map<String, Set<Integer>> assignment,
@@ -995,10 +996,9 @@ public class StreamsGroup implements Group {
currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
- if (!partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
- throw new IllegalStateException(
- String.format("Cannot remove the process ID %s from task %s_%s because the task is " +
- "not owned by this process ID", processIdToRemove, subtopologyId, partitionId));
+ if (!partitionsOrNull.containsKey(partitionId) || !partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
+ log.debug("[GroupId {}] Cannot remove the process ID {} from task {}_{} because the task is " +
+ "not owned by this process ID", groupId, processIdToRemove, subtopologyId, partitionId);
}
});
if (partitionsOrNull.isEmpty()) {
@@ -1007,9 +1007,9 @@ public class StreamsGroup implements Group {
return partitionsOrNull;
}
} else {
- throw new IllegalStateException(
- String.format("Cannot remove the process ID %s from %s
because it does not have any process ID",
- processIdToRemove, subtopologyId));
+ log.debug("[GroupId {}] Cannot remove the process ID {}
from {} because it does not have any process ID",
+ groupId, processIdToRemove, subtopologyId);
+ return partitionsOrNull;
}
});
});
@@ -1020,7 +1020,7 @@ public class StreamsGroup implements Group {
*
* @param tasks The assigned tasks.
* @param processId The process ID.
- * @throws IllegalStateException if the partition already has an epoch assigned. package-private for testing.
+ * package-private for testing.
*/
void addTaskProcessId(
TasksTupleWithEpochs tasks,
@@ -1046,9 +1046,8 @@ public class StreamsGroup implements Group {
for (Integer partitionId : assignedTaskPartitionsWithEpochs.keySet()) {
String prevValue = partitionsOrNull.put(partitionId, processId);
if (prevValue != null) {
- throw new IllegalStateException(
- String.format("Cannot set the process ID of %s-%s to %s because the partition is " +
- "still owned by process ID %s", subtopologyId, partitionId, processId, prevValue));
+ log.debug("[GroupId {}] Setting the process ID of {}-{} to {} even though the partition is " +
+ "still owned by process ID {}", groupId, subtopologyId, partitionId, processId, prevValue);
}
}
return partitionsOrNull;
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 4dc0c8ff78c..f533cffc742 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1384,8 +1384,8 @@ public class GroupCoordinatorShardTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id");
- ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id");
+ ConsumerGroup group1 = new ConsumerGroup(new LogContext(), snapshotRegistry, "group-id");
+ ConsumerGroup group2 = new ConsumerGroup(new LogContext(), snapshotRegistry, "other-group-id");
when(groupMetadataManager.groupIds()).thenReturn(Set.of("group-id", "other-group-id"));
when(groupMetadataManager.group("group-id")).thenReturn(group1);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3f1ca4770b0..b8beedca616 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -24468,6 +24468,238 @@ public class GroupMetadataManagerTest {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid topicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition 0.
+ // 2. Member A is unassigned partition 0 [record removed by compaction].
+ // 3. Member B is assigned partition 0.
+ // 4. Member A is assigned partition 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned partitions as long as the epoch is larger.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Partition 0 must remain with member B at epoch 12 even though member A has just been unassigned partition 0.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(12, group.currentPartitionEpoch(topicId, 0));
+ assertEquals(13, group.currentPartitionEpoch(topicId, 1));
+ }
+
+ @Test
+ public void testReplayConsumerGroupCurrentMemberAssignmentUnownedTopicWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Uuid barTopicId = Uuid.randomUuid();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned partition foo-0.
+ // 2. Member A is unassigned partition foo-0 [record removed by compaction].
+ // 3. Member B is assigned partition foo-0.
+ // 4. Member B is unassigned partition foo-0.
+ // 5. Member A is assigned partition bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // This can lead to conflicts from updating an owned partition in step 3 and attempting
+ // to remove nonexistent ownership in step 5. We want to ensure removing ownership from a
+ // completely unowned partition in step 5 is allowed.
+
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // foo-0's owner is replaced by member B at epoch 12.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+ .build()));
+
+ // foo becomes unowned.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ // Member A is unassigned foo-0.
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberIdA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0)))
+ .build()));
+
+ // Verify foo-0 is unowned and bar-0 is owned by member A at epoch 14.
+ ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
+ assertEquals(-1, group.currentPartitionEpoch(fooTopicId, 0));
+ assertEquals(14, group.currentPartitionEpoch(barTopicId, 0));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ String processIdA = "processIdA";
+ String processIdB = "processIdB";
+ String subtopologyId = "subtopology";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+ // Initialize members with process Ids.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
+ streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setProcessId(processIdA)
+ .build()));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
+ streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setProcessId(processIdB)
+ .build()));
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned task 0.
+ // 2. Member A is unassigned task 0 [record removed by compaction].
+ // 3. Member B is assigned task 0.
+ // 4. Member A is assigned task 1.
+ // If record 2 is processed, there are no issues, however with compaction it is possible that
+ // unassignment records are removed. We would like to not fail in these cases.
+ // Therefore we will allow assignments to owned tasks as long as the epoch is larger.
+
+ // Assign task 0 to member A.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, Map.of(0, 11))))
+ .build()));
+
+ // Task 0's owner is replaced by member B at epoch 12.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, Map.of(0, 12))))
+ .build()));
+
+ // Task 0 must remain with member B at epoch 12 even though member A has just been unassigned task 0.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyId, Map.of(1, 13))))
+ .build()));
+
+ // Verify task 1 is assigned to member A and task 0 to member B.
+ StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+ assertEquals(processIdA, group.currentActiveTaskProcessId(subtopologyId, 1));
+ assertEquals(processIdB, group.currentActiveTaskProcessId(subtopologyId, 0));
+ }
+
+ @Test
+ public void testReplayStreamsGroupCurrentMemberAssignmentUnownedTopologyWithCompaction() {
+ String groupId = "fooup";
+ String memberIdA = "memberIdA";
+ String memberIdB = "memberIdB";
+ String processIdA = "processIdA";
+ String processIdB = "processIdB";
+ String subtopologyFoo = "subtopologyFoo";
+ String subtopologyBar = "subtopologyBar";
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+ // Initialize members with process Ids.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
+ streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setProcessId(processIdA)
+ .build()));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
+ streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setProcessId(processIdB)
+ .build()));
+
+ // This test enacts the following scenario:
+ // 1. Member A is assigned task foo-0.
+ // 2. Member A is unassigned task foo-0 [record removed by compaction].
+ // 3. Member B is assigned task foo-0.
+ // 4. Member B is unassigned task foo-0.
+ // 5. Member A is assigned task bar-0.
+ // This is a legitimate set of assignments but with compaction the unassignment record can be skipped.
+ // This can lead to conflicts from updating an owned subtopology in step 3 and attempting to remove
+ // nonexistent ownership in step 5. We want to ensure removing ownership from a
+ // completely unowned subtopology in step 5 is allowed.
+
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, Map.of(0, 11))))
+ .build()));
+
+ // foo-0's owner is replaced by member B at epoch 12.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyFoo, Map.of(0, 12))))
+ .build()));
+
+ // foo becomes unowned.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdB)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .build()));
+
+ // Member A is unassigned foo-0.
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberIdA)
+ .setMemberEpoch(14)
+ .setPreviousMemberEpoch(13)
+ .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithEpochs(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasksWithEpochs(subtopologyBar, Map.of(0, 14))))
+ .build()));
+
+ // Verify foo-0 is unassigned and bar-0 is assigned to member A.
+ StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+ assertEquals(null, group.currentActiveTaskProcessId(subtopologyFoo, 0));
+ assertEquals(processIdA, group.currentActiveTaskProcessId(subtopologyBar, 0));
+ }
+
private record PendingAssignmentCase(
String description,
String groupId,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 24212d10982..445ba969648 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -1380,6 +1380,7 @@ public class ClassicGroupTest {
.build();
ConsumerGroup consumerGroup = new ConsumerGroup(
+ logContext,
new SnapshotRegistry(logContext),
groupId
);
@@ -1532,6 +1533,7 @@ public class ClassicGroupTest {
.build();
ConsumerGroup consumerGroup = new ConsumerGroup(
+ logContext,
new SnapshotRegistry(logContext),
groupId
);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 5956e51b161..c8ada8fc5f4 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -87,6 +87,7 @@ public class ConsumerGroupTest {
private ConsumerGroup createConsumerGroup(String groupId) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
return new ConsumerGroup(
+ new LogContext(),
snapshotRegistry,
groupId
);
@@ -281,14 +282,26 @@ public class ConsumerGroupTest {
consumerGroup.updateMember(m1);
ConsumerGroupMember m2 = new ConsumerGroupMember.Builder("m2")
+ .setMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)))
+ .build();
+
+ // m2 can acquire foo-1 because the epoch is larger than m1's epoch.
+ // This should not throw IllegalStateException.
+ consumerGroup.updateMember(m2);
+
+ ConsumerGroupMember m3 = new ConsumerGroupMember.Builder("m3")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1)))
.build();
- // m2 should not be able to acquire foo-1 because the partition is
- // still owned by another member.
- assertThrows(IllegalStateException.class, () -> consumerGroup.updateMember(m2));
+ // m3 should not be able to acquire foo-1 because the epoch is smaller
+ // than the current partition epoch (11).
+ assertThrows(IllegalStateException.class, () -> {
+ consumerGroup.updateMember(m3);
+ });
}
@Test
@@ -296,13 +309,13 @@ public class ConsumerGroupTest {
Uuid fooTopicId = Uuid.randomUuid();
ConsumerGroup consumerGroup = createConsumerGroup("foo");
- // Removing should fail because there is no epoch set.
- assertThrows(IllegalStateException.class, () -> consumerGroup.removePartitionEpochs(
+ // Removing should be a no-op when there is no epoch set.
+ consumerGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
10
- ));
+ );
ConsumerGroupMember m1 = new ConsumerGroupMember.Builder("m1")
.setMemberEpoch(10)
@@ -312,13 +325,15 @@ public class ConsumerGroupTest {
consumerGroup.updateMember(m1);
- // Removing should fail because the expected epoch is incorrect.
- assertThrows(IllegalStateException.class, () -> consumerGroup.removePartitionEpochs(
+ // Removing with incorrect epoch should do nothing.
+ // A debug message is logged, no exception is thrown.
+ consumerGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
- ));
+ );
+ assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
}
@Test
@@ -333,14 +348,24 @@ public class ConsumerGroupTest {
10
);
- // Changing the epoch should fail because the owner of the partition
- // should remove it first.
- assertThrows(IllegalStateException.class, () -> consumerGroup.addPartitionEpochs(
+ // Updating to a larger epoch should succeed.
+ consumerGroup.addPartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
- ));
+ );
+ assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+
+ // Updating to a smaller epoch should fail.
+ assertThrows(IllegalStateException.class, () -> {
+ consumerGroup.addPartitionEpochs(
+ mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)
+ ),
+ 10
+ );
+ });
}
@Test
@@ -696,7 +721,7 @@ public class ConsumerGroupTest {
@Test
public void testUpdateInvertedAssignment() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group");
+ ConsumerGroup consumerGroup = new ConsumerGroup(new LogContext(), snapshotRegistry, "test-group");
Uuid topicId = Uuid.randomUuid();
String memberId1 = "member1";
String memberId2 = "member2";
@@ -911,7 +936,7 @@ public class ConsumerGroupTest {
@Test
public void testAsListedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+ ConsumerGroup group = new ConsumerGroup(new LogContext(), snapshotRegistry, "group-foo");
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -926,6 +951,7 @@ public class ConsumerGroupTest {
public void testValidateOffsetFetch() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConsumerGroup group = new ConsumerGroup(
+ new LogContext(),
snapshotRegistry,
"group-foo"
);
@@ -986,7 +1012,7 @@ public class ConsumerGroupTest {
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
- ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id");
+ ConsumerGroup group = new ConsumerGroup(new LogContext(), new SnapshotRegistry(new LogContext()), "group-id");
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
assertTrue(offsetExpirationCondition.isPresent());
@@ -1023,7 +1049,7 @@ public class ConsumerGroupTest {
@Test
public void testAsDescribedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1");
+ ConsumerGroup group = new ConsumerGroup(new LogContext(), snapshotRegistry, "group-id-1");
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
@@ -1060,7 +1086,7 @@ public class ConsumerGroupTest {
@Test
public void testIsInStatesCaseInsensitive() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
- ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+ ConsumerGroup group = new ConsumerGroup(new LogContext(), snapshotRegistry, "group-foo");
snapshotRegistry.idempotentCreateSnapshot(0);
assertTrue(group.isInStates(Set.of("empty"), 0));
assertFalse(group.isInStates(Set.of("Empty"), 0));
@@ -1290,6 +1316,7 @@ public class ConsumerGroupTest {
classicGroup.add(member);
ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
+ logContext,
new SnapshotRegistry(logContext),
classicGroup,
new HashMap<>(),
@@ -1297,6 +1324,7 @@ public class ConsumerGroupTest {
);
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
+ new LogContext(),
new SnapshotRegistry(logContext),
groupId
);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index ced7d7ee4e7..f015441f86f 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -342,7 +342,7 @@ public class StreamsGroupTest {
StreamsGroup streamsGroup = createStreamsGroup("foo");
StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
- .setProcessId("process")
+ .setProcessId("process1")
.setAssignedTasks(
new TasksTupleWithEpochs(
mkTasksPerSubtopologyWithCommonEpoch(10, mkTasks(fooSubtopologyId, 1)),
@@ -355,19 +355,20 @@ public class StreamsGroupTest {
streamsGroup.updateMember(m1);
StreamsGroupMember m2 = new StreamsGroupMember.Builder("m2")
- .setProcessId("process")
+ .setProcessId("process2")
.setAssignedTasks(
new TasksTupleWithEpochs(
- mkTasksPerSubtopologyWithCommonEpoch(10, mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopologyWithCommonEpoch(9, mkTasks(fooSubtopologyId, 1)),
Map.of(),
Map.of()
)
)
.build();
- // m2 should not be able to acquire foo-1 because the partition is
- // still owned by another member.
- assertThrows(IllegalStateException.class, () -> streamsGroup.updateMember(m2));
+ // We allow m2 to acquire foo-1 despite the fact that m1 has ownership because the processId is different.
+ streamsGroup.updateMember(m2);
+
+ assertEquals("process2", streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
}
@@ -377,11 +378,11 @@ public class StreamsGroupTest {
String fooSubtopologyId = "foo-sub";
StreamsGroup streamsGroup = createStreamsGroup("foo");
- // Removing should fail because there is no epoch set.
- assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
+ // Removing should be a no-op when there is no process id set.
+ streamsGroup.removeTaskProcessIds(
TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
"process"
- ));
+ );
StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
.setProcessId("process")
@@ -390,11 +391,15 @@ public class StreamsGroupTest {
streamsGroup.updateMember(m1);
- // Removing should fail because the expected epoch is incorrect.
- assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
- TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 10, mkTasks(fooSubtopologyId, 1)),
+ // Removing with incorrect process id should do nothing.
+ // A debug message is logged, no exception is thrown.
+ streamsGroup.removeTaskProcessIds(
+ TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(taskRole, 9, mkTasks(fooSubtopologyId, 1)),
"process1"
- ));
+ );
+ if (taskRole == TaskRole.ACTIVE) {
+ assertEquals("process", streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+ }
}
@Test
@@ -411,16 +416,17 @@ public class StreamsGroupTest {
"process"
);
- // Changing the epoch should fail because the owner of the partition
- // should remove it first.
- assertThrows(IllegalStateException.class, () -> streamsGroup.addTaskProcessId(
+ // We allow replacing with a different process id.
+ streamsGroup.addTaskProcessId(
new TasksTupleWithEpochs(
mkTasksPerSubtopologyWithCommonEpoch(10, mkTasks(fooSubtopologyId, 1)),
mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
),
- "process"
- ));
+ "process2"
+ );
+
+ assertEquals("process2",
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
}
@Test