This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f57440bcb860be136c2e4d6ce808ac41ccfdc351
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Mon Jun 24 16:43:11 2024 +0200

    Replay all streams-related records
    
    See https://github.com/lucasbru/kafka/pull/23
---
 .../coordinator/group/GroupCoordinatorShard.java   |  63 +++++++
 .../coordinator/group/GroupMetadataManager.java    | 204 ++++++++++++++++++++-
 .../coordinator/group/streams/StreamsGroup.java    |   6 +-
 3 files changed, 265 insertions(+), 8 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 1ff9c7786ee..099fdba95f7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -87,6 +87,20 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.image.MetadataDelta;
@@ -870,6 +884,55 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 );
                 break;
 
+            case 15:
+                groupMetadataManager.replay(
+                    (StreamsGroupMetadataKey) key.message(),
+                    (StreamsGroupMetadataValue) messageOrNull(value)
+                );
+                break;
+
+            case 16:
+                groupMetadataManager.replay(
+                    (StreamsGroupPartitionMetadataKey) key.message(),
+                    (StreamsGroupPartitionMetadataValue) messageOrNull(value)
+                );
+                break;
+                
+            case 17:
+                groupMetadataManager.replay(
+                    (StreamsGroupMemberMetadataKey) key.message(),
+                    (StreamsGroupMemberMetadataValue) messageOrNull(value)
+                );
+                break;
+
+            case 18:
+                groupMetadataManager.replay(
+                    (StreamsGroupTargetAssignmentMetadataKey) key.message(),
+                    (StreamsGroupTargetAssignmentMetadataValue) 
messageOrNull(value)
+                );
+                break;
+
+            case 19:
+                groupMetadataManager.replay(
+                    (StreamsGroupTargetAssignmentMemberKey) key.message(),
+                    (StreamsGroupTargetAssignmentMemberValue) 
messageOrNull(value)
+                );
+                break;
+
+            case 20:
+                groupMetadataManager.replay(
+                    (StreamsGroupCurrentMemberAssignmentKey) key.message(),
+                    (StreamsGroupCurrentMemberAssignmentValue) 
messageOrNull(value)
+                );
+                break;
+
+            case 21:
+                groupMetadataManager.replay(
+                    (StreamsGroupTopologyKey) key.message(),
+                    (StreamsGroupTopologyValue) messageOrNull(value)
+                );
+                break;
+                
             default:
                 throw new IllegalStateException("Received an unknown record 
type " + key.version()
                     + " in " + record);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index b2a013348fb..84ee2c7bdac 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -103,6 +103,18 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
@@ -117,6 +129,9 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuild
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
+import org.apache.kafka.coordinator.group.streams.StreamsTopology;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.TopicImage;
@@ -154,6 +169,7 @@ import static 
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_I
 import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC;
 import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
 import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
+import static org.apache.kafka.coordinator.group.Group.GroupType.STREAMS;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord;
@@ -195,6 +211,7 @@ import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMe
 @SuppressWarnings("JavaNCSS")
 public class GroupMetadataManager {
 
+
     public static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
@@ -884,6 +901,46 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * The method should be called on the replay path.
+     * Gets or maybe creates a streams group and updates the groups map if a 
new group is created.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should 
be
+     *                          created if it does not exist.
+     *
+     * @return A StreamsGroup.
+     * @throws IllegalStateException if the group does not exist and createIfNotExists is false or
+     *                               if the group is not a streams group.
+     * Package private for testing.
+     */
+    StreamsGroup getOrMaybeCreatePersistedStreamsGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new IllegalStateException(String.format("Streams group %s 
not found.", groupId));
+        }
+
+        if (group == null) {
+            StreamsGroup streamsGroup = new StreamsGroup(snapshotRegistry, 
groupId, metrics);
+            groups.put(groupId, streamsGroup);
+            // ToDo: migrate to streams group
+//            metrics.onConsumerGroupStateTransition(null, 
streamsGroup.state());
+            return streamsGroup;
+        } else {
+            if (group.type() == STREAMS) {
+                return (StreamsGroup) group;
+            } else {
+                // We don't support upgrading/downgrading between protocols at 
the moment so
+                // we throw an exception if a group exists with the wrong type.
+                throw new IllegalStateException(String.format("Group %s is not 
a streams group.", groupId));
+            }
+        }
+    }
+
     /**
      * Gets or maybe creates a classic group.
      *
@@ -3376,12 +3433,24 @@ public class GroupMetadataManager {
         StreamsGroupTopologyKey key,
         StreamsGroupTopologyValue value
     ) {
-
-        // TODO: Insert the topology information to the in-memory 
representation. Needs the notion
-        //       of a Streams group
-//        String groupId = key.groupId();
-//        StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
-//        streamsGroup.setTopology(value);
+        String groupId = key.groupId();
+        StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
+        Set<String> oldSubscribedTopicNames = new 
HashSet<>(streamsGroup.subscriptionMetadata().keySet());
+        if (value != null) {
+            StreamsTopology topology = StreamsTopology.fromRecord(value);
+            streamsGroup.setTopology(topology);
+            Set<String> newSubscribedTopicNames = new HashSet<>();
+            topology.subtopologies().forEach(
+                (subtopologyId, subtopology) -> {
+                    newSubscribedTopicNames.addAll(subtopology.sourceTopics());
+                    
newSubscribedTopicNames.addAll(subtopology.repartitionSourceTopics().stream()
+                        
.map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet()));
+                }
+            );
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
newSubscribedTopicNames);
+        } else {
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
Collections.emptySet());
+        }
     }
 
     /**
@@ -3791,7 +3860,6 @@ public class GroupMetadataManager {
             if (!shareGroup.members().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
                     + " but the group still has " + 
shareGroup.members().size() + " members.");
-            }
             if (!shareGroup.targetAssignment().isEmpty()) {
                 throw new IllegalStateException("Received a tombstone record 
to delete group " + groupId
                     + " but the target assignment still has " + 
shareGroup.targetAssignment().size()
@@ -3803,7 +3871,129 @@ public class GroupMetadataManager {
             }
             removeGroup(groupId);
         }
+    }
+
+    /**
+     * Replays StreamsGroupMemberMetadataKey/Value to update the hard state of
+     * the streams group. It updates the subscription part of the member or
+     * deletes the member.
+     *
+     * @param key   A StreamsGroupMemberMetadataKey key.
+     * @param value A StreamsGroupMemberMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupMemberMetadataKey key,
+        StreamsGroupMemberMetadataValue value
+    ) {
+        String groupId = key.groupId();
+        String memberId = key.memberId();
+
+        StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
+        Set<String> oldSubscribedTopicNames = new 
HashSet<>(streamsGroup.subscriptionMetadata().keySet());
+
+        if (value != null) {
+            StreamsGroupMember oldMember = 
streamsGroup.getOrMaybeCreateMember(memberId, true);
+            streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember)
+                .updateWith(value)
+                .build());
+        } else {
+            StreamsGroupMember oldMember = 
streamsGroup.getOrMaybeCreateMember(memberId, false);
+            if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
+                throw new IllegalStateException("Received a tombstone record 
to delete member " + memberId
+                    + " but did not receive 
StreamsGroupCurrentMemberAssignmentValue tombstone.");
+            }
+            if (streamsGroup.targetAssignment().containsKey(memberId)) {
+                throw new IllegalStateException("Received a tombstone record 
to delete member " + memberId
+                    + " but did not receive 
StreamsGroupTargetAssignmentMetadataValue tombstone.");
+            }
+            streamsGroup.removeMember(memberId);
+        }
 
+        // ToDo: this is probably a no-op for a streams group since the 
subscribed topics do not change
+        // between group members.
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
streamsGroup.subscriptionMetadata().keySet());
+    }
+
+    /**
+     * Replays StreamsGroupTargetAssignmentMetadataKey/Value to update the hard state of
+     * the streams group. It updates the target assignment epoch or sets it to -1 to signal
+     * that it has been deleted.
+     *
+     * @param key   A StreamsGroupTargetAssignmentMetadataKey key.
+     * @param value A StreamsGroupTargetAssignmentMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupTargetAssignmentMetadataKey key,
+        StreamsGroupTargetAssignmentMetadataValue value
+    ) {
+        String groupId = key.groupId();
+        StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+
+        if (value != null) {
+            streamsGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
+        } else {
+            if (!streamsGroup.targetAssignment().isEmpty()) {
+                throw new IllegalStateException("Received a tombstone record 
to delete target assignment of " + groupId
+                    + " but the assignment still has " + 
streamsGroup.targetAssignment().size() + " members.");
+            }
+            streamsGroup.setTargetAssignmentEpoch(-1);
+        }
+    }
+
+    /**
+     * Replays StreamsGroupTargetAssignmentMemberKey/Value to update the hard state of
+     * the streams group. It updates the target assignment of the member or deletes it.
+     *
+     * @param key   A StreamsGroupTargetAssignmentMemberKey key.
+     * @param value A StreamsGroupTargetAssignmentMemberValue record.
+     */
+    public void replay(
+        StreamsGroupTargetAssignmentMemberKey key,
+        StreamsGroupTargetAssignmentMemberValue value
+    ) {
+        String groupId = key.groupId();
+        String memberId = key.memberId();
+        StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+
+        if (value != null) {
+            streamsGroup.updateTargetAssignment(memberId, 
org.apache.kafka.coordinator.group.streams.Assignment.fromRecord(value));
+        } else {
+            streamsGroup.removeTargetAssignment(memberId);
+        }
+    }
+
+    /**
+     * Replays StreamsGroupCurrentMemberAssignmentKey/Value to update the hard state of
+     * the streams group. It updates the assignment of a member or deletes it.
+     *
+     * @param key   A StreamsGroupCurrentMemberAssignmentKey key.
+     * @param value A StreamsGroupCurrentMemberAssignmentValue record.
+     */
+    public void replay(
+        StreamsGroupCurrentMemberAssignmentKey key,
+        StreamsGroupCurrentMemberAssignmentValue value
+    ) {
+        String groupId = key.groupId();
+        String memberId = key.memberId();
+        StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, false);
+        StreamsGroupMember oldMember = 
streamsGroup.getOrMaybeCreateMember(memberId, false);
+
+        if (value != null) {
+            StreamsGroupMember newMember = new 
StreamsGroupMember.Builder(oldMember)
+                .updateWith(value)
+                .build();
+            streamsGroup.updateMember(newMember);
+        } else {
+            StreamsGroupMember newMember = new 
StreamsGroupMember.Builder(oldMember)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setAssignedActiveTasks(Collections.emptyMap())
+                .setAssignedStandbyTasks(Collections.emptyMap())
+                .setAssignedWarmupTasks(Collections.emptyMap())
+                .setActiveTasksPendingRevocation(Collections.emptyMap())
+                .build();
+            streamsGroup.updateMember(newMember);
+        }
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 4ba97bc2288..d90e749e303 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -173,7 +173,7 @@ public class StreamsGroup implements Group {
     /**
      * The Streams topology.
      */
-    private final StreamsTopology topology;
+    private StreamsTopology topology;
 
     /**
      * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with the group epoch at the time of setting it.
@@ -247,6 +247,10 @@ public class StreamsGroup implements Group {
         return topology;
     }
 
+    public void setTopology(StreamsTopology topology) {
+        this.topology = topology;
+    }
+
     /**
      * @return The group id.
      */

Reply via email to