dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1362102003


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,11 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    public ConsumerGroupMember getStaticMember(String instanceId) {

Review Comment:
   nit: We don't prefix getters with `get`. Let's add javadoc as well.
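
For illustration, a minimal sketch of the naming and javadoc style suggested here. `StaticMemberRegistry`, `putStaticMember` and `staticMemberId` are hypothetical names, and the class is a simplified stand-in rather than the real `ConsumerGroup`. It assumes the static members map goes from instance id to member id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for ConsumerGroup; not the real class.
class StaticMemberRegistry {
    // Assumed layout: group instance id -> member id.
    private final Map<String, String> staticMembers = new HashMap<>();

    // Registers (or overwrites) the member id that owns the given instance id.
    void putStaticMember(String instanceId, String memberId) {
        staticMembers.put(instanceId, memberId);
    }

    /**
     * @return The member id corresponding to the given instance id or null if it does not exist.
     */
    String staticMemberId(String instanceId) {
        return staticMembers.get(instanceId);
    }
}
```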



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -312,6 +337,9 @@ public void removeMember(String memberId) {
         maybeUpdateServerAssignors(oldMember, null);
         maybeRemovePartitionEpoch(oldMember);
         maybeUpdateGroupState();
+        if (oldMember.instanceId() != null) {
+            staticMembers.remove(oldMember.instanceId());
+        }

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        String groupId,
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        ConsumerGroupMember existingStaticMember,
+        ConsumerGroupMember updatedMember
+    ) {
+        // Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+        // a member
+        records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));

Review Comment:
   We have similar code somewhere else. Could we add a method for this and 
reuse it?
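
As a rough sketch of the kind of helper meant here: `TombstoneSketch` and `addMemberTombstones` are hypothetical names, and plain strings stand in for the coordinator's `Record` type so that the example is self-contained. The order matches the one in the diff above (current assignment, target assignment, member subscription).

```java
import java.util.ArrayList;
import java.util.List;

class TombstoneSketch {
    // Hypothetical helper: appends the three tombstones for a departed member
    // in the same order as in the diff. Strings replace the real Record type.
    static void addMemberTombstones(List<String> records, String groupId, String memberId) {
        records.add("current-assignment-tombstone:" + groupId + ":" + memberId);
        records.add("target-assignment-tombstone:" + groupId + ":" + memberId);
        records.add("member-subscription-tombstone:" + groupId + ":" + memberId);
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        // Both the fencing path and the static member replacement path could call this.
        addMemberTombstones(records, "group-1", "member-a");
        records.forEach(System.out::println);
    }
}
```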



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -107,6 +107,11 @@ public static class DeadlineAndEpoch {
      */

Review Comment:
   Let's add unit tests for the new or changed methods to the corresponding 
file.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        String groupId,
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        ConsumerGroupMember existingStaticMember,
+        ConsumerGroupMember updatedMember
+    ) {
+        // Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+        // a member
+        records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        // Cancel all the timers of the departed static member.
+        cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+        cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+        // Write a record corresponding to the new member
+        records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+        TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+        records.addAll(assignmentResult.records());
+        records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+    }
+
+    private TargetAssignmentBuilder.TargetAssignmentResult computeTargetAssignment(
+        ConsumerGroup group,
+        int groupEpoch,
+        ConsumerGroupMember member,
+        ConsumerGroupMember updatedMember) {
+        String preferredServerAssignor = group.computePreferredServerAssignor(
+            member,
+            updatedMember
+        ).orElse(defaultAssignor.name());
+
+        String groupId = group.groupId();
+        Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
+        String memberId = member.memberId();
+        try {
+            TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+                new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+                    .withMembers(group.members())

Review Comment:
   I think that the old member will be in `members` so the computed target 
assignment is incorrect. We need to remove it with `removeMember` and we also 
need to set the target assignment of the new member from the old one.
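
One possible reading of this, sketched with plain maps rather than the real `ConsumerGroup` and `TargetAssignmentBuilder` types: drop the old member from the membership view and let the new member inherit its target assignment. `ReplaceStaticMemberSketch` and `replace` are hypothetical names, and partitions are modelled as integers purely for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class ReplaceStaticMemberSketch {
    // Hypothetical: returns a copy of the target assignments in which the old
    // member id is removed and the new member id inherits its partitions.
    static Map<String, Set<Integer>> replace(
        Map<String, Set<Integer>> targetAssignments,
        String oldMemberId,
        String newMemberId
    ) {
        Map<String, Set<Integer>> updated = new HashMap<>(targetAssignments);
        // Remove the departed member so it no longer takes part in the computation.
        Set<Integer> inherited = updated.remove(oldMemberId);
        // Carry the old target assignment over; fall back to an empty set if there was none.
        updated.put(newMemberId, inherited == null ? Collections.<Integer>emptySet() : inherited);
        return updated;
    }
}
```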



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -299,6 +321,9 @@ public void updateMember(ConsumerGroupMember newMember) {
         maybeUpdateServerAssignors(oldMember, newMember);
         maybeUpdatePartitionEpoch(oldMember, newMember);
         maybeUpdateGroupState();
+        if (newMember.instanceId() != null) {
+            staticMembers.put(newMember.instanceId(), newMember.memberId());
+        }

Review Comment:
   nit: Would it make sense to have a method like the others? I would also do 
this before calling `maybeUpdateGroupState`.
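
Something along these lines, as a sketch only. `StaticMembershipSketch`, `maybeUpdateStaticMember` and `maybeRemoveStaticMember` are hypothetical names modelled on the existing `maybeUpdate*` helpers, and the class is a simplified stand-in for `ConsumerGroup`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for ConsumerGroup; not the real class.
class StaticMembershipSketch {
    // Assumed layout: group instance id -> member id.
    private final Map<String, String> staticMembers = new HashMap<>();

    // Records the mapping only when the member is static (has an instance id).
    void maybeUpdateStaticMember(String instanceId, String memberId) {
        if (instanceId != null) {
            staticMembers.put(instanceId, memberId);
        }
    }

    // Drops the mapping only when the member is static (has an instance id).
    void maybeRemoveStaticMember(String instanceId) {
        if (instanceId != null) {
            staticMembers.remove(instanceId);
        }
    }

    String staticMemberId(String instanceId) {
        return staticMembers.get(instanceId);
    }

    int staticMemberCount() {
        return staticMembers.size();
    }
}
```

In `updateMember` and `removeMember`, the corresponding call would then sit next to the other `maybeUpdate*` calls, ahead of `maybeUpdateGroupState()`.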



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -259,6 +265,17 @@ public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
         maybeUpdateGroupState();
     }
 
+    /**
+     * Get member id of a static member that matches the given group
+     * instance id.
+     *
+     * @param groupInstanceId the group instance id.
+     * @return the static member if it exists.

Review Comment:
   nit: We tend to use a single line for getters, e.g. `@return The member id 
corresponding to the given instance id or null if it does not exist`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        String groupId,
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        ConsumerGroupMember existingStaticMember,
+        ConsumerGroupMember updatedMember
+    ) {
+        // Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+        // a member
+        records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        // Cancel all the timers of the departed static member.
+        cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+        cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());

Review Comment:
   Same for this one. It would be great to have a method.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -983,27 +1052,125 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         return new CoordinatorResult<>(records, response);
     }
 
+    private void replaceStaticMemberInConsumerGroup(
+        String groupId,
+        List<Record> records,
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        ConsumerGroupMember existingStaticMember,
+        ConsumerGroupMember updatedMember
+    ) {
+        // Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+        // a member
+        records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+        // Cancel all the timers of the departed static member.
+        cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+        cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+        // Write a record corresponding to the new member
+        records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+        TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+        records.addAll(assignmentResult.records());
+        records.add(newCurrentAssignmentRecord(groupId, updatedMember));

Review Comment:
   I am not sold on this. Is it too difficult to reuse the main logic? There 
are a few issues with this approach. For instance, the member's assignment is 
not reconciled as it is in the main logic. Another one is that the 
subscription metadata must be updated as well if the subscriptions have changed.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -747,6 +798,11 @@ private void throwIfMemberEpochIsInvalid(
         List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
     ) {
         if (receivedMemberEpoch > member.memberEpoch()) {
+            // If a static member rejoins, its previous epoch would be -2. In such a
+            // case, we don't need to fence the member.
+            if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) {
+                return;
+            }

Review Comment:
   I don't fully get this one. Could you please elaborate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
