lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681562408
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @Test public void testHeartbeatState() { + CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); + mockJoiningMemberData(false); + + heartbeatState = new HeartbeatState( + subscriptions, + membershipManager, + DEFAULT_MAX_POLL_INTERVAL_MS + ); + + createHeartbeatStateWithZeroHeartbeatInterval(); + // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); assertEquals("", data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); - assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); + assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); - assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); + assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); - membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); // Mock a response from the group coordinator, that supplies the member ID and a new epoch - mockStableMember(); + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptions.rebalanceListener()).thenReturn(Optional.empty()); + ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() + .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) + .setMemberId(DEFAULT_MEMBER_ID) + .setMemberEpoch(DEFAULT_MEMBER_EPOCH) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()) + ); + when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null)); + mockDefaultMemberData(false); data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); - assertEquals(memberId, data.memberId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(1, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); assertNull(data.serverAssignor()); - assertEquals(data.topicPartitions(), Collections.emptyList()); - membershipManager.onHeartbeatRequestSent(); + assertEquals(Collections.emptyList(), data.topicPartitions()); assertEquals(MemberState.STABLE, membershipManager.state()); Review Comment: I think we should remove this now that the `membershipMgr` is a mock, as it should be :) (it has no value since it's passing only because we set the expectation ourselves on `mockDefaultMemberData`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org