Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-28 Thread via GitHub


dajac merged PR #14432:
URL: https://github.com/apache/kafka/pull/14432


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-27 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405785516


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -691,6 +735,91 @@ public void testDeleteMember() {
 assertEquals(expectedAssignment, result.targetAssignment());
 }
 
+@Test
+public void testStaticMemberReplace() {
+TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
+"my-group",
+20
+);
+
+Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
+Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
+
+context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2),
+mkTopicAssignment(barTopicId, 1, 2)
+));
+
+context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4),
+mkTopicAssignment(barTopicId, 3, 4)
+));
+
+context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 5, 6),
+mkTopicAssignment(barTopicId, 5, 6)
+));
+
+context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty());
+context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty());
+context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());
+
+// Static member 3 leaves
+context.removeMemberSubscription("member-3", "member-3");
+
+// Another static member joins with the same instance id as the departed one
+context.addGroupMember("member-3-a", "member-3", Arrays.asList("foo", "bar", "zar"), new HashMap<>());

Review Comment:
   I think that we should call `updateMemberSubscription` instead of calling 
`addGroupMember` here because `updateMemberSubscription` is what is used to 
eventually call `addOrUpdateMember`.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405739030


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -691,6 +735,91 @@ public void testDeleteMember() {
 assertEquals(expectedAssignment, result.targetAssignment());
 }
 
+@Test
+public void testStaticMemberReplace() {
+TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
+"my-group",
+20
+);
+
+Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
+Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
+
+context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2),
+mkTopicAssignment(barTopicId, 1, 2)
+));
+
+context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4),
+mkTopicAssignment(barTopicId, 3, 4)
+));
+
+context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 5, 6),
+mkTopicAssignment(barTopicId, 5, 6)
+));
+
+context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty());
+context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty());
+context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());
+
+// Static member 3 leaves
+context.removeMemberSubscription("member-3", "member-3");
+
+// Another static member joins with the same instance id as the departed one
+context.addGroupMember("member-3-a", "member-3", Arrays.asList("foo", "bar", "zar"), new HashMap<>());

Review Comment:
   OK, I am slightly confused by this comment. The new member is being added
using `addGroupMember`, which internally invokes `withMembers`. I had an
unwanted call to `updateMemberSubscription`, which I have removed. I am probably
missing something here.







Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1827266684

   @dajac, thank you for another round of review. I have handled all review
comments.





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405738152


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -691,6 +735,91 @@ public void testDeleteMember() {
 assertEquals(expectedAssignment, result.targetAssignment());
 }
 
+@Test
+public void testStaticMemberReplace() {
+TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
+"my-group",
+20
+);
+
+Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
+Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
+
+context.addGroupMember("member-1", "member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2),
+mkTopicAssignment(barTopicId, 1, 2)
+));
+
+context.addGroupMember("member-2", "member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4),
+mkTopicAssignment(barTopicId, 3, 4)
+));
+
+context.addGroupMember("member-3", "member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment(
+mkTopicAssignment(fooTopicId, 5, 6),
+mkTopicAssignment(barTopicId, 5, 6)
+));
+
+context.updateMemberSubscription("member-1", Arrays.asList("foo", "bar", "zar"), Optional.of("member-1"), Optional.empty());
+context.updateMemberSubscription("member-2", Arrays.asList("foo", "bar", "zar"), Optional.of("member-2"), Optional.empty());
+context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar"), Optional.of("member-3"), Optional.empty());

Review Comment:
   I see. Thanks for the explanation; I hadn't understood the usage of these
methods correctly. I have removed these unwanted calls to
`updateMemberSubscription`.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405737404


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -849,21 +922,53 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+if (memberEpoch == 0) {
+// A new static member joins or the existing static member rejoins.
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+} else {
+// Static member rejoins with a different member id so it should replace
+// the previous instance iff the previous member had sent a Leave group.
+throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+// Replace the current member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());

Review Comment:
   Thank you for the confirmation.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405737202


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -223,20 +238,30 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
 Map memberSpecs = new HashMap<>();
 
 // Prepare the member spec for all members.
-members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
-member,
-targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
-subscriptionMetadata
-)));
+members.forEach((memberId, member) -> {
+memberSpecs.put(memberId, createAssignmentMemberSpec(
+member,
+targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+subscriptionMetadata
+));
+});
 
 // Update the member spec if updated or deleted members.
 updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
 if (updatedMemberOrNull == null) {
 memberSpecs.remove(memberId);
 } else {
+ConsumerGroupMember member = members.get(memberId);
+Assignment assignment;
+// A new static member joins and needs to replace an existing departed one.
+if (member == null && staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
+assignment = targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()), Assignment.EMPTY);

Review Comment:
   Yes, your understanding is correct. I see what you are saying about how this 
won't work when a new static member joins. I have updated the group to expose 
the current set of static members in the group.
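The lookup in this hunk can be illustrated with a small, self-contained sketch. It is a simplification under stated assumptions: assignments are modeled as sets of strings such as `foo-5` rather than Kafka's `Assignment`, `staticMembers` is assumed to map an instance id to the member id currently holding it (the set of static members the updated group is described as exposing), and `StaticAssignmentLookup` / `startingAssignment` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the static-member lookup in TargetAssignmentBuilder.build().
final class StaticAssignmentLookup {

    /**
     * Returns the target assignment an updated member should start from.
     *
     * @param memberId         id of the updated member
     * @param instanceId       its group.instance.id, or null for a dynamic member
     * @param existingMembers  ids of the members the builder already knew about
     * @param staticMembers    instance id -> member id currently holding that instance id
     * @param targetAssignment member id -> target partitions (e.g. "foo-5")
     */
    static Set<String> startingAssignment(
        String memberId,
        String instanceId,
        Set<String> existingMembers,
        Map<String, String> staticMembers,
        Map<String, Set<String>> targetAssignment
    ) {
        boolean isNewMember = !existingMembers.contains(memberId);
        if (isNewMember && instanceId != null && staticMembers.containsKey(instanceId)) {
            // A new member reuses the instance id of a departed static member:
            // start from the departed member's target assignment.
            String departedMemberId = staticMembers.get(instanceId);
            return targetAssignment.getOrDefault(departedMemberId, Collections.emptySet());
        }
        // Otherwise use the member's own target assignment (empty for a brand-new member).
        return targetAssignment.getOrDefault(memberId, Collections.emptySet());
    }
}
```

In the `testStaticMemberReplace` scenario from this thread, `member-3-a` joining with instance id `member-3` would start from `member-3`'s partitions 5 and 6 of `foo` and `bar`, while a brand-new dynamic member would start empty. Because the lookup only runs for members passed as updates, a replacement seeded as a pre-existing member would never reach it.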






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405733434


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1004,27 +1118,102 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) {
+if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+// The new member can't join.

Review Comment:
   Done.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732647


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1004,27 +1118,102 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void throwIfInstanceIdIsUnreleased(String groupId, String memberId, String instanceId, ConsumerGroupMember member) {

Review Comment:
   Done, moved the methods next to `throwIfMemberEpochIsInvalid`, added 
Javadocs and updated the argument names.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732004


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -766,6 +834,11 @@ private void throwIfMemberEpochIsInvalid(
 int receivedMemberEpoch,
 List ownedTopicPartitions
 ) {
+// If a static member rejoins, it's previous epoch would be -2. In such a
+// case, we don't need to fence the member.
+if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) {
+return;
+}

Review Comment:
   No, it is not. Removed it.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-26 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1405732141


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -849,21 +922,53 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+if (memberEpoch == 0) {
+// A new static member joins or the existing static member rejoins.
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+} else {
+// Static member rejoins with a different member id so it should replace
+// the previous instance iff the previous member had sent a Leave group.
+throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+// Replace the current member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+removeMemberAndCancelTimers(records, group.groupId(), member.memberId());

Review Comment:
   Added a log line.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-24 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1404037483


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -691,6 +708,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
 if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
 throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
 }
+} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+throwIfNull(request.instanceId(), "InstanceId can't be null for Static Member. GroupId: "

Review Comment:
   I would rather use `InstanceId can't be null.` here in order to be 
consistent with the other error messages.
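With the suggested wording, the static-leave branch of the request validation can be sketched as follows. This is a simplified, self-contained approximation: `HeartbeatFields` and `StaticLeaveRequestValidator` are hypothetical names, `IllegalArgumentException` stands in for Kafka's `InvalidRequestException`, and the helper bodies are assumptions (for example, this `throwIfEmptyString` also rejects `null`), so they may differ from the real helpers in `GroupMetadataManager`.

```java
// Hypothetical stand-in for the heartbeat request fields checked in this branch.
final class HeartbeatFields {
    final String memberId;
    final String instanceId; // null for dynamic members
    final int memberEpoch;

    HeartbeatFields(String memberId, String instanceId, int memberEpoch) {
        this.memberId = memberId;
        this.instanceId = instanceId;
        this.memberEpoch = memberEpoch;
    }
}

final class StaticLeaveRequestValidator {
    // Epoch a static member sends when it leaves the group (-2 in this PR).
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    // Assumed helper: rejects null or empty strings.
    // IllegalArgumentException stands in for InvalidRequestException.
    static void throwIfEmptyString(String value, String error) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(error);
        }
    }

    // Assumed helper: rejects null values.
    static void throwIfNull(Object value, String error) {
        if (value == null) {
            throw new IllegalArgumentException(error);
        }
    }

    // Only the static-leave branch is modeled; other epochs are not checked here.
    static void validate(HeartbeatFields request) {
        if (request.memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            throwIfEmptyString(request.memberId, "MemberId can't be empty.");
            throwIfNull(request.instanceId, "InstanceId can't be null.");
        }
    }
}
```

Keeping each message short and field-focused, as suggested, makes it read the same way as the existing `SubscribedTopicNames must be set in first request.` error.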



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -751,6 +772,53 @@ private void throwIfConsumerGroupIsFull(
 }
 }
 
+/**
+ * Validates and throws an error when the validation fails for static member.
+ * @param groupId The group id
+ * @param instanceId  The instance id
+ * @param member  The existing static member in the group.
+ * @param memberEpoch The member epoch with which the static member sends heartbeat.
+ * @param memberId    The member id with which the member joins now.
+ *
+ * @throws UnknownMemberIdException if member sends heartbeat with a non-zero epoch and no static member exists for
+ * the instance id.
+ * @throws org.apache.kafka.common.errors.FencedInstanceIdException If member joins with non-zero epoch but there
+ * already exists a static member with a different memberId.
+ * @throws org.apache.kafka.common.errors.UnreleasedInstanceIdException A new member is trying to leave the group
+ * but the existing static member hasn't requested leaving the group.
+
+ */
+private void throwIfStaticMemberValidationFails(

Review Comment:
   I suppose that we could remove this one now. Could we?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -849,21 +922,53 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+if (memberEpoch == 0) {
+// A new static member joins or the existing static member rejoins.
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+} else {
+// Static member rejoins with a different member id so it should replace
+// the previous instance iff the previous member had sent a Leave group.
+throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+// Replace the current member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
+}
+} else {
+throwIfStaticMemberIsUnknown(member, instanceId);
+throwIfInstanceIdIsFenced(memberId, instanceId, member);

Review Comment:
   nit: I would put `member` as the first argument to be consistent with the 
other helpers.
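The branching in this hunk can be summarized as a pure decision function. This is a hypothetical sketch, not the PR's code: `StaticMemberSnapshot`, `StaticHeartbeatOutcome` and `StaticHeartbeatDecision` are invented names, outcomes are returned as an enum instead of thrown as exceptions, the existing member is passed first in line with the argument-order nit, and the ordinary epoch checks that follow a regular heartbeat are not modeled.

```java
// Hypothetical snapshot of the member currently holding an instance id.
final class StaticMemberSnapshot {
    final String memberId;
    final int memberEpoch;

    StaticMemberSnapshot(String memberId, int memberEpoch) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }
}

// Outcomes of a heartbeat that carries an instance id (simplified).
enum StaticHeartbeatOutcome {
    JOIN_AS_NEW_STATIC_MEMBER, // epoch 0, no member holds the instance id
    REPLACE_DEPARTED_MEMBER,   // epoch 0, previous holder left with epoch -2
    UNRELEASED_INSTANCE_ID,    // epoch 0, previous holder never left
    UNKNOWN_MEMBER_ID,         // non-zero epoch, no member holds the instance id
    FENCED_INSTANCE_ID,        // non-zero epoch, instance id held by another member id
    REGULAR_HEARTBEAT          // non-zero epoch from the current holder
}

final class StaticHeartbeatDecision {
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    // `existing` is the member mapped to the instance id, or null if there is none.
    static StaticHeartbeatOutcome decide(StaticMemberSnapshot existing, String memberId, int memberEpoch) {
        if (memberEpoch == 0) {
            if (existing == null) {
                return StaticHeartbeatOutcome.JOIN_AS_NEW_STATIC_MEMBER;
            }
            // The instance id can only be taken over once its holder has left with epoch -2.
            if (existing.memberEpoch != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
                return StaticHeartbeatOutcome.UNRELEASED_INSTANCE_ID;
            }
            return StaticHeartbeatOutcome.REPLACE_DEPARTED_MEMBER;
        }
        if (existing == null) {
            return StaticHeartbeatOutcome.UNKNOWN_MEMBER_ID;
        }
        if (!existing.memberId.equals(memberId)) {
            return StaticHeartbeatOutcome.FENCED_INSTANCE_ID;
        }
        return StaticHeartbeatOutcome.REGULAR_HEARTBEAT;
    }
}
```

In the PR, each error outcome corresponds to one of the exceptions listed in the `throwIfStaticMemberValidationFails` Javadoc (`UnknownMemberIdException`, `FencedInstanceIdException`, `UnreleasedInstanceIdException`); returning an enum here only keeps the sketch easy to test.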



##

Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-23 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1402217667


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -829,21 +902,50 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+ConsumerGroupMember updatedMember;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId);
+if (member == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+}
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {}, Instance-Id {} joins the consumer group.", groupId, memberId, instanceId);
+}
+staticMemberReplaced = staticMemberReplaced(memberEpoch, member);
+if (staticMemberReplaced) {
+removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
+// The replacing member gets the same set of assignments as the departed member.
+updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions())
+.setPartitionsPendingAssignment(member.partitionsPendingAssignment())
+.setPartitionsPendingRevocation(member.partitionsPendingRevocation());

Review Comment:
   If I don't set `setAssignedPartitions`, then the replacing static member
doesn't get its assignments back. I added the pending assignments bit just to
ensure that all assignments from the previous member are assigned to the new
member. This happens because the member ends up
[here](https://github.com/apache/kafka/blob/6b26c0428a5cf1f64e458e5a33df3bbfb0e33c6b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java#L175)
 and, if no partitions are assigned, it gets back an empty assignment.
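The point about carrying the assignment over can be sketched as below. It is a simplified, hypothetical model (`MemberAssignmentState` is not a Kafka class): it only shows that a member built from scratch starts with no partitions, whereas a replacement copies the departed member's assigned, pending-assignment and pending-revocation partitions under the new member id, as the quoted hunk does with `ConsumerGroupMember.Builder`. The reconciliation in `CurrentAssignmentBuilder` itself is not modeled.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified view of a member's assignment state (topic name -> partitions).
final class MemberAssignmentState {
    final String memberId;
    final String instanceId;
    final Map<String, Set<Integer>> assignedPartitions;
    final Map<String, Set<Integer>> partitionsPendingAssignment;
    final Map<String, Set<Integer>> partitionsPendingRevocation;

    MemberAssignmentState(String memberId, String instanceId,
                          Map<String, Set<Integer>> assignedPartitions,
                          Map<String, Set<Integer>> partitionsPendingAssignment,
                          Map<String, Set<Integer>> partitionsPendingRevocation) {
        this.memberId = memberId;
        this.instanceId = instanceId;
        this.assignedPartitions = assignedPartitions;
        this.partitionsPendingAssignment = partitionsPendingAssignment;
        this.partitionsPendingRevocation = partitionsPendingRevocation;
    }

    // A member created from scratch starts with nothing assigned.
    static MemberAssignmentState fresh(String memberId, String instanceId) {
        return new MemberAssignmentState(memberId, instanceId,
            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
    }

    // The replacing member keeps the instance id and takes over the departed
    // member's partitions under its new member id.
    static MemberAssignmentState replace(MemberAssignmentState departed, String newMemberId) {
        return new MemberAssignmentState(newMemberId, departed.instanceId,
            copy(departed.assignedPartitions),
            copy(departed.partitionsPendingAssignment),
            copy(departed.partitionsPendingRevocation));
    }

    // Deep copy so the new member does not share mutable sets with the departed one.
    private static Map<String, Set<Integer>> copy(Map<String, Set<Integer>> source) {
        Map<String, Set<Integer>> result = new HashMap<>();
        source.forEach((topic, partitions) -> result.put(topic, new HashSet<>(partitions)));
        return result;
    }
}
```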






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-23 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1824047001

   Thanks @dajac. I have addressed all the comments. Regarding the second part of
[this](https://github.com/apache/kafka/pull/14432/files#r1397382111) comment, I
am not sure which new data structures are being referred to here. The test
`testStaticMemberGetsBackAssignmentUponRejoin` asserts all the records returned
after replay. Is that what you are referring to?





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-22 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1402217667


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -829,21 +902,50 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+ConsumerGroupMember updatedMember;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId);
+if (member == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+}
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {}, Instance-Id {} joins the consumer group.", groupId, memberId, instanceId);
+}
+staticMemberReplaced = staticMemberReplaced(memberEpoch, member);
+if (staticMemberReplaced) {
+removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
+// The replacing member gets the same set of assignments as the departed member.
+updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions())
+.setPartitionsPendingAssignment(member.partitionsPendingAssignment())
+.setPartitionsPendingRevocation(member.partitionsPendingRevocation());

Review Comment:
   If I don't set `setAssignedPartitions`, the replacing static member doesn't get its assignments back. I added the pending-assignment and pending-revocation setters just to ensure that all of the previous member's assignments carry over to the new member.
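A minimal sketch of the point above, using a hypothetical `StaticMemberSnapshot` class rather than the real `ConsumerGroupMember` (partitions are modeled as topic name to partition-number sets for brevity). A member built from the new member id alone starts with every set empty, so the replacement copies all three sets from the departed member explicitly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified snapshot of a static member; not Kafka's ConsumerGroupMember.
final class StaticMemberSnapshot {
    final String memberId;
    final String instanceId;
    // Topic name -> partition numbers, for each of the three assignment sets.
    final Map<String, Set<Integer>> assignedPartitions;
    final Map<String, Set<Integer>> partitionsPendingAssignment;
    final Map<String, Set<Integer>> partitionsPendingRevocation;

    StaticMemberSnapshot(String memberId, String instanceId,
                         Map<String, Set<Integer>> assigned,
                         Map<String, Set<Integer>> pendingAssignment,
                         Map<String, Set<Integer>> pendingRevocation) {
        this.memberId = memberId;
        this.instanceId = instanceId;
        this.assignedPartitions = new HashMap<>(assigned);
        this.partitionsPendingAssignment = new HashMap<>(pendingAssignment);
        this.partitionsPendingRevocation = new HashMap<>(pendingRevocation);
    }

    // A member built from ids alone: nothing assigned yet.
    static StaticMemberSnapshot fresh(String memberId, String instanceId) {
        return new StaticMemberSnapshot(memberId, instanceId, Map.of(), Map.of(), Map.of());
    }

    // The replacement keeps the departed member's instance id and all three
    // assignment sets; only the member id changes.
    static StaticMemberSnapshot replace(StaticMemberSnapshot departed, String newMemberId) {
        return new StaticMemberSnapshot(newMemberId, departed.instanceId,
            departed.assignedPartitions,
            departed.partitionsPendingAssignment,
            departed.partitionsPendingRevocation);
    }
}
```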



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-22 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1402215662


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -898,31 +1000,45 @@ private CoordinatorResult consumerGr
 group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
 }
 
-// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
-// delta between the existing and the new target assignment is persisted to the partition.
+// 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+// replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
 int targetAssignmentEpoch = group.assignmentEpoch();
 Assignment targetAssignment = group.targetAssignment(memberId);
-if (groupEpoch > targetAssignmentEpoch) {
+if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
 String preferredServerAssignor = group.computePreferredServerAssignor(
 member,
 updatedMember
 ).orElse(defaultAssignor.name());
 
 try {
-TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
-new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+TargetAssignmentBuilder assignmentResultBuilder =
+new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
+// A new static member is replacing an older one with the same subscriptions.
+// We just need to remove the older member and add the newer one. The new member can
+// reuse the target assignment of the older member.
+if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) {

Review Comment:
   Actually, `groupEpoch == targetAssignmentEpoch` is not needed. I was just trying to ensure that the group epoch and the target assignment epoch are the same, which is what happens anyway when a static member is replaced. So, in a way, it's redundant. I will remove it.
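To see exactly what dropping the clause changes, the hypothetical helper below (not part of Kafka) models only the branch selection in the hunk above. The two variants can disagree only when `staticMemberReplaced` is true while `groupEpoch != targetAssignmentEpoch`; the comment above argues that a replacement leaves the two epochs equal, which is the assumption under which the clause is redundant.

```java
// Hypothetical model of the branch selection in the hunk above; not Kafka code.
enum TargetAssignmentPath { UNCHANGED, REUSE_DEPARTED_ASSIGNMENT, FULL_RECOMPUTE }

final class TargetAssignmentPathChooser {
    static TargetAssignmentPath choose(int groupEpoch, int targetAssignmentEpoch,
                                       boolean staticMemberReplaced, boolean requireEqualEpochs) {
        // Outer condition: nothing to do unless the group epoch moved ahead or a static member was replaced.
        if (!(groupEpoch > targetAssignmentEpoch || staticMemberReplaced)) {
            return TargetAssignmentPath.UNCHANGED;
        }
        // Inner condition, with or without the `groupEpoch == targetAssignmentEpoch` clause.
        boolean reuse = staticMemberReplaced
            && (!requireEqualEpochs || groupEpoch == targetAssignmentEpoch);
        return reuse ? TargetAssignmentPath.REUSE_DEPARTED_ASSIGNMENT : TargetAssignmentPath.FULL_RECOMPUTE;
    }
}
```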




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-20 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1399522392


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -671,6 +688,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
 if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
 throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
 }
+} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+throwIfNull(request.instanceId(), "InstanceId can't be null or empty for Static Member. GroupId: "

Review Comment:
   Sure. The `instance id should not be empty` check already happens [here](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L658).
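A minimal sketch of the checks this hunk adds for a static leave request (epoch -2), assuming, as noted above, that the emptiness check on the instance id happens earlier. `StaticLeaveRequestCheck` is hypothetical and throws `IllegalArgumentException` as a stand-in for Kafka's `InvalidRequestException`.

```java
// Hypothetical stand-in for the static-leave branch of the request validation; not Kafka code.
final class StaticLeaveRequestCheck {
    // Epoch a static member sends to signal a temporary leave.
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    static void validate(int memberEpoch, String memberId, String instanceId) {
        if (memberEpoch != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            return; // other epochs are validated by other branches, not modeled here
        }
        if (memberId == null || memberId.isEmpty()) {
            throw new IllegalArgumentException("MemberId can't be empty.");
        }
        // Only null is rejected here; an empty instance id is assumed to be rejected earlier.
        if (instanceId == null) {
            throw new IllegalArgumentException("InstanceId can't be null for a static member.");
        }
    }
}
```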




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-20 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1397287511


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -829,21 +902,50 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+ConsumerGroupMember updatedMember;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);

Review Comment:
   I still find this logic quite complex to follow. I wonder if we could be a little more explicit. I think that the complexity comes from `throwIfStaticMemberValidationFails`, which hides quite a lot of the logic.
   
   I wonder if something like the following would be better. I am not sure... What do you think?
   
   ```
   ConsumerGroupMember existingMember = group.staticMember(instanceId);
   if (memberEpoch == 0) {
       // A new static member joins or the existing static member rejoins.
       if (existingMember == null) {
           // New static member.
           member = group.getOrMaybeCreateMember(memberId, true);
           updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
       } else {
           // Static member rejoins with a different member id, so it should replace
           // the previous instance iff the previous instance has epoch -2.
           if (existingMember.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
               // The new member can't join.
               throw Errors.UNRELEASED_INSTANCE_ID.exception(...);
           } else {
               // Replace the current member.
               staticMemberReplaced = true;
               member = existingMember;
               updatedMemberBuilder = new ConsumerGroupMember.Builder(group.getOrMaybeCreateMember(memberId, true));
               removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
           }
       }
   } else {
       // Check member id or throw FENCED_INSTANCE_ID
       // Check epoch with throwIfMemberEpochIsInvalid
   }
   ```
   
   Note that I just wrote this without testing it, so the code is likely not 100% correct :).
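To make the `memberEpoch == 0` branch of the suggestion concrete, here is a small, self-contained model of it. `StaticJoinModel` and its nested `Member` class are hypothetical simplifications (a map from instance id to the member holding it), and `IllegalStateException` stands in for `Errors.UNRELEASED_INSTANCE_ID`; writing records and cancelling timers are omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the memberEpoch == 0 path for a static member; not Kafka code.
final class StaticJoinModel {
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    enum Outcome { NEW_STATIC_MEMBER, REPLACED_DEPARTED_MEMBER }

    static final class Member {
        final String memberId;
        final int memberEpoch;

        Member(String memberId, int memberEpoch) {
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }
    }

    // Instance id -> member currently holding it.
    final Map<String, Member> staticMembers = new HashMap<>();

    Outcome join(String instanceId, String memberId) {
        Member existing = staticMembers.get(instanceId);
        if (existing == null) {
            // No one holds this instance id yet: a new static member joins.
            staticMembers.put(instanceId, new Member(memberId, 0));
            return Outcome.NEW_STATIC_MEMBER;
        }
        if (existing.memberEpoch != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            // The current holder has not left with epoch -2, so the instance id is still in use.
            throw new IllegalStateException("UNRELEASED_INSTANCE_ID: " + instanceId);
        }
        // The previous holder left temporarily; the new member id takes over the instance id.
        staticMembers.put(instanceId, new Member(memberId, 0));
        return Outcome.REPLACED_DEPARTED_MEMBER;
    }
}
```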




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-17 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1397287511


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -829,21 +902,50 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+ConsumerGroupMember updatedMember;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);

Review Comment:
   I still find this logic quite complex to follow. I wonder if we could be a little more explicit. I think that the complexity comes from `throwIfStaticMemberValidationFails`, which hides quite a lot of the logic.
   
   I wonder if something like the following would be better. I am not sure... What do you think?
   
   ```
   ConsumerGroupMember existingMember = group.staticMember(instanceId);
   if (memberEpoch == 0) {
       // A new static member joins or the existing static member rejoins.
       if (existingMember == null) {
           // New static member.
           member = group.getOrMaybeCreateMember(memberId, true);
           updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
       } else if (existingMember.memberId().equals(memberId)) {
           // Static member rejoins the group with epoch 0.
           member = existingMember;
           updatedMemberBuilder = new ConsumerGroupMember.Builder(existingMember);
       } else {
           // Static member rejoins with a different member id, so it should replace
           // the previous instance iff the previous instance has epoch -2.
           if (existingMember.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
               // The new member can't join.
               throw Errors.UNRELEASED_INSTANCE_ID.exception(...);
           } else {
               // Replace the current member.
               staticMemberReplaced = true;
               member = existingMember;
               updatedMemberBuilder = new ConsumerGroupMember.Builder(group.getOrMaybeCreateMember(memberId, true));
               removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
           }
       }
   } else {
       // Check member id or throw FENCED_INSTANCE_ID
       // Check epoch with throwIfMemberEpochIsInvalid
   }
   ```
   
   Note that I just wrote this without testing it, so the code is likely not 100% correct :).




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1795371595

   Thanks @dajac, I have addressed the review comments. Please let me know how the changes look now.



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383620879


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -731,6 +752,36 @@ private void throwIfConsumerGroupIsFull(
 }
 }
 
+private void throwIfStaticMemberValidationFails(
+String groupId,
+String instanceId,
+ConsumerGroupMember existingStaticMember,
+int memberEpoch,
+String memberId
+) {
+if (memberEpoch != 0) {
+// The member joined with a non-zero epoch but we haven't registered this static member
+// This could be an unknown member for the coordinator.
+if (existingStaticMember == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception("No existing static member found against instance id: " +  instanceId);
+}
+// There already exists a different member id for the same instance id.
+if (!existingStaticMember.memberId().equals(memberId)) {
+log.info("Request memberId={} for static member with groupInstanceId={} " +
+"is fenced by existing memberId={}",
+memberId, instanceId, existingStaticMember.memberId());
+throw Errors.FENCED_INSTANCE_ID.exception();

Review Comment:
   Added.
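For the non-zero-epoch path shown in this hunk, the outcome depends only on whether the instance id is registered and, if so, whether the registered member id matches the request. The sketch below is a hypothetical simplification that returns the error name instead of throwing, so the three cases are easy to compare; it does not model the epoch-0 case.

```java
// Hypothetical outcome of the memberEpoch != 0 checks for a static member; not Kafka code.
enum StaticMemberCheck { OK, UNKNOWN_MEMBER_ID, FENCED_INSTANCE_ID }

final class StaticMemberValidation {
    // registeredMemberId is the member id currently bound to the request's instance id, or null if none.
    static StaticMemberCheck check(int memberEpoch, String registeredMemberId, String requestMemberId) {
        if (memberEpoch == 0) {
            return StaticMemberCheck.OK; // epoch-0 joins are outside this sketch
        }
        if (registeredMemberId == null) {
            // Non-zero epoch, but the coordinator has no static member for this instance id.
            return StaticMemberCheck.UNKNOWN_MEMBER_ID;
        }
        if (!registeredMemberId.equals(requestMemberId)) {
            // Another member id already owns this instance id, so the requester is fenced.
            return StaticMemberCheck.FENCED_INSTANCE_ID;
        }
        return StaticMemberCheck.OK;
    }
}
```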




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383618637


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1085,81 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+List records,
+ConsumerGroup group,
+ConsumerGroupMember existingStaticMember
+) {
+// Write tombstones for the departed static member.
+removeMember(records, group.groupId(), existingStaticMember.memberId());
+// Cancel all the timers of the departed static member.
+cancelTimers(group.groupId(), existingStaticMember.memberId());
+}
+
+private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) {
+return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+}
+
 /**
  * Handles leave request from a consumer group member.
  * @param groupId   The group id from the request.
  * @param memberId  The member id from the request.
+ * @param memberEpoch   The member epoch from the request.
  *
  * @return A Result containing the ConsumerGroupHeartbeat response and
  * a list of records to update the state machine.
  */
 private CoordinatorResult consumerGroupLeave(
 String groupId,
-String memberId
+String instanceId,
+String memberId,
+int memberEpoch
 ) throws ApiException {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+group.staticMember(instanceId) :
+group.getOrMaybeCreateMember(memberId, false);
 
-log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
-
-List records = consumerGroupFenceMember(group, member);
+List records = new ArrayList<>();
+if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId);
+log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group",
+group.groupId(), member.memberId(), member.instanceId());
+records.addAll(consumerGroupStaticMemberGroupLeave(group, member, memberId));
+} else {
+log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId);
+records.addAll(consumerGroupFenceMember(group, member));
+}
 return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
 .setMemberId(memberId)
-.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+.setMemberEpoch(memberEpoch));
+}
+
+/**
+ * Handles the case when a static member decides to leave the group.
+ * The member is not actually fenced from the group, and instead it's
+ * member epoch is updated to -2 to reflect that a member using the given
+ * instance id decided to leave the group and would be back within session
+ * timeout.
+ *
+ * @param group   The group.
+ * @param existingStaticMember  The member.

Review Comment:
   done.
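The difference between the two leave paths in `consumerGroupLeave` can be reduced to a few lines. In this hypothetical model (members keyed by member id only, no records or timers), a dynamic leave removes the member, while a static leave keeps it with epoch -2 so that a later member with the same instance id can take its place. In both cases the request's epoch is echoed back, as in the `.setMemberEpoch(memberEpoch)` change above. The value -1 for `LEAVE_GROUP_MEMBER_EPOCH` is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two leave paths; not Kafka code.
final class LeaveModel {
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;        // assumed value
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    // Member id -> current member epoch.
    final Map<String, Integer> memberEpochs = new HashMap<>();

    // Returns the epoch echoed back in the response.
    int leave(String memberId, int requestEpoch) {
        if (!memberEpochs.containsKey(memberId)) {
            throw new IllegalStateException("UNKNOWN_MEMBER_ID: " + memberId);
        }
        if (requestEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            // Static leave: the member stays in the group, marked as temporarily gone.
            memberEpochs.put(memberId, LEAVE_GROUP_STATIC_MEMBER_EPOCH);
        } else {
            // Dynamic leave: the member is fenced, i.e. removed from the group.
            memberEpochs.remove(memberId);
        }
        return requestEpoch;
    }
}
```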




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383617522


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1085,81 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+List records,
+ConsumerGroup group,
+ConsumerGroupMember existingStaticMember
+) {
+// Write tombstones for the departed static member.
+removeMember(records, group.groupId(), existingStaticMember.memberId());
+// Cancel all the timers of the departed static member.
+cancelTimers(group.groupId(), existingStaticMember.memberId());
+}
+
+private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) {
+return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+}
+
 /**
  * Handles leave request from a consumer group member.
  * @param groupId   The group id from the request.
  * @param memberId  The member id from the request.
+ * @param memberEpoch   The member epoch from the request.
  *
  * @return A Result containing the ConsumerGroupHeartbeat response and
  * a list of records to update the state machine.
  */
 private CoordinatorResult consumerGroupLeave(
 String groupId,
-String memberId
+String instanceId,
+String memberId,
+int memberEpoch
 ) throws ApiException {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+group.staticMember(instanceId) :
+group.getOrMaybeCreateMember(memberId, false);
 
-log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
-
-List records = consumerGroupFenceMember(group, member);
+List records = new ArrayList<>();
+if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId);
+log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group",
+group.groupId(), member.memberId(), member.instanceId());

Review Comment:
   done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1085,81 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+List records,
+ConsumerGroup group,
+ConsumerGroupMember existingStaticMember
+) {
+// Write tombstones for the departed static member.
+removeMember(records, group.groupId(), existingStaticMember.memberId());
+// Cancel all the timers of the departed static member.
+cancelTimers(group.groupId(), existingStaticMember.memberId());
+}
+
+private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) {
+return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+}
+
 /**
  * Handles leave request from a consumer group member.
  * @param groupId   The group id from the request.
  * @param memberId  The member id from the request.
+ * @param memberEpoch   The member epoch from the request.
  *
  * @return A Result containing the ConsumerGroupHeartbeat response and
  * a list of records to update the state machine.
  */
 private CoordinatorResult consumerGroupLeave(
 String groupId,
-String memberId
+String instanceId,
+String memberId,
+int memberEpoch
 ) throws ApiException {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+group.staticMember(instanceId) :
+group.getOrMaybeCreateMember(memberId, false);
 
-log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
-
-List records = consumerGroupFenceMember(group, member);
+List records = new ArrayList<>();
+if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {

Review Comment:
   done.




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383616531


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1085,81 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(

Review Comment:
   done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1085,81 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+List records,
+ConsumerGroup group,
+ConsumerGroupMember existingStaticMember
+) {
+// Write tombstones for the departed static member.
+removeMember(records, group.groupId(), existingStaticMember.memberId());
+// Cancel all the timers of the departed static member.
+cancelTimers(group.groupId(), existingStaticMember.memberId());
+}
+
+private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) {
+return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+}
+
 /**
  * Handles leave request from a consumer group member.
  * @param groupId   The group id from the request.
  * @param memberId  The member id from the request.
+ * @param memberEpoch   The member epoch from the request.
  *
  * @return A Result containing the ConsumerGroupHeartbeat response and
  * a list of records to update the state machine.
  */
 private CoordinatorResult consumerGroupLeave(
 String groupId,
-String memberId
+String instanceId,
+String memberId,
+int memberEpoch
 ) throws ApiException {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+group.staticMember(instanceId) :
+group.getOrMaybeCreateMember(memberId, false);
 
-log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
-
-List records = consumerGroupFenceMember(group, member);
+List records = new ArrayList<>();
+if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId);

Review Comment:
   Removed.




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383615616


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1085,81 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);

Review Comment:
   Yes, I am now building the replacing static member with the same set of assignments (target/pending). This directly produces a current assignment record for the new member.




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383612935


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -898,31 +987,44 @@ private CoordinatorResult consumerGr
 group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
 }
 
-// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
-// delta between the existing and the new target assignment is persisted to the partition.
+// 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+// replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
 int targetAssignmentEpoch = group.assignmentEpoch();
 Assignment targetAssignment = group.targetAssignment(memberId);
-if (groupEpoch > targetAssignmentEpoch) {
+if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
 String preferredServerAssignor = group.computePreferredServerAssignor(
 member,
 updatedMember
 ).orElse(defaultAssignor.name());
 
 try {
-TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
-new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+TargetAssignmentBuilder assignmentResultBuilder =
+new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
+// A new static member is replacing an older one with the same subscriptions.
+// We just need to remove the older member and add the newer one. The new member can
+// reuse the target assignment of the older member.
+if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) {
+targetAssignment = group.targetAssignment(existingStaticMember.memberId());
+assignmentResult = assignmentResultBuilder
+.removeMember(existingStaticMember.memberId())
+.addOrUpdateMember(memberId, updatedMember)
+.build();
+records.addAll(assignmentResult.records());
+} else {
+assignmentResult = assignmentResultBuilder
 .withMembers(group.members())
 .withSubscriptionMetadata(subscriptionMetadata)
 .withTargetAssignment(group.targetAssignment())
 .addOrUpdateMember(memberId, updatedMember)

Review Comment:
   I think I got it now. I added some state tracking to the `TargetAssignmentBuilder` so that it doesn't compute assignments for a replacing static member.




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383612141


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -898,31 +987,44 @@ private CoordinatorResult consumerGr
 group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
 }
 
-// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
-// delta between the existing and the new target assignment is persisted to the partition.
+// 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+// replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition.
 int targetAssignmentEpoch = group.assignmentEpoch();
 Assignment targetAssignment = group.targetAssignment(memberId);
-if (groupEpoch > targetAssignmentEpoch) {
+if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
 String preferredServerAssignor = group.computePreferredServerAssignor(
 member,
 updatedMember
 ).orElse(defaultAssignor.name());
 
 try {
-TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
-new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+TargetAssignmentBuilder assignmentResultBuilder =
+new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
+// A new static member is replacing an older one with the same subscriptions.
+// We just need to remove the older member and add the newer one. The new member can
+// reuse the target assignment of the older member.
+if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) {
+targetAssignment = group.targetAssignment(existingStaticMember.memberId());
+assignmentResult = assignmentResultBuilder
+.removeMember(existingStaticMember.memberId())
+.addOrUpdateMember(memberId, updatedMember)
+.build();
+records.addAll(assignmentResult.records());

Review Comment:
   I have added members and target assignment.




Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-06 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1383610866


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -829,20 +885,49 @@ private 
CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
+ConsumerGroupMember member;
+ConsumerGroupMember existingStaticMember = null;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+} else {
+existingStaticMember = group.staticMember(instanceId);
+throwIfStaticMemberValidationFails(groupId, instanceId, existingStaticMember, memberEpoch, memberId);
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+}

Review Comment:
   I have updated the logic in line with your suggestion. 



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -869,6 +954,10 @@ private CoordinatorResult consumerGr
 groupId, memberId, updatedMember.subscribedTopicRegex());
 bumpGroupEpoch = true;
 }
+} else {
+if (staticMemberReplaced) {
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+}

Review Comment:
   Yes, that makes sense.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-31 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1377557816


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -869,6 +954,10 @@ private CoordinatorResult consumerGr
 groupId, memberId, updatedMember.subscribedTopicRegex());
 bumpGroupEpoch = true;
 }
+} else {
+if (staticMemberReplaced) {
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+}

Review Comment:
   If we rely on `member` and `updatedMember` then we don't need this because 
`!updatedMember.equals(member)` will catch the new member.
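The point here is that a member created on the fly always differs from its updated copy, so the generic equality check already emits the subscription record. A minimal sketch, assuming a hypothetical `Member` record in place of `ConsumerGroupMember` and plain strings in place of coordinator records:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ConsumerGroupMember; record equality compares every field.
record Member(String memberId, String instanceId, List<String> subscribedTopicNames) {}

final class SubscriptionRecordSketch {
    /** Returns the records to write; strings stand in for real coordinator records. */
    static List<String> recordsFor(Member member, Member updatedMember) {
        List<String> records = new ArrayList<>();
        if (!updatedMember.equals(member)) {
            // Fires for a changed subscription and for a brand new member alike: a member
            // created on the fly (modeled here with no instance id and no subscriptions)
            // never equals its updated copy.
            records.add("subscription:" + updatedMember.memberId());
        }
        return records;
    }
}
```

With this shape, no dedicated `staticMemberReplaced` branch is needed just to write the subscription record.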



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -829,20 +885,49 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+ConsumerGroupMember member;
+ConsumerGroupMember existingStaticMember = null;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+} else {
+existingStaticMember = group.staticMember(instanceId);
+throwIfStaticMemberValidationFails(groupId, instanceId, existingStaticMember, memberEpoch, memberId);
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+}

Review Comment:
   I wonder if we could simplify it even more. For instance, would it be 
possible to have something like the following:
   
   ```
   ConsumerGroupMember member;
   ConsumerGroupMember updatedMember;
   
   if (instanceId == null) {
       member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
       throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
       log.info(...);
       updatedMember = new ConsumerGroupMember.Builder(member)
           ...
   } else {
       // the new logic.
       // member is the current static member.
       // updatedMember is the updated current member or the new one.
   }
   ```
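In the structure suggested above, the two branches differ mainly in which identifier locates the current member: the member id for a dynamic member, the instance id for a static one. A minimal sketch, assuming hypothetical maps (`memberEpochs`, `staticMembers`) in place of the group's real state and omitting all validation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the member lookup; the real code uses ConsumerGroupMember.
final class MemberLookupSketch {
    final Map<String, Integer> memberEpochs = new HashMap<>();  // memberId -> member epoch
    final Map<String, String> staticMembers = new HashMap<>();  // instanceId -> memberId

    /**
     * Returns the id of the member a heartbeat currently refers to, or null when the
     * instance id is not yet known to the group (a brand new static member).
     */
    String currentMemberId(String memberId, String instanceId) {
        if (instanceId == null) {
            // Dynamic member: the member id is the identity; create the member if needed.
            memberEpochs.putIfAbsent(memberId, 0);
            return memberId;
        }
        // Static member: the instance id is the identity and may map to an older member id.
        return staticMembers.get(instanceId);
    }
}
```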
   
   



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1085,81 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+List records,
+ConsumerGroup group,
+ConsumerGroupMember existingStaticMember
+) {
+// Write tombstones for the departed static member.
+removeMember(records, group.groupId(), existingStaticMember.memberId());
+// Cancel all the timers of the departed static member.
+cancelTimers(group.groupId(), existingStaticMember.memberId());
+}
+
+private boolean staticMemberReplaced(int memberEpoch, ConsumerGroupMember existingStaticMember) {
+return memberEpoch == 0 && existingStaticMember != null && existingStaticMember.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+}
+
 /**
  * Handles leave request from a consumer group member.
  * @param groupId   The group id from the request.
  * @param memberId  The member id from the request.
+ * @param memberEpoch   The member epoch from the request.
  *
  * @return A Result containing the ConsumerGroupHeartbeat response and
  * a list of records to update the state machine.
  */
 private CoordinatorResult consumerGroupLeave(
 String groupId,
-String memberId
+String instanceId,
+String memberId,
+int memberEpoch
 ) throws ApiException {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH ?
+group.staticMember(instanceId) :
+group.getOrMaybeCreateMember(memberId, false);
 
-log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
-
-List records = consumerGroupFenceMember(group, member);
+List records = new ArrayList<>();
+if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId);
+log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group",
+group.groupId(), member.memberId(), member.instanceId());

Review Comment:
   nit: `"[GroupId {}] Static member {} with member id {} left the consumer group."`? I would also use a similar logging structure for the other log
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1777654947

   Thanks @dajac, I have addressed all the comments and added a couple more questions. Thanks for all the pointers; the changes look cleaner than the previous iteration (to me at least).





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370542261


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1908,6 +1909,715 @@ public void testLeavingMemberBumpsGroupEpoch() {
 assertRecordsEquals(expectedRecords, result.records());
 }
 
+@Test
+public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+// Consumer group with two static members.
+GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.addTopic(barTopicId, barTopicName, 3)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId1)
+.setInstanceId(memberId1)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setTargetMemberEpoch(10)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2),
+mkTopicAssignment(barTopicId, 0, 1)))
+.build())
+.withMember(new ConsumerGroupMember.Builder(memberId2)
+.setInstanceId(memberId2)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setTargetMemberEpoch(10)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+// Use zar only here to ensure that metadata needs to be recomputed.
+.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4, 5),
+mkTopicAssignment(barTopicId, 2)))
+.build())
+.withAssignment(memberId1, mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2),
+mkTopicAssignment(barTopicId, 0, 1)))
+.withAssignment(memberId2, mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4, 5),
+mkTopicAssignment(barTopicId, 2)))
+.withAssignmentEpoch(10))
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+new HashMap() {
+{
+put(memberId1, new MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1),
+mkTopicAssignment(barTopicId, 0)
+)));
+put(memberId2, new MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 2, 3),
+mkTopicAssignment(barTopicId, 1)
+)));
+put(memberId3, new MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 4, 5),
+mkTopicAssignment(barTopicId, 2)
+)));
+}
+}
+));
+
+// Member 3 joins the consumer group.
+CoordinatorResult result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId3)
+.setInstanceId(memberId3)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(5000)
+.setServerAssignor("range")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setTopicPartitions(Collections.emptyList()));
+
+assertResponseEquals(
+new ConsumerGroupHeartbeatResponseData()
+.setMemberId(memberId3)
+.setMemberEpoch(11)
+

Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370539621


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,11 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+public ConsumerGroupMember getStaticMember(String instanceId) {

Review Comment:
   Done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -259,6 +265,17 @@ public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
 maybeUpdateGroupState();
 }
 
+/**
+ * Get member id of a static member that matches the given group
+ * instance id.
+ *
+ * @param groupInstanceId the group instance id.
+ * @return the static member if it exists.

Review Comment:
   Done.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370538866


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+}
+
+private TargetAssignmentBuilder.TargetAssignmentResult computeTargetAssignment(
+ConsumerGroup group,
+int groupEpoch,
+ConsumerGroupMember member,
+ConsumerGroupMember updatedMember) {
+String preferredServerAssignor = group.computePreferredServerAssignor(
+member,
+updatedMember
+).orElse(defaultAssignor.name());
+
+String groupId = group.groupId();
+Map subscriptionMetadata = group.subscriptionMetadata();
+String memberId = member.memberId();
+try {
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+.withMembers(group.members())

Review Comment:
   Also, regarding
   
   > Then we only need to recompute the assignment if there is a group epoch bump.
   
   Given that we will now have a group epoch bump whenever a static member rejoins with a different subscription, should this be mentioned in the KIP? As we noticed, this deviates from how a static member rejoining with a different subscription is handled today. WDYT?
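The behavior being asked about reduces to one decision. This is a hypothetical sketch (the helper name and modeling subscriptions as sets of topic names are assumptions), not the coordinator's code: a replacement with an identical subscription keeps the group epoch, so the departed member's target assignment can be reused, while a different subscription bumps the epoch and triggers a new target assignment.

```java
import java.util.Set;

// Hypothetical helper; subscriptions are modeled as sets of topic names.
final class EpochBumpSketch {
    /** Returns the group epoch to use after a static member is replaced. */
    static int groupEpochAfterReplacement(int groupEpoch, Set<String> oldSubscription, Set<String> newSubscription) {
        if (oldSubscription.equals(newSubscription)) {
            // Same subscription: keep the epoch, so the departed member's target assignment can be reused.
            return groupEpoch;
        }
        // Different subscription: bump the epoch so that a new target assignment is computed.
        return groupEpoch + 1;
    }
}
```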






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370533114


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -299,6 +321,9 @@ public void updateMember(ConsumerGroupMember newMember) {
 maybeUpdateServerAssignors(oldMember, newMember);
 maybeUpdatePartitionEpoch(oldMember, newMember);
 maybeUpdateGroupState();
+if (newMember.instanceId() != null) {
+staticMembers.put(newMember.instanceId(), newMember.memberId());
+}

Review Comment:
   Done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -312,6 +337,9 @@ public void removeMember(String memberId) {
 maybeUpdateServerAssignors(oldMember, null);
 maybeRemovePartitionEpoch(oldMember);
 maybeUpdateGroupState();
+if (oldMember.instanceId() != null) {
+staticMembers.remove(oldMember.instanceId());
+}

Review Comment:
   Done.
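The two hunks in this message keep an instance-id index in sync with the member map. A minimal sketch of such an index, using hypothetical names (`StaticMemberIndexSketch`, a nested `Member` record) rather than the real `ConsumerGroup` class. One deliberate deviation from the quoted diff: `removeMember` uses `Map.remove(key, value)`, so it only drops the entry if it still points at the removed member id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the instance id -> member id index.
final class StaticMemberIndexSketch {
    record Member(String memberId, String instanceId) {}

    final Map<String, Member> members = new HashMap<>();        // memberId -> member
    final Map<String, String> staticMembers = new HashMap<>();  // instanceId -> memberId

    void updateMember(Member newMember) {
        members.put(newMember.memberId(), newMember);
        // Only static members (non-null instance id) are indexed.
        if (newMember.instanceId() != null) {
            staticMembers.put(newMember.instanceId(), newMember.memberId());
        }
    }

    void removeMember(String memberId) {
        Member oldMember = members.remove(memberId);
        if (oldMember != null && oldMember.instanceId() != null) {
            // Conditional remove: keep the entry if a replacement already took over the instance id.
            staticMembers.remove(oldMember.instanceId(), memberId);
        }
    }

    /** Returns the static member owning the instance id, or null if there is none. */
    Member staticMember(String instanceId) {
        String memberId = staticMembers.get(instanceId);
        return memberId == null ? null : members.get(memberId);
    }
}
```

With the unconditional `staticMembers.remove(instanceId)` from the diff, a replacement has to remove the old member before adding the new one; the conditional remove makes that order irrelevant.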






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370532536


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+}
+
+private TargetAssignmentBuilder.TargetAssignmentResult computeTargetAssignment(
+ConsumerGroup group,
+int groupEpoch,
+ConsumerGroupMember member,
+ConsumerGroupMember updatedMember) {
+String preferredServerAssignor = group.computePreferredServerAssignor(
+member,
+updatedMember
+).orElse(defaultAssignor.name());
+
+String groupId = group.groupId();
+Map subscriptionMetadata = group.subscriptionMetadata();
+String memberId = member.memberId();
+try {
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+.withMembers(group.members())

Review Comment:
   That makes sense, and thanks for the explanation. I have now changed the code to remove the existing static member and add the new static member. The rest of the state remains as is.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -107,6 +107,11 @@ public static class DeadlineAndEpoch {
  */

Review Comment:
   Done.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370531600


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));

Review Comment:
   I got around the issue by explicitly setting the rack info in the subscription metadata, as done 
[here](https://github.com/apache/kafka/pull/14432/files#diff-1396a400e5c9f5ccaa44cb20194066eaa919a7e02c558db02005a4a2b67a93b9R2117-R2122).
 I guess this wasn't apparent so far because all the tests expected a group epoch bump. In this case, we didn't want a group epoch bump, which is how I noticed the discrepancy.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370523927


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));

Review Comment:
   Done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());

Review Comment:
   Done.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370523697


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -747,6 +798,11 @@ private void throwIfMemberEpochIsInvalid(
 List ownedTopicPartitions
 ) {
 if (receivedMemberEpoch > member.memberEpoch()) {
+// If a static member rejoins, its previous epoch would be -2. In such a
+// case, we don't need to fence the member.
+if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) {
+return;
+}

Review Comment:
   Yeah, that makes sense as well. I placed it inside the if condition because this case only arises when the received epoch is greater than the current member epoch. But it should be OK to have it outside, as you said. I have made the change.
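The reordering agreed on here can be sketched as follows. This is a simplified, hypothetical version (the exception type and the flat `int` parameters are assumptions; the real method takes a `ConsumerGroupMember` and the owned partitions): the static-rejoin check runs first, so a member whose stored epoch is -2 (`LEAVE_GROUP_STATIC_MEMBER_EPOCH`) can rejoin with epoch 0 without being fenced.

```java
// Hypothetical, simplified version of the epoch check; only the "epoch too high" branch is modeled.
final class EpochValidationSketch {
    // Epoch stored for a static member that left temporarily (-2, per the review thread).
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    static final class FencedEpochException extends RuntimeException {
        FencedEpochException(String message) {
            super(message);
        }
    }

    static void throwIfMemberEpochIsInvalid(int currentMemberEpoch, int receivedMemberEpoch) {
        // Checked first, outside the comparison below: a static member that left (epoch -2)
        // rejoins with epoch 0 and must not be fenced.
        if (currentMemberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) {
            return;
        }
        if (receivedMemberEpoch > currentMemberEpoch) {
            throw new FencedEpochException("Received epoch " + receivedMemberEpoch
                + " is ahead of the current epoch " + currentMemberEpoch);
        }
        // The "epoch too low" checks (owned partitions etc.) are omitted from this sketch.
    }
}
```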






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-18 Thread via GitHub


dajac commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1767897337

   @vamossagar12 Thanks. I just replied to your questions.





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-18 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1363419939


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -747,6 +798,11 @@ private void throwIfMemberEpochIsInvalid(
 List ownedTopicPartitions
 ) {
 if (receivedMemberEpoch > member.memberEpoch()) {
+// If a static member rejoins, its previous epoch would be -2. In such a
+// case, we don't need to fence the member.
+if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) {
+return;
+}

Review Comment:
   I see. Would it make sense to put this condition first in the method, before 
`if (receivedMemberEpoch > member.memberEpoch())`? I got confused by the fact 
that it is within the if branch.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-18 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1363414003


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));

Review Comment:
   Hmm... I am not sure I fully follow. The subscription metadata should not be different if the subscriptions and the metadata image have not changed. Does the new static member have the same subscriptions as the previous one in your case?
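The reviewer's argument is that subscription metadata is a pure function of the members' subscriptions and the metadata image, so swapping in a static member with identical subscriptions cannot change it. A minimal sketch of that property, assuming a hypothetical `compute` helper and a topic-name-to-partition-count map in place of the real metadata image (the real metadata also carries per-partition rack information, which is omitted here):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model: subscription metadata as topic name -> partition count.
final class SubscriptionMetadataSketch {
    static Map<String, Integer> compute(List<Set<String>> memberSubscriptions, Map<String, Integer> image) {
        Map<String, Integer> metadata = new TreeMap<>();
        for (Set<String> subscription : memberSubscriptions) {
            for (String topic : subscription) {
                Integer partitions = image.get(topic);
                // Subscribed topics that do not exist in the image (such as "zar") are skipped.
                if (partitions != null) {
                    metadata.put(topic, partitions);
                }
            }
        }
        return metadata;
    }
}
```

For example, with an image of `foo` (6 partitions) and `bar` (3 partitions) and members subscribed to `[foo, bar]` and `[foo, bar, zar]`, replacing the first member with one that has the same subscription yields the same map, so no group epoch bump would be expected from metadata alone.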






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-18 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1363408245


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+}
+
+private TargetAssignmentBuilder.TargetAssignmentResult computeTargetAssignment(
+ConsumerGroup group,
+int groupEpoch,
+ConsumerGroupMember member,
+ConsumerGroupMember updatedMember) {
+String preferredServerAssignor = group.computePreferredServerAssignor(
+member,
+updatedMember
+).orElse(defaultAssignor.name());
+
+String groupId = group.groupId();
+Map subscriptionMetadata = group.subscriptionMetadata();
+String memberId = member.memberId();
+try {
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+.withMembers(group.members())

Review Comment:
   Well, at the moment, a new target assignment will be computed for the new 
static member and that block of code will indeed create the record for it. What 
I meant is that the new static member should actually reuse the target 
assignment of the previous member (vs computing a new one). Then we only need 
to recompute the assignment if there is a group epoch bump.
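To make the suggestion above concrete, here is a minimal sketch in plain Java, assuming a simplified model in which a target assignment is a map from member id to topic name to partition numbers. `StaticReplacementSketch` and `inheritTargetAssignment` are hypothetical names, not coordinator code; the sketch only illustrates handing the departed member's target assignment to its replacement instead of recomputing it for the whole group.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not coordinator code. Assumed model:
// member id -> (topic name -> assigned partitions).
final class StaticReplacementSketch {
    private StaticReplacementSketch() {}

    /**
     * Returns a copy of the target assignment in which the entry of the departed
     * static member is moved, unchanged, to the member that replaces it.
     * No other member's entry is touched.
     */
    static Map<String, Map<String, Set<Integer>>> inheritTargetAssignment(
        Map<String, Map<String, Set<Integer>>> targetAssignment,
        String oldMemberId,
        String newMemberId
    ) {
        Map<String, Map<String, Set<Integer>>> updated = new HashMap<>(targetAssignment);
        // Drop the old member's entry (the coordinator would write a tombstone for it).
        Map<String, Set<Integer>> inherited = updated.remove(oldMemberId);
        // The new member gets exactly what the old member had, or nothing if it had no entry.
        updated.put(newMemberId, inherited == null ? Map.of() : inherited);
        return updated;
    }
}
```

Under this model a full recomputation would only be needed when something else bumps the group epoch, such as a subscription change.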






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-18 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1767770461

   Thanks for the review, @dajac. Most of my changes are local, but before I
push them, I wanted your thoughts on
[this](https://github.com/apache/kafka/pull/14432/files#r1363301832) comment. I
can proceed with the rest of the changes after that.





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-18 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1363327084


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -747,6 +798,11 @@ private void throwIfMemberEpochIsInvalid(
 List ownedTopicPartitions
 ) {
 if (receivedMemberEpoch > member.memberEpoch()) {
+// If a static member rejoins, its previous epoch would be -2. In such a
+// case, we don't need to fence the member.
+if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && receivedMemberEpoch == 0) {
+return;
+}

Review Comment:
   This is mainly needed for the static member rejoin case. Let's say a static
member with instance id `id` departed. When it departs, we write a member epoch
of -2 for it. Now, if a new static member joins with the same instance id `id`
and a member epoch of 0, then without this condition the rejoin would always
fail with `FencedMemberEpochException`, because the received epoch (0) is
greater than the stored one (-2). This condition was added to avoid that.
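The condition described above can be isolated as a small predicate. This sketch covers only the branch quoted in the diff (received epoch greater than the stored one); the -2 value comes from the discussion, while `EpochCheckSketch` and `fencedBecauseEpochAhead` are hypothetical names.

```java
// Hypothetical, simplified version of the "received epoch is ahead" branch only.
final class EpochCheckSketch {
    // Epoch recorded for a static member that left temporarily (value from the discussion above).
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    private EpochCheckSketch() {}

    /**
     * Returns true if a heartbeat carrying receivedMemberEpoch should be fenced
     * because it is ahead of the stored epoch. A static member that left with
     * epoch -2 and rejoins with epoch 0 is the one exception.
     */
    static boolean fencedBecauseEpochAhead(int storedMemberEpoch, int receivedMemberEpoch) {
        if (receivedMemberEpoch <= storedMemberEpoch) {
            return false; // Not ahead; other checks (not modelled here) would apply.
        }
        boolean staticRejoin = storedMemberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH
            && receivedMemberEpoch == 0;
        return !staticRejoin;
    }
}
```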






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-18 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1363322519


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+}
+
+private TargetAssignmentBuilder.TargetAssignmentResult computeTargetAssignment(
+ConsumerGroup group,
+int groupEpoch,
+ConsumerGroupMember member,
+ConsumerGroupMember updatedMember) {
+String preferredServerAssignor = group.computePreferredServerAssignor(
+member,
+updatedMember
+).orElse(defaultAssignor.name());
+
+String groupId = group.groupId();
+Map subscriptionMetadata = group.subscriptionMetadata();
+String memberId = member.memberId();
+try {
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+.withMembers(group.members())

Review Comment:
   I missed the `removeMembers` part. Thanks for pointing it out. Regarding:
   
   > we also need to set the target assignment of the new member from the old
one.
   
   I assumed this [block of
code](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java#L262-L288)
would take care of it. Was my assumption wrong?






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-18 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1363301832


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));

Review Comment:
   The main reason for extracting this out was that, when using the main logic,
there was always a group epoch bump, even when a new static member replaced an
older one. When I debugged it further, it seemed to be caused by this logic
[here](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L885-L889).
More specifically, the computed `subscriptionMetadata` contains partition rack
information while the currently stored metadata does not. This is with regard to
the test `testStaticMemberGetsBackAssignmentUponRejoin`. I wasn't totally sure
whether this is an issue with the test itself, but since it led to a group
epoch bump, I thought we should avoid that.
   
   Actually, when I think about it now, maybe it makes sense to have a group
epoch bump in this case as well. It might seem to go against the goal of not
rebalancing when a static member rejoins, but the reason for the bump here is
the change in subscription metadata, not the static member rejoin itself. The
latter (a rejoin without an epoch bump) was hard to reproduce in tests, because
the issue mentioned above always bumped the group epoch. Please let me know
your thoughts.
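The symptom described above, where stored metadata without rack information never equals freshly computed metadata that includes it, can be reproduced with a toy value type. `TopicMetadataSketch` and `EpochBumpSketch` are hypothetical stand-ins; the sketch does not reproduce the linked coordinator logic, only the kind of equality check that would trigger a bump.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for per-topic subscription metadata.
final class TopicMetadataSketch {
    final int numPartitions;
    // Partition index -> racks hosting its replicas; empty when racks are unknown.
    final Map<Integer, Set<String>> partitionRacks;

    TopicMetadataSketch(int numPartitions, Map<Integer, Set<String>> partitionRacks) {
        this.numPartitions = numPartitions;
        this.partitionRacks = Map.copyOf(partitionRacks);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicMetadataSketch)) return false;
        TopicMetadataSketch other = (TopicMetadataSketch) o;
        // Rack information participates in equality, so "no racks" != "racks".
        return numPartitions == other.numPartitions && partitionRacks.equals(other.partitionRacks);
    }

    @Override
    public int hashCode() {
        return Objects.hash(numPartitions, partitionRacks);
    }
}

// Hypothetical decision: bump whenever the computed metadata differs from the stored one.
final class EpochBumpSketch {
    private EpochBumpSketch() {}

    static boolean shouldBumpGroupEpoch(
        Map<String, TopicMetadataSketch> stored,
        Map<String, TopicMetadataSketch> computed
    ) {
        return !stored.equals(computed);
    }
}
```

If a test fixture stores metadata without racks while the computed value carries them, every comparison of this kind is unequal, which would be consistent with the unconditional bump observed in `testStaticMemberGetsBackAssignmentUponRejoin`.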
   






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-17 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1362102003


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,11 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+public ConsumerGroupMember getStaticMember(String instanceId) {

Review Comment:
   nit: We don't prefix getters with `get`. Let's add javadoc as well.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -312,6 +337,9 @@ public void removeMember(String memberId) {
 maybeUpdateServerAssignors(oldMember, null);
 maybeRemovePartitionEpoch(oldMember);
 maybeUpdateGroupState();
+if (oldMember.instanceId() != null) {
+staticMembers.remove(oldMember.instanceId());
+}

Review Comment:
   ditto.
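Taken together, the two hunks above suggest an accessor without the `get` prefix plus cleanup of the instance-id index when a member is removed. A minimal sketch, assuming a simplified group that keeps members by member id and an instance-id-to-member-id index; `GroupSketch` and `MemberSketch` are hypothetical stand-ins for `ConsumerGroup` and `ConsumerGroupMember`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ConsumerGroupMember.
final class MemberSketch {
    final String memberId;
    final String instanceId; // null for dynamic members

    MemberSketch(String memberId, String instanceId) {
        this.memberId = memberId;
        this.instanceId = instanceId;
    }
}

// Hypothetical stand-in for ConsumerGroup, reduced to member bookkeeping.
final class GroupSketch {
    private final Map<String, MemberSketch> members = new HashMap<>();
    // Group instance id -> member id of the static member using it.
    private final Map<String, String> staticMembers = new HashMap<>();

    /**
     * Returns the static member registered under the given instance id.
     *
     * @param instanceId The group instance id.
     * @return The member, or null if no static member uses this instance id.
     */
    MemberSketch staticMember(String instanceId) {
        String memberId = staticMembers.get(instanceId);
        return memberId == null ? null : members.get(memberId);
    }

    void updateMember(MemberSketch member) {
        members.put(member.memberId, member);
        if (member.instanceId != null) {
            staticMembers.put(member.instanceId, member.memberId);
        }
    }

    void removeMember(String memberId) {
        MemberSketch oldMember = members.remove(memberId);
        // Drop the index entry only if it still points at this member, so removing
        // a replaced member does not unregister its replacement.
        if (oldMember != null && oldMember.instanceId != null) {
            staticMembers.remove(oldMember.instanceId, memberId);
        }
    }
}
```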



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));

Review Comment:
   We have similar code somewhere else. Could we add a method for this and 
reuse it?
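One possible shape for such a shared helper, sketched with a hypothetical record type because the coordinator's own record classes are not reproduced here. It appends the three tombstones in the same order as the quoted code (current assignment, target assignment, subscription); `RecordSketch`, `TombstoneSketch` and `addMemberTombstones` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a coordinator record; value == null marks a tombstone.
final class RecordSketch {
    final String type;
    final String groupId;
    final String memberId;
    final Object value;

    RecordSketch(String type, String groupId, String memberId, Object value) {
        this.type = type;
        this.groupId = groupId;
        this.memberId = memberId;
        this.value = value;
    }
}

// Hypothetical helper shared by member fencing and static member replacement.
final class TombstoneSketch {
    private TombstoneSketch() {}

    /** Appends the tombstones that erase a member's state, in the order used when fencing. */
    static void addMemberTombstones(List<RecordSketch> records, String groupId, String memberId) {
        records.add(new RecordSketch("CurrentAssignment", groupId, memberId, null));
        records.add(new RecordSketch("TargetAssignment", groupId, memberId, null));
        records.add(new RecordSketch("MemberSubscription", groupId, memberId, null));
    }
}
```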



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -107,6 +107,11 @@ public static class DeadlineAndEpoch {
  */

Review Comment:
   Let's add unit tests for the new or changed methods to the corresponding 
file.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+}
+
+private TargetAssignmentBuilder.TargetAssignmentResult computeTargetAssignment(
+ConsumerGroup group,
+int groupEpoch,
+ConsumerGroupMember member,
+ConsumerGroupMember updatedMember) {
+String preferredServerAssignor = group.computePreferredServerAssignor(
+member,
+updatedMember
+).orElse(defaultAssignor.name());
+
+String groupId = group.groupId();
+Map subscriptionMetadata = group.subscriptionMetadata();
+String memberId = member.memberId();
+try {
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+.withMembers(group.members())

Review Comment:
   I think that the old member will be in `members` so the computed target 
assignment is incorrect. We need to remove it with `removeMember` and we also 
need to set 

Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354520295


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -790,6 +812,12 @@ private CoordinatorResult consumerGr
 groupId, memberId, updatedMember.subscribedTopicRegex());
 bumpGroupEpoch = true;
 }
+} else {
+// A new static member replaces an older one with the same instance id, assignments etc.
+// We will create a new member subscription record for this new member.
+if (instanceId != null) {
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+}

Review Comment:
   This bit of code has changed now.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1757221757

   Thanks, @dajac. I have addressed the comments.





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354518973


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+/**
+ * Gets or creates a static member.
+ *
+ * @param memberId  The member id.
+ * @param instanceId The group instance id.
+ * @param createIfNotExists Boolean indicating whether the member must be
+ *  created if it does not exist.
+ *
+ * @return A ConsumerGroupMember.
+ */
+public ConsumerGroupMember getOrMaybeCreateStaticMember(
+String memberId,
+String instanceId,
+boolean createIfNotExists
+) {
+ConsumerGroupMember member;
+String existingMemberId = staticMemberId(instanceId);
+if (!createIfNotExists) {
+// The member joined with a non-zero epoch but we haven't registered this static member
+// This could be an unknown member for the coordinator.
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+// We can't create a member at this point. If the 2 member-ids don't match,
+// we will throw an error.
+if (!existingMemberId.equals(memberId)) {
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+member = getOrMaybeCreateMember(memberId, false);
+} else {
+// No existing member found against this instance id. Creating new.
+if (existingMemberId == null) {
+member = getOrMaybeCreateMember(memberId, true);
+staticMembers.put(instanceId, memberId);
+return member;
+} else {
+// Get the details of the existing member
+ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false);
+int currentMemberEpoch = existingMember.memberEpoch();
+// A new member with a used instance id joined but the previous member using the same instance id
+// hasn't requested leaving the group.
+if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) {
+throw Errors.UNRELEASED_INSTANCE_ID.exception();
+}
+// A new static member is trying to take the place of a departed static member. We will
+// provide the assignments of the old member to the new one.
+member = new ConsumerGroupMember.Builder(memberId, existingMember)
+.setMemberEpoch(existingMember.targetMemberEpoch())
+.setPreviousMemberEpoch(0)
+.setTargetMemberEpoch(existingMember.targetMemberEpoch())
+.build();

Review Comment:
   Yes, that didn't quite make sense. It's not needed anymore. Fixed the
indentation as well.
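The branching in `getOrMaybeCreateStaticMember` as quoted above (a version that, per this thread, was later reworked) can be summarised as a pure function that only picks an outcome. `StaticMemberOutcome` and `StaticMemberResolverSketch` are hypothetical; the error outcomes are named after the `Errors` constants used in the diff.

```java
// Hypothetical outcomes of resolving a static member, mirroring the quoted diff.
enum StaticMemberOutcome {
    UNKNOWN_MEMBER_ID,      // no member registered for the instance id
    FENCED_INSTANCE_ID,     // instance id owned by a different member id
    EXISTING_MEMBER,        // same member id, nothing to create
    CREATE_NEW_MEMBER,      // first member using this instance id
    UNRELEASED_INSTANCE_ID, // previous owner has not left (epoch != -2)
    BUILD_FROM_EXISTING     // new member built from the existing member's state
}

// Hypothetical resolver; it decides the outcome without touching any state.
final class StaticMemberResolverSketch {
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    private StaticMemberResolverSketch() {}

    static StaticMemberOutcome resolve(
        String memberId,
        String existingMemberId, // null if the instance id is not registered
        int existingMemberEpoch, // ignored when existingMemberId is null
        boolean createIfNotExists
    ) {
        if (!createIfNotExists) {
            if (existingMemberId == null) return StaticMemberOutcome.UNKNOWN_MEMBER_ID;
            if (!existingMemberId.equals(memberId)) return StaticMemberOutcome.FENCED_INSTANCE_ID;
            return StaticMemberOutcome.EXISTING_MEMBER;
        }
        if (existingMemberId == null) return StaticMemberOutcome.CREATE_NEW_MEMBER;
        if (existingMemberEpoch != LEAVE_GROUP_STATIC_MEMBER_EPOCH && !existingMemberId.equals(memberId)) {
            return StaticMemberOutcome.UNRELEASED_INSTANCE_ID;
        }
        // As in the diff, this branch is also reached when the member ids are equal.
        return StaticMemberOutcome.BUILD_FROM_EXISTING;
    }
}
```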






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354516913


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+/**
+ * Gets or creates a static member.
+ *
+ * @param memberId  The member id.
+ * @param instanceId The group instance id.
+ * @param createIfNotExists Boolean indicating whether the member must be
+ *  created if it does not exist.
+ *
+ * @return A ConsumerGroupMember.
+ */
+public ConsumerGroupMember getOrMaybeCreateStaticMember(
+String memberId,
+String instanceId,
+boolean createIfNotExists
+) {
+ConsumerGroupMember member;
+String existingMemberId = staticMemberId(instanceId);
+if (!createIfNotExists) {
+// The member joined with a non-zero epoch but we haven't registered this static member
+// This could be an unknown member for the coordinator.
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();

Review Comment:
   This method is no longer being used. I added custom messages in the other
method, which is now being called.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354507878


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1127,7 +1176,9 @@ public void replay(
 Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
 
 if (value != null) {
-ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
+ConsumerGroupMember oldMember = value.instanceId() != null ?
+consumerGroup.getOrMaybeCreateStaticMember(memberId, value.instanceId(), true) :
+consumerGroup.getOrMaybeCreateMember(memberId, true);
 consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)

Review Comment:
   Done 






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354507878


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1127,7 +1176,9 @@ public void replay(
 Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
 
 if (value != null) {
-ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
+ConsumerGroupMember oldMember = value.instanceId() != null ?
+consumerGroup.getOrMaybeCreateStaticMember(memberId, value.instanceId(), true) :
+consumerGroup.getOrMaybeCreateMember(memberId, true);
 consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)

Review Comment:
   Yes. Removed.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354506847


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1084,10 +1129,14 @@ public CoordinatorResult consumerGro
 ) throws ApiException {
 throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
 
-if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
+if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || request.memberEpoch() == -2) {

Review Comment:
   done






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354505609


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -750,7 +770,9 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+final ConsumerGroupMember member = instanceId == null ?
+group.getOrMaybeCreateMember(memberId, createIfNotExists) :
+group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists);
 throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
 
 if (memberEpoch == 0) {

Review Comment:
   > I just thought about something else. When a static member is replaced, we 
need to write records to erase the state of the previous member.
   
   Yes, that was a miss. I have added the relevant tombstone records for the
replaced static member and also cancelled its timers.
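Cancelling the departed member's timers can be pictured with a toy timer registry keyed by group and member id. The key formats and the `TimerRegistrySketch` class are assumptions made for illustration; the coordinator's own `cancelConsumerGroupSessionTimeout` and `cancelConsumerGroupRevocationTimeout` (named in the diff) are not reproduced.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical timer registry: timer key -> deadline in milliseconds.
final class TimerRegistrySketch {
    private final Map<String, Long> deadlines = new HashMap<>();

    // Assumed key formats: one timer of each kind per (group, member).
    static String sessionTimeoutKey(String groupId, String memberId) {
        return "session-timeout-" + groupId + "-" + memberId;
    }

    static String revocationTimeoutKey(String groupId, String memberId) {
        return "revocation-timeout-" + groupId + "-" + memberId;
    }

    void schedule(String key, long deadlineMs) {
        deadlines.put(key, deadlineMs); // Re-scheduling replaces the previous deadline.
    }

    boolean isScheduled(String key) {
        return deadlines.containsKey(key);
    }

    /** Cancels both timers of a member that has been replaced or removed. */
    void cancelMemberTimers(String groupId, String memberId) {
        deadlines.remove(sessionTimeoutKey(groupId, memberId));
        deadlines.remove(revocationTimeoutKey(groupId, memberId));
    }
}
```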



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -908,23 +936,40 @@ private CoordinatorResult consumerGr
  * Handles leave request from a consumer group member.
  * @param groupId   The group id from the request.
  * @param memberId  The member id from the request.
+ * @param memberEpoch   The member epoch from the request.
  *
  * @return A Result containing the ConsumerGroupHeartbeat response and
  * a list of records to update the state machine.
  */
 private CoordinatorResult consumerGroupLeave(
 String groupId,
-String memberId
+String instanceId,
+String memberId,
+int memberEpoch
 ) throws ApiException {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
-
-log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
+ConsumerGroupMember member = memberEpoch == -2 ?
+group.getOrMaybeCreateStaticMember(memberId, instanceId, false) :
+group.getOrMaybeCreateMember(memberId, false);
 
-List records = consumerGroupFenceMember(group, member);
+List records = new ArrayList<>();
+// The departing member is a static one. We don't need to fence this member because it is
+// expected to come back within session timeout
+if (memberEpoch == -2) {

Review Comment:
   Added.
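The distinction the `consumerGroupLeave` diff draws between the two leave epochs can be sketched as a small dispatch. `LEAVE_GROUP_MEMBER_EPOCH` is assumed here to be -1, as in the consumer group heartbeat protocol, and -2 is the static leave epoch used in this PR; `LeaveSketch`, `LeaveAction` and `onHeartbeat` are hypothetical, and the real handler also writes records and manages timers, which is omitted.

```java
// Hypothetical dispatch on the member epoch carried by a heartbeat.
final class LeaveSketch {
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;        // assumed: member leaves for good
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; // static member leaves temporarily

    enum LeaveAction {
        FENCE_MEMBER,            // erase the member's state (fencing)
        KEEP_STATE_AWAIT_REJOIN, // keep the assignment; the instance id is expected back
        NOT_A_LEAVE              // regular heartbeat
    }

    private LeaveSketch() {}

    static LeaveAction onHeartbeat(int memberEpoch) {
        if (memberEpoch == LEAVE_GROUP_MEMBER_EPOCH) return LeaveAction.FENCE_MEMBER;
        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) return LeaveAction.KEEP_STATE_AWAIT_REJOIN;
        return LeaveAction.NOT_A_LEAVE;
    }
}
```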






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354491971


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+/**
+ * Gets or creates a static member.
+ *
+ * @param memberId  The member id.
+ * @param instanceId The group instance id.
+ * @param createIfNotExists Boolean indicating whether the member must be
+ *  created if it does not exist.
+ *
+ * @return A ConsumerGroupMember.
+ */
+public ConsumerGroupMember getOrMaybeCreateStaticMember(
+String memberId,
+String instanceId,
+boolean createIfNotExists
+) {
+ConsumerGroupMember member;
+String existingMemberId = staticMemberId(instanceId);
+if (!createIfNotExists) {
+// The member joined with a non-zero epoch but we haven't registered this static member
+// This could be an unknown member for the coordinator.
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+// We can't create a member at this point. If the 2 member-ids don't match,
+// we will throw an error.
+if (!existingMemberId.equals(memberId)) {
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+member = getOrMaybeCreateMember(memberId, false);
+} else {
+// No existing member found against this instance id. Creating new.
+if (existingMemberId == null) {
+member = getOrMaybeCreateMember(memberId, true);
+staticMembers.put(instanceId, memberId);
+return member;
+} else {
+// Get the details of the existing member
+ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false);
+int currentMemberEpoch = existingMember.memberEpoch();
+// A new member with a used instance id joined but the previous member using the same instance id
+// hasn't requested leaving the group.
+if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) {

Review Comment:
   Actually this is no longer required. Have removed it.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+/**
+ * Gets or creates a static member.
+ *
+ * @param memberId  The member id.
+ * @param instanceId The group instance id.
+ * @param createIfNotExists Boolean indicating whether the member must be
+ *  created if it does not exist.
+ *
+ * @return A ConsumerGroupMember.
+ */
+public ConsumerGroupMember getOrMaybeCreateStaticMember(
+String memberId,
+String instanceId,
+boolean createIfNotExists
+) {
+ConsumerGroupMember member;
+String existingMemberId = staticMemberId(instanceId);
+if (!createIfNotExists) {
+// The member joined with a non-zero epoch but we haven't registered this static member
+// This could be an unknown member for the coordinator.
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+// We can't create a member at this point. If the 2 member-ids don't match,
+// we will throw an error.
+if (!existingMemberId.equals(memberId)) {
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+member = getOrMaybeCreateMember(memberId, false);
+} else {
+// No existing member found against this instance id. Creating new.
+if (existingMemberId == null) {
+member = getOrMaybeCreateMember(memberId, true);
+staticMembers.put(instanceId, memberId);
+return member;
+} else {
+// Get the details of the existing member
+ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false);
+int currentMemberEpoch = existingMember.memberEpoch();
+// A new member with a used instance id joined but the previous member using the same instance id
+// hasn't requested leaving the group.
+if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) {
+throw Errors.UNRELEASED_INSTANCE_ID.exception();
+}
+// A new static member is trying to take the 

Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354488025


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+/**
+ * Gets or creates a static member.
+ *
+ * @param memberId  The member id.
+ * @param instanceId The group instance id.
+ * @param createIfNotExists Boolean indicating whether the member must be
+ *  created if it does not exist.
+ *
+ * @return A ConsumerGroupMember.
+ */
+public ConsumerGroupMember getOrMaybeCreateStaticMember(
+String memberId,
+String instanceId,
+boolean createIfNotExists
+) {
+ConsumerGroupMember member;
+String existingMemberId = staticMemberId(instanceId);
+if (!createIfNotExists) {
+// The member joined with a non-zero epoch but we haven't registered this static member
+// This could be an unknown member for the coordinator.
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+// We can't create a member at this point. If the 2 member-ids don't match,
+// we will throw an error.
+if (!existingMemberId.equals(memberId)) {
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+member = getOrMaybeCreateMember(memberId, false);
+} else {
+// No existing member found against this instance id. Creating new.
+if (existingMemberId == null) {
+member = getOrMaybeCreateMember(memberId, true);
+staticMembers.put(instanceId, memberId);

Review Comment:
   Makes sense. I have removed this direct update of the state and moved it to `replay()`.
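For readers following the branches of `getOrMaybeCreateStaticMember`, here is a minimal, hypothetical sketch that mirrors them as a pure decision function. It only decides the outcome and never mutates group state, which fits the "update state only in replay" approach above. The class `StaticMemberResolution`, the `Outcome` enum and the two plain maps are illustrative assumptions, not code from the PR; the `-2` epoch is the "static member has left" marker used in the reviewed code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: mirrors the branches of getOrMaybeCreateStaticMember
// but only *decides* the outcome; it never mutates group state.
class StaticMemberResolution {

    // Epoch used in the reviewed code to mark a static member that has
    // requested to leave the group.
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    enum Outcome {
        USE_EXISTING_MEMBER,           // non-zero epoch heartbeat, member ids match
        CREATE_NEW_MEMBER,             // instance id not registered yet
        TAKE_OVER_PREVIOUS_ASSIGNMENT, // (re)join: inherit the old member's assignment
        UNKNOWN_MEMBER_ID,
        FENCED_INSTANCE_ID,
        UNRELEASED_INSTANCE_ID
    }

    /**
     * @param staticMembers instance id -> member id (illustrative stand-in)
     * @param memberEpochs  member id -> current member epoch (illustrative stand-in)
     */
    static Outcome resolve(
        Map<String, String> staticMembers,
        Map<String, Integer> memberEpochs,
        String memberId,
        String instanceId,
        boolean createIfNotExists
    ) {
        String existingMemberId = staticMembers.get(instanceId);
        if (!createIfNotExists) {
            if (existingMemberId == null) return Outcome.UNKNOWN_MEMBER_ID;
            if (!existingMemberId.equals(memberId)) return Outcome.FENCED_INSTANCE_ID;
            return Outcome.USE_EXISTING_MEMBER;
        }
        if (existingMemberId == null) return Outcome.CREATE_NEW_MEMBER;
        Integer currentEpoch = memberEpochs.get(existingMemberId);
        // Assumption: a dangling instance id is treated like an unknown member.
        if (currentEpoch == null) return Outcome.UNKNOWN_MEMBER_ID;
        // The previous owner of this instance id has not left the group yet.
        if (currentEpoch != LEAVE_GROUP_STATIC_MEMBER_EPOCH && !existingMemberId.equals(memberId)) {
            return Outcome.UNRELEASED_INSTANCE_ID;
        }
        return Outcome.TAKE_OVER_PREVIOUS_ASSIGNMENT;
    }

    public static void main(String[] args) {
        Map<String, String> staticMembers = new HashMap<>();
        Map<String, Integer> epochs = new HashMap<>();
        staticMembers.put("instance-1", "member-1");
        epochs.put("member-1", 5);
        // A second process reuses instance-1 while member-1 is still active.
        System.out.println(resolve(staticMembers, epochs, "member-2", "instance-1", true)); // UNRELEASED_INSTANCE_ID
        // member-1 leaves with epoch -2; now member-2 may take over its assignment.
        epochs.put("member-1", LEAVE_GROUP_STATIC_MEMBER_EPOCH);
        System.out.println(resolve(staticMembers, epochs, "member-2", "instance-1", true)); // TAKE_OVER_PREVIOUS_ASSIGNMENT
    }
}
```

Keeping the decision side-effect free means the caller can turn each outcome into records (or an error) and leave every map update to replay.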






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-11 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1354486658


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -564,6 +564,22 @@ private void throwIfNotNull(
 }
 }
 
+/**
+ * Throws an InvalidRequestException if the value is null.
+ *
+ * @param value The value.
+ * @param error The error message.
+ * @throws InvalidRequestException
+ */
+private void throwIfNull(
+Object value,
+String error

Review Comment:
   done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -107,6 +107,11 @@ public static class DeadlineAndEpoch {
  */
 private final TimelineHashMap<String, ConsumerGroupMember> members;
 
+/**
+ * The static group members.
+ */
+private final Map<String, String> staticMembers;

Review Comment:
   done.






Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-06 Thread via GitHub


dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1348699907


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -107,6 +107,11 @@ public static class DeadlineAndEpoch {
  */
 private final TimelineHashMap<String, ConsumerGroupMember> members;
 
+/**
+ * The static group members.
+ */
+private final Map<String, String> staticMembers;

Review Comment:
   I suppose that this must be a `TimelineHashMap`.
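The reason a plain `Map` is not enough: in the new group coordinator, in-memory state is kept in timeline structures so that changes applied for records that later fail to commit can be rolled back. The sketch below is a hypothetical, copy-on-snapshot illustration of that property only. `SnapshottableMap` and its `snapshot()`/`revert()` methods are invented for illustration and are NOT the API of Kafka's `TimelineHashMap`, which tracks deltas per snapshot epoch instead of copying the whole map.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is NOT Kafka's TimelineHashMap API.
// It shows the property the reviewer is asking for: changes made after a
// snapshot can be rolled back if the corresponding records fail to commit.
class SnapshottableMap<K, V> {
    private final Map<K, V> current = new HashMap<>();
    private final Deque<Map<K, V>> snapshots = new ArrayDeque<>();

    void put(K key, V value) {
        current.put(key, value);
    }

    V get(K key) {
        return current.get(key);
    }

    // Capture the state before applying uncommitted changes.
    void snapshot() {
        snapshots.push(new HashMap<>(current));
    }

    // Discard everything done since the latest snapshot.
    // Throws NoSuchElementException if no snapshot was taken.
    void revert() {
        Map<K, V> previous = snapshots.pop();
        current.clear();
        current.putAll(previous);
    }
}
```

With a plain `HashMap`, a `staticMembers.put(instanceId, memberId)` performed for a write that is never committed would simply stay behind, leaving the instance id pointing at a member the log knows nothing about.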



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -564,6 +564,22 @@ private void throwIfNotNull(
 }
 }
 
+/**
+ * Throws an InvalidRequestException if the value is null.
+ *
+ * @param value The value.
+ * @param error The error message.
+ * @throws InvalidRequestException
+ */
+private void throwIfNull(
+Object value,
+String error

Review Comment:
   nit: Indentation should be 4 spaces.
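For reference, a minimal sketch of the helper described by the javadoc, with parameters and body indented by 4 spaces. The `InvalidRequestException` below is a local stand-in so the snippet compiles without Kafka on the classpath, and `RequestValidation` is a hypothetical holder class; neither is the PR's actual code.

```java
// Local stand-in so the sketch compiles without Kafka on the classpath;
// the javadoc in the PR refers to Kafka's InvalidRequestException.
class InvalidRequestException extends RuntimeException {
    InvalidRequestException(String message) {
        super(message);
    }
}

class RequestValidation {
    /**
     * Throws an InvalidRequestException if the value is null.
     *
     * @param value The value.
     * @param error The error message.
     * @throws InvalidRequestException
     */
    static void throwIfNull(
        Object value,
        String error
    ) {
        if (value == null) {
            throw new InvalidRequestException(error);
        }
    }
}
```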



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+/**
+ * Gets or creates a static member.
+ *
+ * @param memberId  The member id.
+ * @param instanceId The group instance id.
+ * @param createIfNotExists Boolean indicating whether the member must be
+ *  created if it does not exist.
+ *
+ * @return A ConsumerGroupMember.
+ */
+public ConsumerGroupMember getOrMaybeCreateStaticMember(
+String memberId,
+String instanceId,
+boolean createIfNotExists
+) {
+ConsumerGroupMember member;
+String existingMemberId = staticMemberId(instanceId);
+if (!createIfNotExists) {
+// The member joined with a non-zero epoch but we haven't registered this static member
+// This could be an unknown member for the coordinator.
+if (existingMemberId == null) {
+throw Errors.UNKNOWN_MEMBER_ID.exception();
+}
+// We can't create a member at this point. If the 2 member-ids don't match,
+// we will throw an error.
+if (!existingMemberId.equals(memberId)) {
+throw Errors.FENCED_INSTANCE_ID.exception();
+}
+member = getOrMaybeCreateMember(memberId, false);
+} else {
+// No existing member found against this instance id. Creating new.
+if (existingMemberId == null) {
+member = getOrMaybeCreateMember(memberId, true);
+staticMembers.put(instanceId, memberId);
+return member;
+} else {
+// Get the details of the existing member
+ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false);
+int currentMemberEpoch = existingMember.memberEpoch();
+// A new member with a used instance id joined but the previous member using the same instance id
+// hasn't requested leaving the group.
+if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) {
+throw Errors.UNRELEASED_INSTANCE_ID.exception();
+}
+// A new static member is trying to take the place of a departed static member. We will
+// provide the assignments of the old member to the new one.
+member = new ConsumerGroupMember.Builder(memberId, existingMember)
+.setMemberEpoch(existingMember.targetMemberEpoch())
+.setPreviousMemberEpoch(0)
+.setTargetMemberEpoch(existingMember.targetMemberEpoch())
+.build();
+updateMember(member);
+staticMembers.put(instanceId, memberId);

Review Comment:
   As said previously, the state should not be updated here but in replay.
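A minimal sketch of the split the reviewer is asking for, under stated assumptions: the request path may read state to validate, but it only emits records, and `replay()` is the single place where in-memory state changes. Because replay also runs when a coordinator reloads the log, state built this way matches on every broker. `ReplaySketch`, `MemberRecord` and `onStaticJoin` are hypothetical names for illustration, not Kafka record types or methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "emit records, then replay" split.
// MemberRecord is an illustrative stand-in, not a Kafka record type.
class ReplaySketch {

    static final class MemberRecord {
        final String memberId;
        final String instanceId; // null for dynamic members
        final int memberEpoch;

        MemberRecord(String memberId, String instanceId, int memberEpoch) {
            this.memberId = memberId;
            this.instanceId = instanceId;
            this.memberEpoch = memberEpoch;
        }
    }

    // In the real coordinator these would be timeline structures.
    final Map<String, String> staticMembers = new HashMap<>();
    final Map<String, Integer> memberEpochs = new HashMap<>();

    // Request path: produce records, but do not touch the maps.
    List<MemberRecord> onStaticJoin(String memberId, String instanceId, int epoch) {
        List<MemberRecord> records = new ArrayList<>();
        records.add(new MemberRecord(memberId, instanceId, epoch));
        return records;
    }

    // Replay path: the only place where in-memory state changes. It runs when
    // records are appended and again when the log is reloaded.
    void replay(MemberRecord record) {
        memberEpochs.put(record.memberId, record.memberEpoch);
        if (record.instanceId != null) {
            staticMembers.put(record.instanceId, record.memberId);
        }
    }
}
```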



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -750,7 +770,9 @@ private CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+final ConsumerGroupMember member = instanceId == null ?
+group.getOrMaybeCreateMember(memberId, createIfNotExists) :
+group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists);

Review Comment:
   nit: indentation.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -908,23 +936,40 @@ private CoordinatorResult 

Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-04 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1747157933

   > Thanks @vamossagar12! I will get to this PR soon.
   
   Thank you. I am still coming to terms with the new group coordinator, so there might be rough edges in this PR (apologies upfront for that).





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-04 Thread via GitHub


dajac commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1747117455

   Thanks @vamossagar12! I will get to this PR soon.





Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-04 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1747026910

   > I realised that logic similar to https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java#L774-L780 is missing in this PR. I think that's important. To get this working, though, I would also need to add a separate map for staticMembers and also support adding a static member to the group. In that case, I might need to extend this PR to support both the leave request and the join request.
   
   I updated this PR to handle this case and also to handle (re)joining and leaving the group using the ConsumerGroupHeartbeat API.
   

