[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276971308


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
             this.followerAssignment = followerAssignment;
         }
     }
+
+    @Test
+    public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+            context,
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("leader-instance-id")
+            .withMemberId(rebalanceResult.leaderId)
+            .withGenerationId(rebalanceResult.generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertTrue(result.records().isEmpty());
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(rebalanceResult.leaderId)
+            .setGenerationId(rebalanceResult.generationId);
+
+        HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode());
+
+        HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat(
+            heartbeatRequest.setGroupInstanceId("leader-instance-id")
+                .setMemberId("invalid-member-id"));
+
+        assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDeadGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        group.transitionTo(DEAD);
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatEmptyGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection();
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(new byte[]{0}));
+
+        group.add(new GenericGroupMember(
+            "member-id",
+            Optional.empty(),
+            "client-id",
+            "client-host",
+            1,
+            5000,
+            "consumer",
+            protocols
+        ));
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY);
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(0);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownMemberExistingGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+
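
Read together, the heartbeat tests quoted in this hunk pin down which error code the coordinator is expected to return for each group condition. A minimal sketch of that mapping follows, based only on what the assertions show. The class `HeartbeatExpectations`, its enums, and `expectedError` are hypothetical names for illustration and are not part of Kafka's `GroupMetadataManager`. The non-static mismatch case is an assumption taken from the name of `testHeartbeatUnknownMemberExistingGroup`, whose body is cut off in the quoted hunk.

```java
import java.util.Optional;

// Hypothetical illustration only: mirrors the assertions in the quoted tests,
// not the actual GroupMetadataManager implementation.
class HeartbeatExpectations {

    // Only the group conditions the quoted tests exercise; STABLE stands for
    // the group after the rebalance and sync in the first test have completed.
    enum GroupState { EMPTY, STABLE, DEAD }

    // Stand-ins for the Errors constants asserted in the tests.
    enum ExpectedError { NONE, UNKNOWN_MEMBER_ID, COORDINATOR_NOT_AVAILABLE, FENCED_INSTANCE_ID }

    /**
     * @param state           group state, or empty if the group does not exist
     * @param staticMember    true if the request carries a group instance id
     * @param memberIdMatches true if the request's member id is the one the coordinator holds
     */
    static ExpectedError expectedError(Optional<GroupState> state,
                                       boolean staticMember,
                                       boolean memberIdMatches) {
        if (!state.isPresent()) {
            return ExpectedError.UNKNOWN_MEMBER_ID;          // testHeartbeatUnknownGroup
        }
        switch (state.get()) {
            case DEAD:
                return ExpectedError.COORDINATOR_NOT_AVAILABLE; // testHeartbeatDeadGroup
            case EMPTY:
                return ExpectedError.UNKNOWN_MEMBER_ID;          // testHeartbeatEmptyGroup
            default:
                if (memberIdMatches) {
                    return ExpectedError.NONE;                   // valid heartbeat in the first test
                }
                // Static member with a stale member id is fenced (first test);
                // the non-static branch is assumed from the truncated test's name.
                return staticMember
                    ? ExpectedError.FENCED_INSTANCE_ID
                    : ExpectedError.UNKNOWN_MEMBER_ID;
        }
    }

    public static void main(String[] args) {
        System.out.println(expectedError(Optional.of(GroupState.DEAD), false, true));
    }
}
```

Laying the cases out this way makes it easier to see which combinations the new tests cover and which are left to the tests further down the PR.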

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276959521


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276944101


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276943880


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276942521


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276939557


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276936889


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276923811


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
+assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatUnknownMemberExistingGroup() throws Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276922896


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
 this.followerAssignment = followerAssignment;
 }
 }
+
+@Test
+public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+context.createGenericGroup("group-id");
+
+RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+context,
+"group-id",
+"leader-instance-id",
+"follower-instance-id"
+);
+
+SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+.withGroupId("group-id")
+.withGroupInstanceId("leader-instance-id")
+.withMemberId(rebalanceResult.leaderId)
+.withGenerationId(rebalanceResult.generationId)
+.build();
+
+CompletableFuture syncFuture = new 
CompletableFuture<>();
+CoordinatorResult result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+assertTrue(result.records().isEmpty());
+assertTrue(syncFuture.isDone());
+assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId(rebalanceResult.leaderId)
+.setGenerationId(rebalanceResult.generationId);
+
+HeartbeatResponseData validHeartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode());
+
+HeartbeatResponseData inValidHeartbeatResponse = 
context.sendGenericGroupHeartbeat(
+heartbeatRequest.setGroupInstanceId("leader-instance-id")
+.setMemberId("invalid-member-id"));
+
+assertEquals(Errors.FENCED_INSTANCE_ID.code(), 
inValidHeartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatUnknownGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId("member-id")
+.setGenerationId(-1);
+
+HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatDeadGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+group.transitionTo(DEAD);
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId("member-id")
+.setGenerationId(-1);
+
+HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatEmptyGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+protocols.add(new JoinGroupRequestProtocol()
+.setName("range")
+.setMetadata(new byte[]{0}));
+
+group.add(new GenericGroupMember(
+"member-id",
+Optional.empty(),
+"client-id",
+"client-host",
+1,
+5000,
+"consumer",
+protocols
+));
+
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId("member-id")
+.setGenerationId(0);
+
+HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatUnknownMemberExistingGroup() throws Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276921862


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
 this.followerAssignment = followerAssignment;
 }
 }
+
+@Test
+public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+context.createGenericGroup("group-id");
+
+RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+context,
+"group-id",
+"leader-instance-id",
+"follower-instance-id"
+);
+
+SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+.withGroupId("group-id")
+.withGroupInstanceId("leader-instance-id")
+.withMemberId(rebalanceResult.leaderId)
+.withGenerationId(rebalanceResult.generationId)
+.build();
+
+CompletableFuture syncFuture = new 
CompletableFuture<>();
+CoordinatorResult result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+assertTrue(result.records().isEmpty());
+assertTrue(syncFuture.isDone());
+assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId(rebalanceResult.leaderId)
+.setGenerationId(rebalanceResult.generationId);
+
+HeartbeatResponseData validHeartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode());
+
+HeartbeatResponseData inValidHeartbeatResponse = 
context.sendGenericGroupHeartbeat(
+heartbeatRequest.setGroupInstanceId("leader-instance-id")
+.setMemberId("invalid-member-id"));
+
+assertEquals(Errors.FENCED_INSTANCE_ID.code(), 
inValidHeartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatUnknownGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId("member-id")
+.setGenerationId(-1);
+
+HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatDeadGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+group.transitionTo(DEAD);
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId("member-id")
+.setGenerationId(-1);
+
+HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatEmptyGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+protocols.add(new JoinGroupRequestProtocol()
+.setName("range")
+.setMetadata(new byte[]{0}));
+
+group.add(new GenericGroupMember(
+"member-id",
+Optional.empty(),
+"client-id",
+"client-host",
+1,
+5000,
+"consumer",
+protocols
+));
+
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);

Review Comment:
   Thanks. Removed the transitions; the group starts off as empty.
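
   To illustrate why the explicit transitions are redundant, here is a minimal sketch of a group state machine whose initial state is already empty. `GroupStateSketch` and its transition table are hypothetical simplifications written for this comment, not the actual `GenericGroup` implementation.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of a group state machine (an assumption, not Kafka's class).
public class GroupStateSketch {
    public enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // A freshly created group starts in EMPTY, so a test does not need to force it there.
    private State state = State.EMPTY;

    public State state() {
        return state;
    }

    // Assumed transition table: the states from which `target` may be entered.
    static Set<State> validPreviousStates(State target) {
        switch (target) {
            case EMPTY:
                return EnumSet.of(State.PREPARING_REBALANCE);
            case PREPARING_REBALANCE:
                return EnumSet.of(State.EMPTY, State.STABLE, State.COMPLETING_REBALANCE);
            case COMPLETING_REBALANCE:
                return EnumSet.of(State.PREPARING_REBALANCE);
            case STABLE:
                return EnumSet.of(State.COMPLETING_REBALANCE);
            case DEAD:
                return EnumSet.allOf(State.class);
            default:
                throw new IllegalArgumentException("Unknown state: " + target);
        }
    }

    // Moves to `target` only if the current state is a valid predecessor.
    public void transitionTo(State target) {
        if (!validPreviousStates(target).contains(state)) {
            throw new IllegalStateException("Cannot move from " + state + " to " + target);
        }
        state = target;
    }

    public static void main(String[] args) {
        GroupStateSketch group = new GroupStateSketch();
        // No transitions needed: the group is already EMPTY.
        System.out.println(group.state());
    }
}
```

   In this model, going through `PREPARING_REBALANCE` and back to `EMPTY` ends in the same state the group was created in, which is why the test can drop both calls.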



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276919519


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
 this.followerAssignment = followerAssignment;
 }
 }
+
+@Test
+public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception {
+GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+.build();
+context.createGenericGroup("group-id");
+
+RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(

Review Comment:
   resolved via rebase






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276919382


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
 this.followerAssignment = followerAssignment;
 }
 }
+
+@Test
+public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception {
+GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+.build();
+context.createGenericGroup("group-id");

Review Comment:
   Thanks for the catch. Removed the `createGenericGroup` call from all tests that call `staticMembersJoinAndRebalance`.






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276874868


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -368,9 +369,29 @@ public CompletableFuture heartbeat(
 return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (!isGroupIdNotEmpty(request.groupId())) {
+return CompletableFuture.completedFuture(new HeartbeatResponseData()
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+}
+
+return runtime.scheduleReadOperation("generic-group-heartbeat",
+topicPartitionFor(request.groupId()),
+(coordinator, __) -> coordinator.genericGroupHeartbeat(context, request)

Review Comment:
   As discussed offline, we will keep it as is, since this is how the current coordinator behaves.






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-25 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1274116830


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -368,9 +369,29 @@ public CompletableFuture heartbeat(
 return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (!isGroupIdNotEmpty(request.groupId())) {
+return CompletableFuture.completedFuture(new HeartbeatResponseData()
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+}
+
+return runtime.scheduleReadOperation("generic-group-heartbeat",
+topicPartitionFor(request.groupId()),
+(coordinator, __) -> coordinator.genericGroupHeartbeat(context, request)

Review Comment:
   I wasn't too sure about this, actually. How should we use the offset parameter? Should we pass it down and use it to read the `groups` timeline map?
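
   For context, a rough sketch of what an offset-based read could look like. `OffsetVersionedMap` is a hypothetical stand-in written for this comment, not the coordinator's actual timeline map; it only shows the idea of returning the value that was visible at a given offset rather than the latest one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class OffsetReadSketch {
    // Hypothetical versioned map: every write is tagged with the log offset at which it happened.
    public static final class OffsetVersionedMap<K, V> {
        private final Map<K, TreeMap<Long, V>> history = new HashMap<>();

        // Records `value` for `key` as of `offset`.
        public void put(K key, V value, long offset) {
            history.computeIfAbsent(key, k -> new TreeMap<>()).put(offset, value);
        }

        // Returns the value visible at `offset`: the most recent write at or before it,
        // or null if the key had not been written yet.
        public V get(K key, long offset) {
            TreeMap<Long, V> versions = history.get(key);
            if (versions == null) {
                return null;
            }
            Map.Entry<Long, V> entry = versions.floorEntry(offset);
            return entry == null ? null : entry.getValue();
        }

        // Returns the latest value regardless of offset, i.e. a read that ignores the offset.
        public V getLatest(K key) {
            TreeMap<Long, V> versions = history.get(key);
            return versions == null ? null : versions.lastEntry().getValue();
        }
    }

    public static void main(String[] args) {
        OffsetVersionedMap<String, String> groups = new OffsetVersionedMap<>();
        groups.put("group-id", "EMPTY", 10L);
        groups.put("group-id", "STABLE", 20L);

        // A read pinned to offset 15 sees the older state; an unpinned read sees the newest.
        System.out.println(groups.get("group-id", 15L));
        System.out.println(groups.getLatest("group-id"));
    }
}
```

   The two reads differ only while there are writes beyond the offset being read at, so the choice matters mainly for whether a heartbeat should observe uncommitted group state.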


