lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681564234
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @Test public void testHeartbeatState() { + CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); + mockJoiningMemberData(false); + + heartbeatState = new HeartbeatState( + subscriptions, + membershipManager, + DEFAULT_MAX_POLL_INTERVAL_MS + ); + + createHeartbeatStateWithZeroHeartbeatInterval(); + // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); assertEquals("", data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); - assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); + assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); - assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); + assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); - membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); // Mock a response from the group coordinator, that supplies the member ID and a new epoch - mockStableMember(); + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptions.rebalanceListener()).thenReturn(Optional.empty()); + ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() + .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) + .setMemberId(DEFAULT_MEMBER_ID) + .setMemberEpoch(DEFAULT_MEMBER_EPOCH) + 
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()) + ); + when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null)); + mockDefaultMemberData(false); data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); - assertEquals(memberId, data.memberId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(1, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); assertNull(data.serverAssignor()); - assertEquals(data.topicPartitions(), Collections.emptyList()); - membershipManager.onHeartbeatRequestSent(); + assertEquals(Collections.emptyList(), data.topicPartitions()); assertEquals(MemberState.STABLE, membershipManager.state()); // Join the group and subscribe to a topic, but the response has not yet been received String topic = "topic1"; subscriptions.subscribe(Collections.singleton(topic), Optional.empty()); - membershipManager.onSubscriptionUpdated(); - membershipManager.transitionToFenced(); // And indirect way of moving to JOINING state + when(subscriptions.subscription()).thenReturn(Collections.singleton(topic)); + mockFencedToJoiningMemberData(); data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); - assertEquals(memberId, data.memberId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames()); - assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); + assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); 
assertEquals(Collections.emptyList(), data.topicPartitions()); - membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.JOINING, membershipManager.state()); Review Comment: Similar here: this assertion adds no value, so we should remove it (along with the expectation we set for this value in `mockFencedToJoiningMemberData`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org