kirktrue commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1415960044
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -292,6 +297,90 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) { assertNull(heartbeatRequest.data().subscribedTopicRegex()); } + @Test + public void testConsumerGroupMetadataFirstUpdate() { + resetWithZeroHeartbeatInterval(Optional.empty()); + mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); + + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); + ClientResponse response = createHeartbeatResponse(request, Errors.NONE); + result.unsentRequests.get(0).handler().onComplete(response); + assertEquals(1, backgroundEventQueue.size()); + final BackgroundEvent event = backgroundEventQueue.poll(); + assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type()); + final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) event; + final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent( + memberEpoch, + memberId + ); + assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent); + } + + @Test + public void testConsumerGroupMetadataUpdateWithSameUpdate() { + resetWithZeroHeartbeatInterval(Optional.empty()); + mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); + ClientResponse firstResponse = createHeartbeatResponse(request, Errors.NONE); + request.handler().onComplete(firstResponse); + assertEquals(1, backgroundEventQueue.size()); + final BackgroundEvent firstEvent = backgroundEventQueue.poll(); + assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, firstEvent.type()); + + time.sleep(2000); Review Comment: That's the intention of `resetWithZeroHeartbeatInterval()`, yes. At some point it did as it said 🤔 ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -292,6 +297,90 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) { assertNull(heartbeatRequest.data().subscribedTopicRegex()); } + @Test + public void testConsumerGroupMetadataFirstUpdate() { + resetWithZeroHeartbeatInterval(Optional.empty()); + mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); + + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); + ClientResponse response = createHeartbeatResponse(request, Errors.NONE); + result.unsentRequests.get(0).handler().onComplete(response); + assertEquals(1, backgroundEventQueue.size()); + final BackgroundEvent event = backgroundEventQueue.poll(); + assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type()); + final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) event; + final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent( + memberEpoch, + memberId + ); + assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent); + } + + @Test + public void testConsumerGroupMetadataUpdateWithSameUpdate() { + resetWithZeroHeartbeatInterval(Optional.empty()); + mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); + ClientResponse firstResponse = createHeartbeatResponse(request, Errors.NONE); + request.handler().onComplete(firstResponse); + assertEquals(1, backgroundEventQueue.size()); + final BackgroundEvent firstEvent = backgroundEventQueue.poll(); + assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, firstEvent.type()); + + time.sleep(2000); Review Comment: That's the intention of `resetWithZeroHeartbeatInterval()`, yes. At some point it did as it said 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org