This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f57440bcb860be136c2e4d6ce808ac41ccfdc351 Author: Bruno Cadonna <br...@confluent.io> AuthorDate: Mon Jun 24 16:43:11 2024 +0200 Replay all streams-related records See https://github.com/lucasbru/kafka/pull/23 --- .../coordinator/group/GroupCoordinatorShard.java | 63 +++++++ .../coordinator/group/GroupMetadataManager.java | 204 ++++++++++++++++++++- .../coordinator/group/streams/StreamsGroup.java | 6 +- 3 files changed, 265 insertions(+), 8 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 1ff9c7786ee..099fdba95f7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -87,6 +87,20 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.image.MetadataDelta; @@ -870,6 +884,55 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord ); break; + case 15: + groupMetadataManager.replay( + (StreamsGroupMetadataKey) key.message(), + (StreamsGroupMetadataValue) messageOrNull(value) + ); + break; + + case 16: + groupMetadataManager.replay( + (StreamsGroupPartitionMetadataKey) key.message(), + (StreamsGroupPartitionMetadataValue) messageOrNull(value) + ); + break; + + case 17: + groupMetadataManager.replay( + (StreamsGroupMemberMetadataKey) key.message(), + (StreamsGroupMemberMetadataValue) messageOrNull(value) + ); + break; + + case 18: + groupMetadataManager.replay( + (StreamsGroupTargetAssignmentMetadataKey) key.message(), + (StreamsGroupTargetAssignmentMetadataValue) messageOrNull(value) + ); + break; + + case 19: + groupMetadataManager.replay( + (StreamsGroupTargetAssignmentMemberKey) key.message(), + (StreamsGroupTargetAssignmentMemberValue) messageOrNull(value) + ); + break; + + case 20: + groupMetadataManager.replay( + (StreamsGroupCurrentMemberAssignmentKey) key.message(), + (StreamsGroupCurrentMemberAssignmentValue) messageOrNull(value) + ); + break; + + case 21: + groupMetadataManager.replay( + (StreamsGroupTopologyKey) key.message(), + (StreamsGroupTopologyValue) 
messageOrNull(value) + ); + break; + default: throw new IllegalStateException("Received an unknown record type " + key.version() + " in " + record); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index b2a013348fb..84ee2c7bdac 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -103,6 +103,18 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; @@ -117,6 +129,9 @@ import org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuild import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder; import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; +import org.apache.kafka.coordinator.group.streams.StreamsGroup; +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; +import org.apache.kafka.coordinator.group.streams.StreamsTopology; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.TopicImage; @@ -154,6 +169,7 @@ import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_I import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC; import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER; import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE; +import static org.apache.kafka.coordinator.group.Group.GroupType.STREAMS; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord; @@ -195,6 +211,7 @@ import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMe @SuppressWarnings("JavaNCSS") public class GroupMetadataManager { + public static class Builder { private LogContext logContext = null; private 
SnapshotRegistry snapshotRegistry = null; @@ -884,6 +901,46 @@ public class GroupMetadataManager { } } + /** + * The method should be called on the replay path. + * Gets or maybe creates a streams group and updates the groups map if a new group is created. + * + * @param groupId The group id. + * @param createIfNotExists A boolean indicating whether the group should be + * created if it does not exist. + * + * @return A StreamsGroup. + * @throws IllegalStateException if the group does not exist and createIfNotExists is false or + * if the group is not a streams group. + * Package private for testing. + */ + StreamsGroup getOrMaybeCreatePersistedStreamsGroup( + String groupId, + boolean createIfNotExists + ) throws GroupIdNotFoundException { + Group group = groups.get(groupId); + + if (group == null && !createIfNotExists) { + throw new IllegalStateException(String.format("Streams group %s not found.", groupId)); + } + + if (group == null) { + StreamsGroup streamsGroup = new StreamsGroup(snapshotRegistry, groupId, metrics); + groups.put(groupId, streamsGroup); + // ToDo: migrate to streams group +// metrics.onConsumerGroupStateTransition(null, streamsGroup.state()); + return streamsGroup; + } else { + if (group.type() == STREAMS) { + return (StreamsGroup) group; + } else { + // We don't support upgrading/downgrading between protocols at the moment so + // we throw an exception if a group exists with the wrong type. + throw new IllegalStateException(String.format("Group %s is not a streams group.", groupId)); + } + } + } + /** * Gets or maybe creates a classic group. * @@ -3376,12 +3433,24 @@ public class GroupMetadataManager { StreamsGroupTopologyKey key, StreamsGroupTopologyValue value ) { - - // TODO: Insert the topology information to the in-memory representation. 
Needs the notion - // of a Streams group -// String groupId = key.groupId(); -// StreamsGroup streamsGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); -// streamsGroup.setTopology(value); + String groupId = key.groupId(); + StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null); + Set<String> oldSubscribedTopicNames = new HashSet<>(streamsGroup.subscriptionMetadata().keySet()); + if (value != null) { + StreamsTopology topology = StreamsTopology.fromRecord(value); + streamsGroup.setTopology(topology); + Set<String> newSubscribedTopicNames = new HashSet<>(); + topology.subtopologies().forEach( + (subtopologyId, subtopology) -> { + newSubscribedTopicNames.addAll(subtopology.sourceTopics()); + newSubscribedTopicNames.addAll(subtopology.repartitionSourceTopics().stream() + .map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet())); + } + ); + updateGroupsByTopics(groupId, oldSubscribedTopicNames, newSubscribedTopicNames); + } else { + updateGroupsByTopics(groupId, oldSubscribedTopicNames, Collections.emptySet()); + } } /** @@ -3791,7 +3860,6 @@ public class GroupMetadataManager { if (!shareGroup.members().isEmpty()) { throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + shareGroup.members().size() + " members."); - } if (!shareGroup.targetAssignment().isEmpty()) { throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the target assignment still has " + shareGroup.targetAssignment().size() @@ -3803,7 +3871,129 @@ public class GroupMetadataManager { } removeGroup(groupId); } + } + + /** + * Replays StreamsGroupMemberMetadataKey/Value to update the hard state of + * the streams group. It updates the subscription part of the member or + * deletes the member. + * + * @param key A StreamsGroupMemberMetadataKey key. + * @param value A StreamsGroupMemberMetadataValue record. 
+ */ + public void replay( + StreamsGroupMemberMetadataKey key, + StreamsGroupMemberMetadataValue value + ) { + String groupId = key.groupId(); + String memberId = key.memberId(); + + StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null); + Set<String> oldSubscribedTopicNames = new HashSet<>(streamsGroup.subscriptionMetadata().keySet()); + + if (value != null) { + StreamsGroupMember oldMember = streamsGroup.getOrMaybeCreateMember(memberId, true); + streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember) + .updateWith(value) + .build()); + } else { + StreamsGroupMember oldMember = streamsGroup.getOrMaybeCreateMember(memberId, false); + if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) { + throw new IllegalStateException("Received a tombstone record to delete member " + memberId + + " but did not receive StreamsGroupCurrentMemberAssignmentValue tombstone."); + } + if (streamsGroup.targetAssignment().containsKey(memberId)) { + throw new IllegalStateException("Received a tombstone record to delete member " + memberId + + " but did not receive StreamsGroupTargetAssignmentMetadataValue tombstone."); + } + streamsGroup.removeMember(memberId); + } + // ToDo: this is probably a no-op for a streams group since the subscribed topics do not change + // between group members. + updateGroupsByTopics(groupId, oldSubscribedTopicNames, streamsGroup.subscriptionMetadata().keySet()); + } + + /** + * Replays StreamsGroupTargetAssignmentMetadataKey/Value to update the hard state of + * the streams group. It updates the target assignment epoch or set it to -1 to signal + * that it has been deleted. + * + * @param key A StreamsGroupTargetAssignmentMetadataKey key. + * @param value A StreamsGroupTargetAssignmentMetadataValue record. 
+ */ + public void replay( + StreamsGroupTargetAssignmentMetadataKey key, + StreamsGroupTargetAssignmentMetadataValue value + ) { + String groupId = key.groupId(); + StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false); + + if (value != null) { + streamsGroup.setTargetAssignmentEpoch(value.assignmentEpoch()); + } else { + if (!streamsGroup.targetAssignment().isEmpty()) { + throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId + + " but the assignment still has " + streamsGroup.targetAssignment().size() + " members."); + } + streamsGroup.setTargetAssignmentEpoch(-1); + } + } + + /** + * Replays StreamsGroupTargetAssignmentMemberKey/Value to update the hard state of + * the streams group. It updates the target assignment of the member or deletes it. + * + * @param key A StreamsGroupTargetAssignmentMemberKey key. + * @param value A StreamsGroupTargetAssignmentMemberValue record. + */ + public void replay( + StreamsGroupTargetAssignmentMemberKey key, + StreamsGroupTargetAssignmentMemberValue value + ) { + String groupId = key.groupId(); + String memberId = key.memberId(); + StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false); + + if (value != null) { + streamsGroup.updateTargetAssignment(memberId, org.apache.kafka.coordinator.group.streams.Assignment.fromRecord(value)); + } else { + streamsGroup.removeTargetAssignment(memberId); + } + } + + /** + * Replays StreamsGroupCurrentMemberAssignmentKey/Value to update the hard state of + * the streams group. It updates the assignment of a member or deletes it. + * + * @param key A StreamsGroupCurrentMemberAssignmentKey key. + * @param value A StreamsGroupCurrentMemberAssignmentValue record. 
+ */ + public void replay( + StreamsGroupCurrentMemberAssignmentKey key, + StreamsGroupCurrentMemberAssignmentValue value + ) { + String groupId = key.groupId(); + String memberId = key.memberId(); + StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false); + StreamsGroupMember oldMember = streamsGroup.getOrMaybeCreateMember(memberId, false); + + if (value != null) { + StreamsGroupMember newMember = new StreamsGroupMember.Builder(oldMember) + .updateWith(value) + .build(); + streamsGroup.updateMember(newMember); + } else { + StreamsGroupMember newMember = new StreamsGroupMember.Builder(oldMember) + .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) + .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) + .setAssignedActiveTasks(Collections.emptyMap()) + .setAssignedStandbyTasks(Collections.emptyMap()) + .setAssignedWarmupTasks(Collections.emptyMap()) + .setActiveTasksPendingRevocation(Collections.emptyMap()) + .build(); + streamsGroup.updateMember(newMember); + } } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 4ba97bc2288..d90e749e303 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -173,7 +173,7 @@ public class StreamsGroup implements Group { /** * The Streams topology. */ - private final StreamsTopology topology; + private StreamsTopology topology; /** * The metadata refresh deadline. It consists of a timestamp in milliseconds together with the group epoch at the time of setting it. @@ -247,6 +247,10 @@ public class StreamsGroup implements Group { return topology; } + public void setTopology(StreamsTopology topology) { + this.topology = topology; + } + /** * @return The group id. */