[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276971308 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276959521 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276944101 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276943880 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276942521 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276939557 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276936889 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276923811 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276922896 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(0); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownMemberExistingGroup() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276921862 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( +context, +"group-id", +"leader-instance-id", +"follower-instance-id" +); + +SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() +.withGroupId("group-id") +.withGroupInstanceId("leader-instance-id") +.withMemberId(rebalanceResult.leaderId) +.withGenerationId(rebalanceResult.generationId) +.build(); + +CompletableFuture syncFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture); + +assertTrue(result.records().isEmpty()); +assertTrue(syncFuture.isDone()); +assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId(rebalanceResult.leaderId) +.setGenerationId(rebalanceResult.generationId); + +HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); + +HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat( +heartbeatRequest.setGroupInstanceId("leader-instance-id") +.setMemberId("invalid-member-id")); + +assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatUnknownGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatDeadGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +group.transitionTo(DEAD); + +HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData() +.setGroupId("group-id") +.setMemberId("member-id") +.setGenerationId(-1); + +HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); +assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode()); +} + +@Test +public void testHeartbeatEmptyGroup() { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +GenericGroup group = context.createGenericGroup("group-id"); + +JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); +protocols.add(new JoinGroupRequestProtocol() +.setName("range") +.setMetadata(new byte[]{0})); + +group.add(new GenericGroupMember( +"member-id", +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols +)); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(EMPTY); Review Comment: thanks. removed the transitions, the group starts off as empty -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276919519 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); + +RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( Review Comment: resolved via rebase -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276919382 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -8540,5 +8568,1182 @@ private static class RebalanceResult { this.followerAssignment = followerAssignment; } } + +@Test +public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.build(); +context.createGenericGroup("group-id"); Review Comment: Thanks for the catch. Removed from all tests that call staticMembersJoinAndRebalance -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276874868 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -368,9 +369,29 @@ public CompletableFuture heartbeat( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +if (!isGroupIdNotEmpty(request.groupId())) { +return CompletableFuture.completedFuture(new HeartbeatResponseData() +.setErrorCode(Errors.INVALID_GROUP_ID.code())); +} + +return runtime.scheduleReadOperation("generic-group-heartbeat", +topicPartitionFor(request.groupId()), +(coordinator, __) -> coordinator.genericGroupHeartbeat(context, request) Review Comment: as discussed offline, we will keep it as is since it is how the current coordinator behaves. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1274116830 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -368,9 +369,29 @@ public CompletableFuture heartbeat( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +if (!isGroupIdNotEmpty(request.groupId())) { +return CompletableFuture.completedFuture(new HeartbeatResponseData() +.setErrorCode(Errors.INVALID_GROUP_ID.code())); +} + +return runtime.scheduleReadOperation("generic-group-heartbeat", +topicPartitionFor(request.groupId()), +(coordinator, __) -> coordinator.genericGroupHeartbeat(context, request) Review Comment: i wasn't too sure on this actually. how should we use the offset parameter, should we pass it down and use it to read the `groups` timeline map? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org