Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
chia7712 merged PR #16200: URL: https://github.com/apache/kafka/pull/16200 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2248600964 @chia7712 fixed that small issue with the name. Let me know if there is anything else!
Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1690216933

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -106,60 +99,85 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
-private void setUp(Optional groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
-when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", )));
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mock(Node.class)));
 }
-private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatRequestStateWithZeroHeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
-setUp(Optional.of(gi));
+this.heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+private void createHeartbeatStateandRequestManager() {

Review Comment:
Whoops, missed that. Fixed now.
Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
chia7712 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1690141224

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -106,60 +99,85 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
-private void setUp(Optional groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
-when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", )));
+when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mock(Node.class)));
 }
-private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatRequestStateWithZeroHeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
-setUp(Optional.of(gi));
+this.heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+private void createHeartbeatStateandRequestManager() {

Review Comment:
nit: `createHeartbeatStateandRequestManager` -> `createHeartbeatStateAndRequestManager`
Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2248441396 @chia7712 thank you for your feedback! I have addressed your comment by moving those object creations into a helper method. Let me know how it looks!
Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1690112360

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -360,57 +371,68 @@ public void testNoCoordinator() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testValidateConsumerGroupHeartbeatRequest(final short version) {
+heartbeatState = new HeartbeatState(

Review Comment:
Good point! I have moved those object creations into a helper method now. Let me know what you think.
Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
chia7712 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1689639268

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -360,57 +371,68 @@ public void testNoCoordinator() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testValidateConsumerGroupHeartbeatRequest(final short version) {
+heartbeatState = new HeartbeatState(

Review Comment:
I guess those creations can be moved to a helper to be shared by `testValidateConsumerGroupHeartbeatRequestAssignmentSentWhenLocalEpochChanges`, `testValidateConsumerGroupHeartbeatRequest` and `testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments`
Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
chia7712 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2246117642 I will take a look today!
Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2246108930 Hey @chia7712, would you maybe have some time to take a look at this? Nice improvement @brenden20 did on the HeartbeatRequestManager test. Thanks!
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2239426940 @lianetm thank you for the review! I addressed the comments you left. Let me know if it is good
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1684512734

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -898,6 +837,30 @@ private HeartbeatRequestManager createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(String instanceId) {

Review Comment:
I see, good catch! Changed now
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2237383499 Hey @brenden20, thanks for the updates! Just a few minor comments left.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683379819

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -898,6 +837,30 @@ private HeartbeatRequestManager createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(String instanceId) {
+when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId));
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
+}
+
+private void mockFencedToJoiningMemberData() {
+when(membershipManager.state()).thenReturn(MemberState.JOINING);
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+}
+
+private void mockDefaultMemberData(String instanceId) {

Review Comment:
nit: `mockStableMemberData` maybe?
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683363391

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -898,6 +837,30 @@ private HeartbeatRequestManager createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(String instanceId) {

Review Comment:
I realize now that we have to properly set the expected state here because it is considered when building the request data, to determine if it should be a full heartbeat or not (HeartbeatRequestManager [ln#566](https://github.com/apache/kafka/blob/f595802cc752ed01dc74e9ab932209fe25a9d10b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L566)), so I would say that this should include `when(membershipManager.state()).thenReturn(MemberState.JOINING)`. The test is probably passing anyways because of the way this func is used (first HB where all previous sentFields are null), but just to be true to how this will happen in real-life scenarios.
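To illustrate why the missing `state()` stub does not make the test fail today, here is a simplified, hypothetical model of the decision described in the comment above. It is not the actual `HeartbeatRequestManager` code: the class `FullHeartbeatSketch`, its `SentFields` holder and the `shouldSend` method are assumptions made only for this sketch. The assumed rule is that a field is sent when the member is `JOINING` (full heartbeat) or when its value differs from the one last sent, so on a first heartbeat, where nothing has been sent yet, the field goes out regardless of state.

```java
import java.util.Objects;

public class FullHeartbeatSketch {
    // Reduced stand-in for MemberState; only the two values needed here.
    enum MemberState { JOINING, STABLE }

    // Hypothetical record of what the previous heartbeat carried.
    static final class SentFields {
        String groupInstanceId; // null until a heartbeat has been sent
    }

    // Assumed rule: send the field on a full heartbeat (JOINING),
    // or when its value changed since the last heartbeat.
    static boolean shouldSend(MemberState state, String current, SentFields sent) {
        boolean sendAllFields = state == MemberState.JOINING;
        return sendAllFields || !Objects.equals(current, sent.groupInstanceId);
    }

    public static void main(String[] args) {
        SentFields sent = new SentFields();
        // First heartbeat: nothing was sent before, so the field is included
        // even though the state is not JOINING.
        System.out.println(shouldSend(MemberState.STABLE, "instance-1", sent));

        sent.groupInstanceId = "instance-1";
        // Later heartbeats with an unchanged value: only JOINING forces a resend.
        System.out.println(shouldSend(MemberState.STABLE, "instance-1", sent));
        System.out.println(shouldSend(MemberState.JOINING, "instance-1", sent));
    }
}
```

Under this model, stubbing `state()` as `JOINING` only changes the outcome once a previous heartbeat has recorded its sent fields, which is consistent with the reviewer's point that the stub matters for realism rather than for the current assertion.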
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683375014

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -898,6 +837,30 @@ private HeartbeatRequestManager createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(String instanceId) {
+when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId));
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
+}
+
+private void mockFencedToJoiningMemberData() {

Review Comment:
nit: `mockRejoiningMemberData` is maybe better (fencing is just one of the ways to get to rejoining, and it is not specifically used here)
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1683354934 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -659,95 +665,50 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { -Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); -heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), -coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, -backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); +when(membershipManager.isLeavingGroup()).thenReturn(false); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); -assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } -@Test -public void testHeartbeatMetrics() { -// setup -coordinatorRequestManager = mock(CoordinatorRequestManager.class); -membershipManager = mock(MembershipManager.class); -heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); -time = new MockTime(); -metrics = new Metrics(time); -heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( -new LogContext(), -time, -0, // This initial interval should be 0 to ensure heartbeat on the clock -DEFAULT_RETRY_BACKOFF_MS, -DEFAULT_RETRY_BACKOFF_MAX_MS, -0); -backgroundEventHandler = mock(BackgroundEventHandler.class); -heartbeatRequestManager = createHeartbeatRequestManager( -coordinatorRequestManager, 
-membershipManager, -heartbeatState, -heartbeatRequestState, -backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); -when(membershipManager.state()).thenReturn(MemberState.STABLE); - -assertNotNull(getMetric("heartbeat-response-time-max")); -assertNotNull(getMetric("heartbeat-rate")); -assertNotNull(getMetric("heartbeat-total")); -assertNotNull(getMetric("last-heartbeat-seconds-ago")); - -// test poll -assertHeartbeat(heartbeatRequestManager, 0); -time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); -assertEquals(1.0, getMetric("heartbeat-total").metricValue()); -assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue()); - -assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); -assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); -assertEquals(2.0, getMetric("heartbeat-total").metricValue()); - -// Randomly sleep for some time -Random rand = new Random(); -int randomSleepS = rand.nextInt(11); -time.sleep(randomSleepS * 1000); -assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); -} - @Test public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { -mockStableMember(); +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Receive HB response fencing member when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); -doNothing().when(membershipManager).transitionToFenced(); ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.FENCED_MEMBER_EPOCH); 
result.unsentRequests.get(0).handler().onComplete(response); verify(membershipManager).transitionToFenced(); verify(heartbeatRequestState).onFailedAttempt(anyLong()); verify(heartbeatRequestState).reset(); +when(membershipManager.shouldSkipHeartbeat()).t
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1683354345 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -659,95 +665,50 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { -Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); -heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), -coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, -backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); +when(membershipManager.isLeavingGroup()).thenReturn(false); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); -assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } -@Test -public void testHeartbeatMetrics() { -// setup -coordinatorRequestManager = mock(CoordinatorRequestManager.class); -membershipManager = mock(MembershipManager.class); -heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); -time = new MockTime(); -metrics = new Metrics(time); -heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( -new LogContext(), -time, -0, // This initial interval should be 0 to ensure heartbeat on the clock -DEFAULT_RETRY_BACKOFF_MS, -DEFAULT_RETRY_BACKOFF_MAX_MS, -0); -backgroundEventHandler = mock(BackgroundEventHandler.class); -heartbeatRequestManager = createHeartbeatRequestManager( -coordinatorRequestManager, 
-membershipManager, -heartbeatState, -heartbeatRequestState, -backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); -when(membershipManager.state()).thenReturn(MemberState.STABLE); - -assertNotNull(getMetric("heartbeat-response-time-max")); -assertNotNull(getMetric("heartbeat-rate")); -assertNotNull(getMetric("heartbeat-total")); -assertNotNull(getMetric("last-heartbeat-seconds-ago")); - -// test poll -assertHeartbeat(heartbeatRequestManager, 0); -time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); -assertEquals(1.0, getMetric("heartbeat-total").metricValue()); -assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue()); - -assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); -assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); -assertEquals(2.0, getMetric("heartbeat-total").metricValue()); - -// Randomly sleep for some time -Random rand = new Random(); -int randomSleepS = rand.nextInt(11); -time.sleep(randomSleepS * 1000); -assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); -} - @Test public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { -mockStableMember(); +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Receive HB response fencing member when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); -doNothing().when(membershipManager).transitionToFenced(); ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.FENCED_MEMBER_EPOCH); 
result.unsentRequests.get(0).handler().onComplete(response); verify(membershipManager).transitionToFenced(); verify(heartbeatRequestState).onFailedAttempt(anyLong()); verify(heartbeatRequestState).reset(); +when(membershipManager.shouldSkipHeartbeat()).t
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683353250

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -621,7 +628,6 @@ public void testPollTimerExpiration() {
 assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
 verify(membershipManager).transitionToSendingLeaveGroup(true);
 verify(heartbeatState).reset();
-verify(heartbeatRequestState).reset();
 verify(membershipManager).onHeartbeatRequestSent();
 when(membershipManager.state()).thenReturn(MemberState.STALE);

Review Comment:
maybe not needed anymore? (the following expectation on `shouldSkipHeartbeat` is probably all we need)
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2234881749 @lianetm thank you again for the review and helpful comments, I have implemented all suggestions!
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681916086 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -898,6 +854,42 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, -metrics); +new Metrics()); +} + +private void mockJoiningMemberData(boolean instanceId) { +when(membershipManager.memberId()).thenReturn(""); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); Review Comment: I do like this implementation better, thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681913213 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -898,6 +854,42 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, -metrics); +new Metrics()); +} + +private void mockJoiningMemberData(boolean instanceId) { +when(membershipManager.memberId()).thenReturn(""); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); + when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE); + when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform")); +when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); Review Comment: Makes sense, changed now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2234017232 Thanks for the updates @brenden20. This is looking very good; I left some comments just to keep improving.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681566194 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -580,39 +617,26 @@ public void testHeartbeatState() { ConsumerGroupHeartbeatResponseData.Assignment assignmentTopic1 = new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); -ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() +rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) +.setMemberId(DEFAULT_MEMBER_ID) .setMemberEpoch(1) .setAssignment(assignmentTopic1)); when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1")); -membershipManager.onHeartbeatSuccess(rs1.data()); +mockReconcilingState(); // We remain in RECONCILING state, as the assignment will be reconciled on the next poll assertEquals(MemberState.RECONCILING, membershipManager.state()); Review Comment: ditto (and the `mockReconcilingState` could be completely removed) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681573819 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -40,13 +38,13 @@ import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; Review Comment: let's remove. This is making the build [fail](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16200/59/pipeline/) with an unused import
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681564234 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @Test public void testHeartbeatState() { +CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); +mockJoiningMemberData(false); + +heartbeatState = new HeartbeatState( +subscriptions, +membershipManager, +DEFAULT_MAX_POLL_INTERVAL_MS +); + +createHeartbeatStateWithZeroHeartbeatInterval(); + // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); assertEquals("", data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); -assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); +assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); -assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); +assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); -membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); // Mock a response from the group coordinator, that supplies the member ID and a new epoch -mockStableMember(); +when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); +when(subscriptions.rebalanceListener()).thenReturn(Optional.empty()); +ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) 
+.setMemberId(DEFAULT_MEMBER_ID) +.setMemberEpoch(DEFAULT_MEMBER_EPOCH) +.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()) +); + when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null)); +mockDefaultMemberData(false); data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); -assertEquals(memberId, data.memberId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(1, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); assertNull(data.serverAssignor()); -assertEquals(data.topicPartitions(), Collections.emptyList()); -membershipManager.onHeartbeatRequestSent(); +assertEquals(Collections.emptyList(), data.topicPartitions()); assertEquals(MemberState.STABLE, membershipManager.state()); // Join the group and subscribe to a topic, but the response has not yet been received String topic = "topic1"; subscriptions.subscribe(Collections.singleton(topic), Optional.empty()); -membershipManager.onSubscriptionUpdated(); -membershipManager.transitionToFenced(); // And indirect way of moving to JOINING state + when(subscriptions.subscription()).thenReturn(Collections.singleton(topic)); +mockFencedToJoiningMemberData(); data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); -assertEquals(memberId, data.memberId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames()); -assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); 
+assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); -membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.JOINING, membershipManager.state()); Review Comment: similar, no value anymore. We should remove this (and the expectation we set with the value on `m
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681569580 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @Test public void testHeartbeatState() { +CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); +mockJoiningMemberData(false); + +heartbeatState = new HeartbeatState( +subscriptions, +membershipManager, +DEFAULT_MAX_POLL_INTERVAL_MS +); + +createHeartbeatStateWithZeroHeartbeatInterval(); + // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); assertEquals("", data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); -assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); +assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); -assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); +assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); -membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); Review Comment: same as comment [here](https://github.com/apache/kafka/pull/16200/files#r1681562408) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681564234 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @Test public void testHeartbeatState() { +CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); +mockJoiningMemberData(false); + +heartbeatState = new HeartbeatState( +subscriptions, +membershipManager, +DEFAULT_MAX_POLL_INTERVAL_MS +); + +createHeartbeatStateWithZeroHeartbeatInterval(); + // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); assertEquals("", data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); -assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); +assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); -assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); +assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); -membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); // Mock a response from the group coordinator, that supplies the member ID and a new epoch -mockStableMember(); +when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); +when(subscriptions.rebalanceListener()).thenReturn(Optional.empty()); +ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) 
+.setMemberId(DEFAULT_MEMBER_ID) +.setMemberEpoch(DEFAULT_MEMBER_EPOCH) +.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()) +); + when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null)); +mockDefaultMemberData(false); data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); -assertEquals(memberId, data.memberId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(1, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); assertNull(data.serverAssignor()); -assertEquals(data.topicPartitions(), Collections.emptyList()); -membershipManager.onHeartbeatRequestSent(); +assertEquals(Collections.emptyList(), data.topicPartitions()); assertEquals(MemberState.STABLE, membershipManager.state()); // Join the group and subscribe to a topic, but the response has not yet been received String topic = "topic1"; subscriptions.subscribe(Collections.singleton(topic), Optional.empty()); -membershipManager.onSubscriptionUpdated(); -membershipManager.transitionToFenced(); // And indirect way of moving to JOINING state + when(subscriptions.subscription()).thenReturn(Collections.singleton(topic)); +mockFencedToJoiningMemberData(); data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); -assertEquals(memberId, data.memberId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames()); -assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); 
+assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); -membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.JOINING, membershipManager.state()); Review Comment: similar, no value, we should remove this (and the expectation we set with the value on `mockFence
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681562408 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @Test public void testHeartbeatState() { +CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); +mockJoiningMemberData(false); + +heartbeatState = new HeartbeatState( +subscriptions, +membershipManager, +DEFAULT_MAX_POLL_INTERVAL_MS +); + +createHeartbeatStateWithZeroHeartbeatInterval(); + // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); assertEquals("", data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); -assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); +assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); -assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); +assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); -membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); // Mock a response from the group coordinator, that supplies the member ID and a new epoch -mockStableMember(); +when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); +when(subscriptions.rebalanceListener()).thenReturn(Optional.empty()); +ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) 
+.setMemberId(DEFAULT_MEMBER_ID) +.setMemberEpoch(DEFAULT_MEMBER_EPOCH) +.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()) +); + when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null)); +mockDefaultMemberData(false); data = heartbeatState.buildRequestData(); -assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); -assertEquals(memberId, data.memberId()); +assertEquals(DEFAULT_GROUP_ID, data.groupId()); +assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(1, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); assertNull(data.serverAssignor()); -assertEquals(data.topicPartitions(), Collections.emptyList()); -membershipManager.onHeartbeatRequestSent(); +assertEquals(Collections.emptyList(), data.topicPartitions()); assertEquals(MemberState.STABLE, membershipManager.state()); Review Comment: I think we should remove this now that the `membershipMgr` is a mock, as it should be :) (it has no value since it's passing only because we set the expectation ourselves on `mockDefaultMemberData`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
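The point of the review comment above (asserting on a value the test itself stubbed proves nothing) can be illustrated with a minimal, dependency-free sketch. Everything here is hypothetical: `StubMembership` is a hand-written stand-in for a mocked `MembershipManager`, and `buildRequest` is a stand-in for the code under test; neither is the real Kafka class or API.

```java
// Hypothetical, simplified stand-ins; these are NOT the real Kafka classes.
final class StubbedStateExample {
    enum MemberState { UNSUBSCRIBED, JOINING, STABLE }

    // Stand-in for a mocked collaborator: it only returns what the test configured.
    static final class StubMembership {
        MemberState state;
        String memberId;
        int memberEpoch;
    }

    // Stand-in for the code under test: derives request data from the collaborator.
    static String buildRequest(StubMembership membership) {
        return membership.memberId + "@" + membership.memberEpoch;
    }

    public static void main(String[] args) {
        StubMembership membership = new StubMembership();
        membership.state = MemberState.STABLE; // the test sets this value itself
        membership.memberId = "member-id";
        membership.memberEpoch = 1;

        // Tautological check: it reads back the value configured above,
        // so it can never fail and exercises no production logic.
        boolean tautological = membership.state == MemberState.STABLE;

        // Meaningful check: it inspects output produced by the code under test.
        boolean meaningful = buildRequest(membership).equals("member-id@1");

        System.out.println(tautological + " " + meaningful);
    }
}
```

With a mocked collaborator, only the second kind of check can catch a regression, which is presumably why the assertions on `membershipManager.state()` are suggested for removal.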
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681559941 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @Test public void testHeartbeatState() { +CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); Review Comment: I would expect we don't need anything related to the commitRequestManager, because the `HBManager` we're testing knows nothing about it. It is the `MembershipMgr` that commits offsets before revoking partitions. Could you try removing this and the expectation on ln 571?
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681548025 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -898,6 +854,42 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, -metrics); +new Metrics()); +} + +private void mockJoiningMemberData(boolean instanceId) { +when(membershipManager.memberId()).thenReturn(""); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); Review Comment: what about simplifying this to: ``` when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId)); ``` Passing a String instanceId as param. I find it's also clearer to have the string param with the explicit `instanceId` to use (or null). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
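As a rough sketch of the simplification suggested above, the two forms can be compared in plain Java without Mockito. The class name, the method names `fromFlag` and `fromNullable`, and the value of `DEFAULT_GROUP_INSTANCE_ID` are assumptions made for illustration; only `Optional.of`, `Optional.empty`, and `Optional.ofNullable` are real JDK calls.

```java
import java.util.Optional;

// Hypothetical helper class for illustration only.
final class GroupInstanceIdExample {
    // Assumed placeholder value; the real constant's value is not shown in the thread.
    static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";

    // Boolean-flag form, mirroring the if/else in the diff.
    static Optional<String> fromFlag(boolean instanceId) {
        if (instanceId)
            return Optional.of(DEFAULT_GROUP_INSTANCE_ID);
        else
            return Optional.empty();
    }

    // String-parameter form: null maps to Optional.empty(), anything else is wrapped.
    static Optional<String> fromNullable(String instanceId) {
        return Optional.ofNullable(instanceId);
    }

    public static void main(String[] args) {
        // Both forms agree for the "with instance id" and "without instance id" cases.
        System.out.println(fromFlag(true).equals(fromNullable(DEFAULT_GROUP_INSTANCE_ID)));
        System.out.println(fromFlag(false).equals(fromNullable(null)));
    }
}
```

The string-parameter form also lets each caller state exactly which instance id it expects, rather than relying on an implicit default behind a boolean.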
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681537376 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -898,6 +854,42 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, -metrics); +new Metrics()); +} + +private void mockJoiningMemberData(boolean instanceId) { +when(membershipManager.memberId()).thenReturn(""); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); + when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE); + when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform")); +when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); +} + +private void mockFencedToJoiningMemberData() { +when(membershipManager.state()).thenReturn(MemberState.JOINING); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); +} + +private void mockDefaultMemberData(boolean instanceId) { +when(membershipManager.currentAssignment()).thenReturn(new LocalAssignment(0, Collections.emptyMap())); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID); +when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); + when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform")); Review Comment: DEFAULT_REMOTE_ASSIGNOR ? 
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681535352 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -898,6 +854,42 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, -metrics); +new Metrics()); +} + +private void mockJoiningMemberData(boolean instanceId) { +when(membershipManager.memberId()).thenReturn(""); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); + when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE); + when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform")); Review Comment: should we use the constant we already have? DEFAULT_REMOTE_ASSIGNOR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681534114 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -898,6 +854,42 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, -metrics); +new Metrics()); +} + +private void mockJoiningMemberData(boolean instanceId) { +when(membershipManager.memberId()).thenReturn(""); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); + when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE); + when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform")); +when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); +} + +private void mockFencedToJoiningMemberData() { +when(membershipManager.state()).thenReturn(MemberState.JOINING); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); +} + +private void mockDefaultMemberData(boolean instanceId) { +when(membershipManager.currentAssignment()).thenReturn(new LocalAssignment(0, Collections.emptyMap())); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID); +when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); + when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform")); 
+when(membershipManager.state()).thenReturn(MemberState.STABLE);

Review Comment:
Similar to the previous comment. If this was needed to unblock asserts on `membershipMgr.state()`, we should remove those assertions and this stubbing, as testing an expectation we set ourselves truly brings no value.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681524660 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -898,6 +854,42 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, -metrics); +new Metrics()); +} + +private void mockJoiningMemberData(boolean instanceId) { +when(membershipManager.memberId()).thenReturn(""); +when(membershipManager.memberEpoch()).thenReturn(0); +when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); +if (instanceId) + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +else + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); + when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE); + when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform")); +when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED); Review Comment: this is a bit confusing because it's not what happens in practice. A member that is about to join (send first HB) is in JOINING state, not unsubscribed. I guess you needed this because of the existing `testHeartbeatState` that uses this func and expects the state to be UNSUBSCRIBED. I would say that's what is actually wrong. I would suggest we remove the `assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());` on ln 560, and this expectation here too. (we're only asserting on something we set ourselves) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
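The point about "only asserting on something we set ourselves" can be illustrated without Mockito. The following is a minimal sketch, not Kafka code: the `Membership` interface, the `buildRequest` method and the hand-written stub are hypothetical stand-ins for `MembershipManager`, the heartbeat-building logic and `when(...).thenReturn(...)` respectively.

```java
class StubAssertionSketch {
    // Hypothetical stand-in for the getters the heartbeat code reads from the membership manager.
    interface Membership {
        String memberId();
        int memberEpoch();
    }

    // Hypothetical code under test: derives a request summary from the membership data.
    static String buildRequest(Membership m) {
        return "memberId=" + m.memberId() + ",epoch=" + m.memberEpoch();
    }

    // Hand-written stub playing the role of when(...).thenReturn(...).
    static Membership joiningMember() {
        return new Membership() {
            @Override public String memberId() { return ""; }
            @Override public int memberEpoch() { return 0; }
        };
    }

    public static void main(String[] args) {
        Membership stub = joiningMember();

        // Tautological: this only checks that the stub returns what we told it to return.
        if (stub.memberEpoch() != 0) throw new AssertionError("stub changed its answer");

        // Meaningful: this checks what the code under test does with the stubbed data.
        if (!buildRequest(stub).equals("memberId=,epoch=0")) throw new AssertionError("unexpected request");
    }
}
```

The first check can never fail for a reason related to the code under test, which is why dropping both the stubbed `state()` value and the assertion on it loses no coverage.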
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1681489990 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -447,7 +505,8 @@ private ConsumerGroupHeartbeatRequest getHeartbeatRequest(HeartbeatRequestManage @ParameterizedTest @MethodSource("errorProvider") public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) { -mockStableMember(); +if (isFatal) +when(membershipManager.state()).thenReturn(MemberState.FATAL); Review Comment: great, that's what I was expecting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2229328830 @lianetm thank you for the review, I have taken your suggestions and implemented them. Let me know if there is anything else.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1678342328

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -447,7 +505,8 @@ private ConsumerGroupHeartbeatRequest getHeartbeatRequest(HeartbeatRequestManage
     @ParameterizedTest
     @MethodSource("errorProvider")
     public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) {
-        mockStableMember();
+        if (isFatal)
+            when(membershipManager.state()).thenReturn(MemberState.FATAL);

Review Comment:
It looks like the if statement was not necessary. After removing both the assert and the stubbing, the test still passes.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1678333602 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) { -resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +membershipManager = new MembershipManagerImpl( +DEFAULT_GROUP_ID, +Optional.of(DEFAULT_GROUP_INSTANCE_ID), +0, +Optional.of(""), +subscriptions, +mock(CommitRequestManager.class), +(ConsumerMetadata) metadata, +logContext, +Optional.of(mock(ClientTelemetryReporter.class)), +backgroundEventHandler, +time, +new Metrics() +); +membershipManager.transitionToJoining(); Review Comment: I was able to get this working with this kind of approach, let me know what you think -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1678232950 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -659,95 +749,50 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { -Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); -heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), -coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, -backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); +when(membershipManager.isLeavingGroup()).thenReturn(false); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); -assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } -@Test -public void testHeartbeatMetrics() { Review Comment: Yes, you are correct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674318250

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -447,7 +505,8 @@ private ConsumerGroupHeartbeatRequest getHeartbeatRequest(HeartbeatRequestManage
     @ParameterizedTest
     @MethodSource("errorProvider")
     public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) {
-        mockStableMember();
+        if (isFatal)
+            when(membershipManager.state()).thenReturn(MemberState.FATAL);

Review Comment:
Trying to understand exactly why we need this, I ended up in `ensureHeartbeatStopped`, which asserts that the `membershipMgr` state is fatal. Was that the reason? I would say we should remove that assert on the mock, since it brings no value (and then probably this stubbing too, if it is not required for anything else).
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1674309494 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -425,6 +482,7 @@ topicId, mkSortedSet(0) assertEquals(Collections.singletonList(expectedTopicPartitions), heartbeatRequest1.data().topicPartitions()); // Assignment did not change, so no assignment should be sent Review Comment: nit: I noticed that the `getHeartbeatRequest` used here has a kind of copy of this comment, totally out of place (ln 497). Could you pls remove it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1674210447 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) { -resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +membershipManager = new MembershipManagerImpl( +DEFAULT_GROUP_ID, +Optional.of(DEFAULT_GROUP_INSTANCE_ID), +0, +Optional.of(""), +subscriptions, +mock(CommitRequestManager.class), +(ConsumerMetadata) metadata, +logContext, +Optional.of(mock(ClientTelemetryReporter.class)), +backgroundEventHandler, +time, +new Metrics() +); +membershipManager.transitionToJoining(); Review Comment: btw, if this suggestion works, please consider it for the other test on the heartbeat state, maybe we can simplify there too (`testHeartbeatState` and `testValidateConsumerGroupHeartbeatRequest`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1674207651 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) { -resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +membershipManager = new MembershipManagerImpl( +DEFAULT_GROUP_ID, +Optional.of(DEFAULT_GROUP_INSTANCE_ID), +0, +Optional.of(""), +subscriptions, +mock(CommitRequestManager.class), +(ConsumerMetadata) metadata, +logContext, +Optional.of(mock(ClientTelemetryReporter.class)), +backgroundEventHandler, +time, +new Metrics() +); +membershipManager.transitionToJoining(); Review Comment: actually I wonder if we really need an instance of the `membershipMgr` here (and all the explicit calls on it). Since we're testing how the HBMgr generates the data, we maybe only need to mock the `membershipMgr` funcs the HBMgr relies on, maybe something like this, and removing the membershipMgr instance and all the calls on it, that were only trying to get to the state where these values would be returned: ``` private void mockJoiningMemberData() { when(membershipManager.memberId()).thenReturn(""); when(membershipManager.memberEpoch()).thenReturn(0); when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674174436

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -659,95 +749,50 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
     @Test
     public void testisExpiredByUsedForLogging() {
-        Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-        heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(),
-            coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState,
-            backgroundEventHandler, metrics);
         when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
         int exceededTimeMs = 5;
         time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
+        when(membershipManager.isLeavingGroup()).thenReturn(false);
         NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());
         verify(membershipManager).transitionToSendingLeaveGroup(true);
         verify(pollTimer, never()).isExpiredBy();
-        assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
         clearInvocations(pollTimer);
         heartbeatRequestManager.resetPollTimer(time.milliseconds());
         verify(pollTimer).isExpiredBy();
     }
-    @Test
-    public void testHeartbeatMetrics() {

Review Comment:
Just to double check, we're removing this because it's covered in the `HeartbeatMetricsManagerTest`, right?
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1674165119 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -756,18 +801,19 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { -mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight"); -membershipManager.leaveGroup(); - +when(membershipManager.state()).thenReturn(MemberState.LEAVING); +when(heartbeatState.buildRequestData()).thenReturn(new ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1)); ConsumerGroupHeartbeatRequest heartbeatToLeave = getHeartbeatRequest(heartbeatRequestManager, version); assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, heartbeatToLeave.data().memberEpoch()); + //when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); Review Comment: let's remove this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1674150218 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { -mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + +when(pollTimer.remainingMs()).thenReturn(1800L); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); // Member in state where it should not send Heartbeat anymore when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); +when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); membershipManager.transitionToFatal(); Review Comment: uhm that's weird. The only check that should happen there is the `shouldSkipHeartbeat`, and we're mocking it. We're doing the same in other tests (ex. `testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin`), where just mocking the `shouldSkipHeartbeat` leads to no HB as expected (no actual transition). Could you please double check, trying to remove just ln 298 again? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1674142299 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) { -resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +membershipManager = new MembershipManagerImpl( +DEFAULT_GROUP_ID, +Optional.of(DEFAULT_GROUP_INSTANCE_ID), +0, +Optional.of(""), +subscriptions, +mock(CommitRequestManager.class), +(ConsumerMetadata) metadata, +logContext, +Optional.of(mock(ClientTelemetryReporter.class)), +backgroundEventHandler, +time, +new Metrics() +); +membershipManager.transitionToJoining(); + +heartbeatState = new HeartbeatState( +subscriptions, +membershipManager, +DEFAULT_MAX_POLL_INTERVAL_MS +); + +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler +); +createHeartbeatStateWithZeroHeartbeatInterval(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); String topic = "topic1"; +// Make a singleton set +HashSet set = new HashSet<>(); +set.add(topic); Review Comment: indeed, let's make a `Collections.singleton` set (and remove the comment and reuse the var on ln 241 too) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
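The `Collections.singleton` suggestion can be sketched in isolation as follows. The class and method names here are illustrative only and do not come from the PR; the sketch just contrasts the two ways of building a one-element set.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class SingletonSetSketch {
    // The form used in the diff: allocate a mutable set, then add the single element.
    static Set<String> verbose(String topic) {
        Set<String> set = new HashSet<>();
        set.add(topic);
        return set;
    }

    // The suggested form: an immutable one-element set built in a single expression.
    static Set<String> concise(String topic) {
        return Collections.singleton(topic);
    }

    public static void main(String[] args) {
        String topic = "topic1";
        // Set equality depends only on the elements, so both forms are interchangeable as test inputs.
        System.out.println(verbose(topic).equals(concise(topic)));
    }
}
```

One difference worth keeping in mind: the set returned by `Collections.singleton` is immutable, so it fits here only because the test never modifies it afterwards.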
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1674138238 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) { -resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); +membershipManager = new MembershipManagerImpl( +DEFAULT_GROUP_ID, +Optional.of(DEFAULT_GROUP_INSTANCE_ID), +0, +Optional.of(""), Review Comment: won't affect the test, but just for the sake of correctness, if we don't want an assignor we should pass `Optional.empty()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
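The distinction between `Optional.of("")` and `Optional.empty()` is easy to see in a small standalone example. This is a sketch under stated assumptions: `describe` is a hypothetical helper, not part of the Kafka client, and it only shows how a consumer of the value would see the two cases.

```java
import java.util.Optional;

class AssignorOptionalSketch {
    // Hypothetical helper: reports which server-side assignor a caller asked for.
    static String describe(Optional<String> serverAssignor) {
        return serverAssignor
            .map(name -> "assignor '" + name + "'")
            .orElse("no assignor");
    }

    public static void main(String[] args) {
        // Present but blank: code that checks isPresent() treats this as "an assignor was configured".
        System.out.println(describe(Optional.of("")));
        // Absent: this is the value that actually says "no assignor".
        System.out.println(describe(Optional.empty()));
    }
}
```

An empty string wrapped in an `Optional` is still a present value, which is why `Optional.empty()` is the more accurate way to express "no assignor", even when the test outcome does not depend on it.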
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2215943716 @lianetm @philipnee thank you both for the feedback! I have addressed all comments and left some replies as well. Let me know what you think; this iteration has some pretty major improvements over the last one.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669499524 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { -mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + +when(pollTimer.remainingMs()).thenReturn(1800L); Review Comment: That line is now removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669499273 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -349,6 +408,10 @@ public void testNoCoordinator() { when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +when(pollTimer.isExpired()).thenReturn(false); +when(pollTimer.remainingMs()).thenReturn(Long.MAX_VALUE); + when(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(1000L); Review Comment: These are now removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669498721

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ##
@@ -898,6 +967,6 @@ private HeartbeatRequestManager createHeartbeatRequestManager(
                 heartbeatState,
                 heartbeatRequestState,
                 backgroundEventHandler,
-                metrics);
+                new Metrics());

Review Comment:
Yes, but if you use the same metrics for different objects it breaks everything.
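The thread does not say what exactly breaks when one metrics object is shared. One plausible reading, offered here purely as an assumption, is that two components registering the same metric names against a single registry collide. The sketch below uses a hypothetical `Registry` and `Component` (neither is Kafka's `Metrics` API) to show that failure mode and why a fresh registry per object avoids it.

```java
import java.util.HashMap;
import java.util.Map;

class SharedRegistrySketch {
    // Hypothetical registry that rejects a second registration under the same name.
    static class Registry {
        private final Map<String, Long> metrics = new HashMap<>();

        void register(String name) {
            if (metrics.putIfAbsent(name, 0L) != null)
                throw new IllegalArgumentException("metric already registered: " + name);
        }
    }

    // Hypothetical component that registers a fixed metric name when constructed.
    static class Component {
        Component(Registry registry) {
            registry.register("heartbeat-total");
        }
    }

    // Builds two components and reports whether constructing the second one failed.
    static boolean secondComponentFails(Registry first, Registry second) {
        new Component(first);
        try {
            new Component(second);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Registry shared = new Registry();
        // Same registry for both components: the second registration collides.
        System.out.println(secondComponentFails(shared, shared));
        // One fresh registry per component: no collision.
        System.out.println(secondComponentFails(new Registry(), new Registry()));
    }
}
```

Under that assumption, passing `new Metrics()` in `createHeartbeatRequestManager` keeps each manager built in a test isolated from the ones created in `setUp`.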
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669497625 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -756,18 +846,18 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { -mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight"); -membershipManager.leaveGroup(); - +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); +when(heartbeatState.buildRequestData()).thenReturn(new ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1)); ConsumerGroupHeartbeatRequest heartbeatToLeave = getHeartbeatRequest(heartbeatRequestManager, version); assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, heartbeatToLeave.data().memberEpoch()); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); Review Comment: Yes I see, I appreciate the explanations on these as well, it definitely gives me a better understanding of how everything works together. Changed now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669495115 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -756,18 +846,18 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { -mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight"); -membershipManager.leaveGroup(); - +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); Review Comment: You're right, replacing that now, thank you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669465217 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() { @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Mimic network timeout result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout")); +time.sleep(1); Review Comment: Yeah, removing it changes nothing with the test, removing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669460104 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { -mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + +when(pollTimer.remainingMs()).thenReturn(1800L); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); // Member in state where it should not send Heartbeat anymore when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); +when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); membershipManager.transitionToFatal(); Review Comment: It seems that I do actually need it, I tried removing it and found that ```shouldSkipHeartbeat``` is not working properly without it. Even stubbing the method to return ```MemberState.FATAL``` does not work for some reason. This seems to be the only way to return an empty poll result -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669445851 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { -mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + +when(pollTimer.remainingMs()).thenReturn(1800L); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); // Member in state where it should not send Heartbeat anymore when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); Review Comment: I can do this if I turn ```pollTimer``` into a spy, I tried that out and I think we should keep it. It makes the tests more strict on timing, since a mock would have to be stubbed to get the desired behavior. It also allows us to keep our ```verify()``` invocations on ```pollTimer```. Let me know what you think -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669428550 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { -mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + +when(pollTimer.remainingMs()).thenReturn(1800L); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); // Member in state where it should not send Heartbeat anymore when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); Review Comment: It is actually needed for this test: ```maximumTimeToWait()``` will always return 0 if ```pollTimer``` is a mock. I tried turning ```pollTimer``` from a mock into a spy and it worked out well; all tests pass. I think it may be beneficial to keep ```pollTimer``` as a spy, since we ```verify``` on ```pollTimer``` in many places. Let me know what you think; my changes are pushed.
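The mock-versus-spy point above can be illustrated without any test library. The following is only a rough sketch under stated assumptions: `CountdownTimer`, `RealCountdownTimer`, `UnstubbedTimer`, `RecordingTimer` and `TimerDoubleSketch` are hypothetical names invented for illustration, not Kafka's `Timer` or anything from Mockito, and the `maximumTimeToWait` formula here is an assumed simplification rather than the manager's actual logic.

```java
// Dependency-free sketch. Every type here is hypothetical: none of them is
// Kafka's Timer or a Mockito class; they only imitate mock-like and spy-like doubles.
interface CountdownTimer {
    long remainingMs();
}

// The "real" collaborator: counts down from a fixed budget as time advances.
final class RealCountdownTimer implements CountdownTimer {
    private final long budgetMs;
    private long elapsedMs;

    RealCountdownTimer(long budgetMs) {
        this.budgetMs = budgetMs;
    }

    void advance(long ms) {
        elapsedMs += ms;
    }

    @Override
    public long remainingMs() {
        return Math.max(0L, budgetMs - elapsedMs);
    }
}

// Mock-like double: with nothing stubbed it answers with the default value, 0.
final class UnstubbedTimer implements CountdownTimer {
    @Override
    public long remainingMs() {
        return 0L;
    }
}

// Spy-like double: forwards to the real timer and records the call,
// so real timing and call verification are both available.
final class RecordingTimer implements CountdownTimer {
    private final CountdownTimer delegate;
    private int calls;

    RecordingTimer(CountdownTimer delegate) {
        this.delegate = delegate;
    }

    int calls() {
        return calls;
    }

    @Override
    public long remainingMs() {
        calls++;
        return delegate.remainingMs();
    }
}

final class TimerDoubleSketch {
    // Assumption: the wait is capped by both the poll timer and the next heartbeat.
    static long maximumTimeToWait(CountdownTimer pollTimer, long timeToNextHeartbeatMs) {
        return Math.min(pollTimer.remainingMs(), timeToNextHeartbeatMs);
    }

    public static void main(String[] args) {
        RealCountdownTimer real = new RealCountdownTimer(5000L);
        real.advance(100L);
        RecordingTimer spyLike = new RecordingTimer(real);

        System.out.println(maximumTimeToWait(new UnstubbedTimer(), 900L)); // mock-like double
        System.out.println(maximumTimeToWait(spyLike, 900L));              // spy-like double
        System.out.println(spyLike.calls());                               // recorded calls
    }
}
```

In this sketch the mock-like double forces the wait to 0 regardless of the heartbeat schedule, while the spy-like double lets the real countdown take part and still exposes a call count that a test could check.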
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669143238 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +Metrics metrics = new Metrics(time); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); } -private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { -cleanup(); - -ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( -DEFAULT_GROUP_ID, -groupInstanceId, +private void createHeartbeatStateWith0HeartbeatInterval() { +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, 0, -0.0, -Optional.of(DEFAULT_REMOTE_ASSIGNOR) -); - -setUp(Optional.of(gi)); -} +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -@AfterEach -public void cleanup() { -if (testBuilder != null) { -testBuilder.close(); -} +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); } @Test public void testHeartbeatOnStartup() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); assertEquals(0, 
heartbeatRequestManager.maximumTimeToWait(time.mil
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669139534 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +Metrics metrics = new Metrics(time); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); } -private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { -cleanup(); - -ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( -DEFAULT_GROUP_ID, -groupInstanceId, +private void createHeartbeatStateWith0HeartbeatInterval() { +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, 0, -0.0, -Optional.of(DEFAULT_REMOTE_ASSIGNOR) -); - -setUp(Optional.of(gi)); -} +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -@AfterEach -public void cleanup() { -if (testBuilder != null) { -testBuilder.close(); -} +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); } @Test public void testHeartbeatOnStartup() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); Review Comment: I have made the change you suggested, let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669122743 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +Metrics metrics = new Metrics(time); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); } -private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { -cleanup(); - -ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( -DEFAULT_GROUP_ID, -groupInstanceId, +private void createHeartbeatStateWith0HeartbeatInterval() { +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, 0, -0.0, -Optional.of(DEFAULT_REMOTE_ASSIGNOR) -); - -setUp(Optional.of(gi)); -} +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -@AfterEach -public void cleanup() { -if (testBuilder != null) { -testBuilder.close(); -} +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); } @Test public void testHeartbeatOnStartup() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); assertEquals(0, 
heartbeatRequestManager.maximumTimeToWait(time.mil
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669112935 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +Metrics metrics = new Metrics(time); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); Review Comment: Removing it changed nothing, good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2214850627 Hey @brenden20 , thanks for the updates! I completed another pass, left some comments. Thanks!
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669063349 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -756,18 +846,18 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { -mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight"); -membershipManager.leaveGroup(); - +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); +when(heartbeatState.buildRequestData()).thenReturn(new ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1)); ConsumerGroupHeartbeatRequest heartbeatToLeave = getHeartbeatRequest(heartbeatRequestManager, version); assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, heartbeatToLeave.data().memberEpoch()); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); Review Comment: Similar to other comments above, this will achieve the result we want (no HB), but not for the reason we wanted to test. The intention of the test was to assert that polling the HB manager after the request to leave has been sent will generate no request (and that's because the member transitions to UNSUBSCRIBED on `onHeartbeatRequestSent`). So probably here we should only set the expectation that shouldSkipHeartbeat returns true, which is what the HB mgr checks, and gets when the member is UNSUBSCRIBED. Makes sense?
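To make the "right result, wrong reason" concern concrete, here is a minimal sketch. It assumes a manager that applies a coordinator check before a member-state check; `PollGuardSketch`, `Outcome` and the guard order are all hypothetical and are not taken from Kafka's `HeartbeatRequestManager`.

```java
import java.util.Optional;

// Hypothetical stand-in for the guards a heartbeat manager might apply before
// building a request. Names and guard order are assumptions, not Kafka's code.
final class PollGuardSketch {
    enum Outcome { NO_COORDINATOR, SKIPPED_BY_MEMBER_STATE, REQUEST_SENT }

    static Outcome poll(Optional<String> coordinator, boolean shouldSkipHeartbeat) {
        if (!coordinator.isPresent()) {
            return Outcome.NO_COORDINATOR;          // empty result: coordinator unknown
        }
        if (shouldSkipHeartbeat) {
            return Outcome.SKIPPED_BY_MEMBER_STATE; // empty result: member state says skip
        }
        return Outcome.REQUEST_SENT;
    }

    // Both guard outcomes look identical to an assertion on the request count.
    static boolean producesNoRequest(Outcome outcome) {
        return outcome != Outcome.REQUEST_SENT;
    }

    public static void main(String[] args) {
        Outcome viaCoordinator = poll(Optional.empty(), false);
        Outcome viaMemberState = poll(Optional.of("node-1"), true);
        System.out.println(viaCoordinator + " " + producesNoRequest(viaCoordinator));
        System.out.println(viaMemberState + " " + producesNoRequest(viaMemberState));
    }
}
```

Both setups yield no request, so an assertion on `unsentRequests.size()` alone cannot tell them apart. Only the second setup exercises the branch the test is named after, which is why stubbing `shouldSkipHeartbeat` is the closer match to the real flow.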
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669059780 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -756,18 +846,18 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { -mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight"); -membershipManager.leaveGroup(); - +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); Review Comment: we should `when(membershipManager.state()).thenReturn(MemberState.LEAVING)` instead I would say, to be true to the real flow. When a member is leaving the HB is generated because of the state it's in (and it also considers the canSend request, but with the state as a spy do we really need to set this expectation?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1669013977 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -898,6 +967,6 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, -metrics); +new Metrics()); Review Comment: isn't metrics already initialized in the ctor?
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668910020 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -357,31 +420,62 @@ public void testNoCoordinator() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testValidateConsumerGroupHeartbeatRequest(final short version) { +membershipManager = new MembershipManagerImpl( +DEFAULT_GROUP_ID, +Optional.of(DEFAULT_GROUP_INSTANCE_ID), +0, +Optional.of("uniform"), +subscriptions, +mock(CommitRequestManager.class), +(ConsumerMetadata) metadata, +logContext, +Optional.of(mock(ClientTelemetryReporter.class)), +backgroundEventHandler, +time, +new Metrics() +); +membershipManager.transitionToJoining(); + +heartbeatState = new HeartbeatState( +subscriptions, +membershipManager, +DEFAULT_MAX_POLL_INTERVAL_MS +); + +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler +); + // The initial heartbeatInterval is set to 0, but we're testing -resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); List subscribedTopics = Collections.singletonList("topic"); -subscriptions.subscribe(new HashSet<>(subscribedTopics), Optional.empty()); + when(subscriptions.subscription()).thenReturn(Collections.singleton("topic")); Review Comment: should we reference the same `subscribedTopics` var in the `thenReturn` as before? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668904111 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -349,6 +408,10 @@ public void testNoCoordinator() { when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +when(pollTimer.isExpired()).thenReturn(false); +when(pollTimer.remainingMs()).thenReturn(Long.MAX_VALUE); + when(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(1000L); Review Comment: needed or wanted for some reason? again wondering because since the state is a spy on the internal component, I would expect to have the real implementation kicking in, based on the interval defined for the HBMgr. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668899064 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { -mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + +when(pollTimer.remainingMs()).thenReturn(1800L); Review Comment: `1800L` isn't entirely clear to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668883729 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() { @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Mimic network timeout result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout")); +time.sleep(1); +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); + // Assure the manager will backoff on timeout + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); - -time.sleep(1); -result = heartbeatRequestManager.poll(time.milliseconds()); -assertEquals(1, result.unsentRequests.size()); } @Test public void testFailureOnFatalException() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); +when(membershipManager.isLeavingGroup()).thenReturn(true); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); Review Comment: needed? 
The state is a spy, so I would expect the real instance to give us the behaviour we want here, based on the interval that we already let expire above.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668882107 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() { @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Mimic network timeout result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout")); +time.sleep(1); +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); + // Assure the manager will backoff on timeout + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); - -time.sleep(1); -result = heartbeatRequestManager.poll(time.milliseconds()); -assertEquals(1, result.unsentRequests.size()); } @Test public void testFailureOnFatalException() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); +when(membershipManager.isLeavingGroup()).thenReturn(true); Review Comment: do we need this? (it's for the expired poll timer flow, which is unrelated) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668876259 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() { @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Mimic network timeout result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout")); +time.sleep(1); +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); + // Assure the manager will backoff on timeout + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); - -time.sleep(1); -result = heartbeatRequestManager.poll(time.milliseconds()); -assertEquals(1, result.unsentRequests.size()); } @Test public void testFailureOnFatalException() { Review Comment: since we're here, could we improve the `verify(backgroundEventHandler).add(any())` to check it's an `ErrorEvent` instead of any? it's truly what we expect on a fatal response in HB -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
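The request above (checking for an `ErrorEvent` rather than accepting any argument) can be illustrated with a minimal plain-Java sketch. Everything in it is a hypothetical stand-in: `BackgroundEvent`, `ErrorEvent`, `OtherEvent` and `RecordingHandler` are local types defined only for this example and are not the Kafka classes, and the hand-rolled recording handler replaces the Mockito mock used in the real test. In Mockito the equivalent would presumably be a typed matcher such as `any(ErrorEvent.class)` in place of `any()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; these are NOT the Kafka classes of the same name.
final class ErrorEventCheckSketch {
    interface BackgroundEvent {}

    static final class ErrorEvent implements BackgroundEvent {
        final Throwable error;
        ErrorEvent(Throwable error) { this.error = error; }
    }

    static final class OtherEvent implements BackgroundEvent {}

    // Records every event it receives, playing the role of a mocked handler.
    static final class RecordingHandler {
        final List<BackgroundEvent> events = new ArrayList<>();
        void add(BackgroundEvent event) { events.add(event); }
    }

    // Loose check: passes for any event at all, like verify(handler).add(any()).
    static boolean addedAny(RecordingHandler handler) {
        return !handler.events.isEmpty();
    }

    // Strict check: passes only if an ErrorEvent was added.
    static boolean addedErrorEvent(RecordingHandler handler) {
        return handler.events.stream().anyMatch(e -> e instanceof ErrorEvent);
    }

    public static void main(String[] args) {
        RecordingHandler wrong = new RecordingHandler();
        wrong.add(new OtherEvent());

        RecordingHandler right = new RecordingHandler();
        right.add(new ErrorEvent(new IllegalStateException("fatal")));

        // The loose check cannot tell the two handlers apart; the strict one can.
        System.out.println(addedAny(wrong) + " " + addedErrorEvent(wrong));
        System.out.println(addedAny(right) + " " + addedErrorEvent(right));
    }
}
```

The loose check passes for both handlers, which is why it gives little assurance that a fatal response produced the expected event type.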
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668847329 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() { @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Mimic network timeout result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout")); +time.sleep(1); Review Comment: not needed right? the flow we had (and should maintain) is: 1. poll generates HB and receives timeout as response 2. check backoff enforced (sleep backoff -1 , no request) 3. check new HB after backoff (sleep the 1ms that was missing to have the full backoff and check HB generated). Since we're mocking the membershipMgr now we should need to set expectations to ensure that we're not skipping the hb because membershipMgr.shouldSkipHb is true, and ensure that it's the backoff the one really kicking in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
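The three-step flow described in the comment above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `BackoffSketch` is a hypothetical model with a manual clock, not Kafka's `RequestState` or `HeartbeatRequestManager`, and the `RETRY_BACKOFF_MS` value is made up. The `skipHeartbeat` flag stands for what a mocked membership manager would report, and it stays `false` so that the backoff is the only reason a heartbeat can be withheld.

```java
// Hypothetical model of retry backoff; NOT Kafka's RequestState.
final class BackoffSketch {
    static final long RETRY_BACKOFF_MS = 100; // assumed value, for illustration only

    long nowMs = 0;                 // manual clock, advanced with sleep()
    long lastFailureMs = -1;        // -1 means no failure has been recorded
    boolean skipHeartbeat = false;  // what a mocked membership manager would report

    void sleep(long ms) { nowMs += ms; }

    void onFailure() { lastFailureMs = nowMs; }

    // Returns the number of heartbeat requests this poll would generate (0 or 1).
    int poll() {
        if (skipHeartbeat) {
            return 0; // withheld by membership state, unrelated to backoff
        }
        boolean backingOff = lastFailureMs >= 0 && nowMs - lastFailureMs < RETRY_BACKOFF_MS;
        return backingOff ? 0 : 1;
    }

    public static void main(String[] args) {
        BackoffSketch state = new BackoffSketch();

        // 1. Poll generates a heartbeat, which then fails with a timeout.
        int first = state.poll();
        state.onFailure();

        // 2. One millisecond short of the full backoff: no request expected.
        state.sleep(RETRY_BACKOFF_MS - 1);
        int duringBackoff = state.poll();

        // 3. Sleep the missing millisecond: a new heartbeat is expected.
        state.sleep(1);
        int afterBackoff = state.poll();

        System.out.println(first + " " + duringBackoff + " " + afterBackoff);
    }
}
```

Because `skipHeartbeat` is pinned to `false`, the zero in step 2 can only come from the backoff check, which is the property the review asks the test to preserve.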
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668868970 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() { @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Mimic network timeout result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout")); +time.sleep(1); +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); + // Assure the manager will backoff on timeout + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); Review Comment: this changes the semantics. This will achieve the result of not generating a HB, but we want to test that the HB is not generated because the backoff hasn't expired. So we should ensure there is a coordinator, ensure that the membershipMgr allows HB (shouldSKipHb false), and see that it's indeed the backoff check [here](https://github.com/apache/kafka/blob/a533e246e3ed5f6f6c5be4ebf9d29ae75cab557e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java#L89) the one making that no HB is generated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668847329 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() { @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Mimic network timeout result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout")); +time.sleep(1); Review Comment: not needed right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668844863 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -298,6 +353,8 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { public void testHeartbeatOutsideInterval() { when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); when(membershipManager.shouldHeartbeatNow()).thenReturn(true); +when(pollTimer.remainingMs()).thenReturn(Long.MAX_VALUE); + when(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(1000L); Review Comment: is this 1000 the interval? if so let's use the constant to make it clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668835568 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -277,12 +332,12 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { result = heartbeatRequestManager.poll(time.milliseconds()); Review Comment: I see down below (ln 337-340) the sequence to validate that the interval expired and no HB is generated because of the inflight. I truly don't see the value in keeping this one then (ln 333-335), which is not related to in-flight at all (HB interval is tested in other funcs). Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668829801 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -277,12 +332,12 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { result = heartbeatRequestManager.poll(time.milliseconds()); Review Comment: we need to sleep the interval before polling here again. If not, we're not getting HB because the interval hasn't expired (not because of in-flight as we want to assert on the following line) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
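The point made above, that an empty poll result does not by itself say why no heartbeat was sent, can be made concrete with a small plain-Java sketch. It is a hypothetical model, not the Kafka implementation: `InflightSketch`, its `Outcome` enum, the interval value, and the order in which the two checks run are all assumptions chosen for illustration.

```java
// Hypothetical model; NOT Kafka's HeartbeatRequestManager.
final class InflightSketch {
    static final long HEARTBEAT_INTERVAL_MS = 1000; // assumed value

    enum Outcome { SENT, INTERVAL_NOT_EXPIRED, REQUEST_IN_FLIGHT }

    long nowMs = 0;            // manual clock, advanced with sleep()
    long lastSentMs = -1;      // -1 means nothing has been sent yet
    boolean inFlight = false;  // true while a request awaits its response

    void sleep(long ms) { nowMs += ms; }

    // Assumed check order for this sketch: interval first, then in-flight.
    Outcome poll() {
        if (lastSentMs >= 0 && nowMs - lastSentMs < HEARTBEAT_INTERVAL_MS) {
            return Outcome.INTERVAL_NOT_EXPIRED;
        }
        if (inFlight) {
            return Outcome.REQUEST_IN_FLIGHT;
        }
        inFlight = true;
        lastSentMs = nowMs;
        return Outcome.SENT;
    }

    public static void main(String[] args) {
        InflightSketch manager = new InflightSketch();

        Outcome first = manager.poll();          // heartbeat goes out, no response yet
        Outcome immediate = manager.poll();      // no request, but due to the interval
        manager.sleep(HEARTBEAT_INTERVAL_MS);
        Outcome afterInterval = manager.poll();  // no request, now due to the in-flight one

        System.out.println(first + " " + immediate + " " + afterInterval);
    }
}
```

Both the second and third polls produce no request, but only the third one exercises the in-flight condition, which is why the review asks for a sleep of the full interval before the assertion.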
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668797551 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { -mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + +when(pollTimer.remainingMs()).thenReturn(1800L); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); // Member in state where it should not send Heartbeat anymore when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); Review Comment: I would dare to say that we shouldn't need this now that we don't have a spy on `membershipMgr` anymore (it was there were this is used for several assignment/callbacks checks, not at the HB mgr level) ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { -mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); + +when(pollTimer.remainingMs()).thenReturn(1800L); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); // Member in state where it should not send Heartbeat anymore 
when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); +when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); membershipManager.transitionToFatal(); Review Comment: We shouldn't need this explicit call on the mock now; it is replaced by the `shouldSkipHeartbeat` expectation you already set above.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668779441 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +Metrics metrics = new Metrics(time); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); } -private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { -cleanup(); - -ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( -DEFAULT_GROUP_ID, -groupInstanceId, +private void createHeartbeatStateWith0HeartbeatInterval() { +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, 0, -0.0, -Optional.of(DEFAULT_REMOTE_ASSIGNOR) -); - -setUp(Optional.of(gi)); -} +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -@AfterEach -public void cleanup() { -if (testBuilder != null) { -testBuilder.close(); -} +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); } @Test public void testHeartbeatOnStartup() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); assertEquals(0, 
heartbeatRequestManager.maximumTimeToWait(time.milli
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668775173 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +Metrics metrics = new Metrics(time); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); } -private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { -cleanup(); - -ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( -DEFAULT_GROUP_ID, -groupInstanceId, +private void createHeartbeatStateWith0HeartbeatInterval() { +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, 0, -0.0, -Optional.of(DEFAULT_REMOTE_ASSIGNOR) -); - -setUp(Optional.of(gi)); -} +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -@AfterEach -public void cleanup() { -if (testBuilder != null) { -testBuilder.close(); -} +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); } @Test public void testHeartbeatOnStartup() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); Review Comment: Not really hurtful but this is changing the path the test runs. Before this PR, the first poll on this test would return null because the member was unsubscribed (shouldSkipHeartbeat true). Now it's because there is no coordinator. 
Both go down the same path in the poll implementation but if we want to be true to what we had before, here we should mock
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668745058 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +Metrics metrics = new Metrics(time); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); } -private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { -cleanup(); - -ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( -DEFAULT_GROUP_ID, -groupInstanceId, +private void createHeartbeatStateWith0HeartbeatInterval() { +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, 0, -0.0, -Optional.of(DEFAULT_REMOTE_ASSIGNOR) -); - -setUp(Optional.of(gi)); -} +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -@AfterEach -public void cleanup() { -if (testBuilder != null) { -testBuilder.close(); -} +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); } @Test public void testHeartbeatOnStartup() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); -resetWithZeroHeartbeatInterval(Optional.empty()); -mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); assertEquals(0, 
heartbeatRequestManager.maximumTimeToWait(time.milli
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1668723357 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +Metrics metrics = new Metrics(time); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); Review Comment: Do we really need to set this expectation for all tests? It is only used when building the request if partitions were reconciled, so I would expect to need it only in the specific tests around that. Worth double-checking.
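The reviewer's point can be illustrated with a minimal, dependency-free sketch. It assumes, as the comment states, that the current assignment is consulted only when partitions were reconciled. `RequestBuilderSketch` and `buildRequest` are hypothetical names, not the real `HeartbeatRequestManager` API, and the throwing supplier is deliberately stricter than a Mockito mock, which would return a default value when unstubbed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the reviewer's point; NOT the real HeartbeatRequestManager.
class RequestBuilderSketch {
    // The assignment supplier is consulted only when partitions were reconciled.
    static String buildRequest(boolean partitionsReconciled,
                               Supplier<List<Integer>> currentAssignment) {
        if (!partitionsReconciled) {
            return "heartbeat";
        }
        return "heartbeat with partitions " + currentAssignment.get();
    }
}

class StubOnlyWhereNeededSketch {
    public static void main(String[] args) {
        // Plays the role of an unstubbed dependency: fails loudly if it is ever called.
        Supplier<List<Integer>> unstubbed = () -> {
            throw new IllegalStateException("currentAssignment() was not expected here");
        };

        // Common path: the assignment is never read, so no stub is required.
        System.out.println(RequestBuilderSketch.buildRequest(false, unstubbed));

        // Reconciliation path: only this scenario needs the assignment to be provided.
        System.out.println(RequestBuilderSketch.buildRequest(true, () -> Arrays.asList(0, 1)));
    }
}
```

If the common path runs without touching the supplier, the expectation can move out of the shared `setUp()` and into the few tests that exercise reconciliation.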
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2207337445 @philipnee @lianetm thank you both for the feedback, I have addressed all comments
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1664632832 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -594,24 +740,13 @@ public void testHeartbeatState() { @Test public void testPollTimerExpiration() { -coordinatorRequestManager = mock(CoordinatorRequestManager.class); -membershipManager = mock(MembershipManager.class); -heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); -heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState( -new LogContext(), -time, -DEFAULT_HEARTBEAT_INTERVAL_MS, -DEFAULT_RETRY_BACKOFF_MS, -DEFAULT_RETRY_BACKOFF_MAX_MS, -0)); -backgroundEventHandler = mock(BackgroundEventHandler.class); - heartbeatRequestManager = createHeartbeatRequestManager( coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, backgroundEventHandler); + Review Comment: Can we remove the extra space?
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1664631319 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -277,27 +332,29 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " + -"previous one is in-flight"); +"previous one is in-flight"); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " + -"interval expires if there is a previous HB request in-flight"); +"interval expires if there is a previous HB request in-flight"); // Receive response for the inflight after the interval expired. The next HB should be sent // on the next poll waiting only for the minimal backoff. inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); time.sleep(DEFAULT_RETRY_BACKOFF_MS); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size(), "A next heartbeat should be sent on " + -"the first poll after receiving a response that took longer than the interval, " + -"waiting only for the minimal backoff."); +"the first poll after receiving a response that took longer than the interval, " + Review Comment: Could you revert the changes here (as well as the ones above and below)? I think these are IDE indentation changes.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1664366556 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} +this.time = new MockTime(); +Metrics metrics = new Metrics(time); Review Comment: For aesthetic purposes, let's group the `this` assignments and the non-`this` declarations separately, i.e., can we move `metrics` to the bottom?
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1649440775 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,99 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} - -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.time = new MockTime(); +Metrics metrics = new Metrics(time); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = 
mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); + +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); } -private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { -cleanup(); - -ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( -DEFAULT_GROUP_ID, -groupInstanceId, +private void createHeartbeatStateWith0HeartbeatInterval() { +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, 0, -0.0, -Optional.of(DEFAULT_REMOTE_ASSIGNOR) -); +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -setUp(Optional.of(gi)); +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); } -@AfterEach -public void cleanup() { -if (testBuilder != null) { -testBuilder.close(); -} +private void resetWithZeroHeartbeatInterval() { Review Comment: It works for all but one test, for now I am keeping ```createHeartbeatStateWith0HeartbeatInterval()``` for ```testSkippingHeartbeat(final boolean shouldSkipHeartbeat)``` since this test requires the initial heartbeatInterval be 0 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ 
-275,29 +316,34 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { assert
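Why a test might need the initial heartbeat interval to be 0 can be sketched without any Kafka classes. The sketch assumes the heartbeat countdown starts when the state object is created; that is an assumption made for illustration, and `IntervalTimerSketch` is a hypothetical stand-in for the real `Timer` and `HeartbeatRequestState`.

```java
// Hypothetical countdown timer; the real Kafka Timer and HeartbeatRequestState are not used here.
class IntervalTimerSketch {
    private final long deadlineMs;

    // Assumption: the countdown starts at creation time.
    IntervalTimerSketch(long createdAtMs, long intervalMs) {
        this.deadlineMs = createdAtMs + intervalMs;
    }

    // The timer counts as expired once the clock reaches the deadline.
    boolean isExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}

class ZeroIntervalSketch {
    public static void main(String[] args) {
        long now = 0L; // plays the role of a mock clock

        IntervalTimerSketch zeroInterval = new IntervalTimerSketch(now, 0);
        IntervalTimerSketch oneSecond = new IntervalTimerSketch(now, 1000);

        // With a zero interval the timer is already expired on the very first check.
        System.out.println(zeroInterval.isExpired(now));

        // With a non-zero interval the clock has to advance before the timer expires.
        System.out.println(oneSecond.isExpired(now));
        now += 1000;
        System.out.println(oneSecond.isExpired(now));
    }
}
```

Under this assumption, a test that must observe a heartbeat decision on its first poll, before any clock advance, needs the zero-interval variant, while the other tests can advance the mock clock instead.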
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1649427661 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -659,78 +753,38 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { -Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); -heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), -coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, -backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); +when(membershipManager.isLeavingGroup()).thenReturn(false); +when(pollTimer.isExpired()).thenReturn(true); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); -assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } @Test -public void testHeartbeatMetrics() { -// setup -coordinatorRequestManager = mock(CoordinatorRequestManager.class); -membershipManager = mock(MembershipManager.class); -heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); -time = new MockTime(); -metrics = new Metrics(time); -heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( -new LogContext(), -time, -0, // This initial interval should be 0 to ensure heartbeat on the clock -DEFAULT_RETRY_BACKOFF_MS, -DEFAULT_RETRY_BACKOFF_MAX_MS, -0); -backgroundEventHandler = mock(BackgroundEventHandler.class); +public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { heartbeatRequestManager = createHeartbeatRequestManager( -coordinatorRequestManager, -membershipManager, -heartbeatState, -heartbeatRequestState, -backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); -when(membershipManager.state()).thenReturn(MemberState.STABLE); - -assertNotNull(getMetric("heartbeat-response-time-max")); -assertNotNull(getMetric("heartbeat-rate")); -assertNotNull(getMetric("heartbeat-total")); -assertNotNull(getMetric("last-heartbeat-seconds-ago")); - -// test poll -assertHeartbeat(heartbeatRequestManager, 0); -time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); -assertEquals(1.0, getMetric("heartbeat-total").metricValue()); -assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue()); - -assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); -assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); -assertEquals(2.0, getMetric("heartbeat-total").metricValue()); - -// Randomly sleep for some time -Random rand = new Random(); -int randomSleepS = rand.nextInt(11); -time.sleep(randomSleepS * 1000); -assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); -} +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); -@Test -public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); +when(membershipManager.state()).thenReturn(MemberState.STABLE); mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); +when(membershipManager.isLeavingGroup()).thenReturn(true); Review Comment: uhm do we need this here? 
I wouldn't expect so (the membershipMgr is a mock now, and the HB mgr does not check the isLeavingGroup to generate a HB) ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -659,78 +753,38 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { -Timer
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2183475535 Hey @brenden20, I completed another pass, left some comments. Thanks!
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1638461100 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -86,91 +79,169 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - public class HeartbeatRequestManagerTest { private static final String DEFAULT_GROUP_ID = "groupId"; -private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics"; +private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform"; +private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id"; +private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 1; +private static final long DEFAULT_RETRY_BACKOFF_MS = 80; +private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; +private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; -private ConsumerTestBuilder testBuilder; private Time time; private Timer pollTimer; private CoordinatorRequestManager coordinatorRequestManager; private SubscriptionState subscriptions; private Metadata metadata; private HeartbeatRequestManager heartbeatRequestManager; +private HeartbeatRequestManager heartbeatRequestManager1; private MembershipManager membershipManager; +private MembershipManager membershipManager1; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState1; private HeartbeatRequestManager.HeartbeatState heartbeatState; private final String memberId = "member-id"; private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private RequestManagers requestManagers; +private 
LogContext logContext; +private ConsumerConfig config; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} - -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.time = new MockTime(); +Metrics metrics = new Metrics(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatRequestState = mock(HeartbeatRequestState.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +this.config = mock(ConsumerConfig.class); +OffsetCommitCallbackInvoker offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); + +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +time, +config, +coordinatorRequestManager, +subscriptions, +membershipManager, +backgroundEventHandler, +metrics); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); -} +this.requestManagers = new RequestManagers( Review 
Comment: Actually no, that was an oversight on my part. Removed now.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1638492592 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -86,91 +79,169 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - public class HeartbeatRequestManagerTest { private static final String DEFAULT_GROUP_ID = "groupId"; -private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics"; +private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform"; +private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id"; +private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 1; +private static final long DEFAULT_RETRY_BACKOFF_MS = 80; +private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; +private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; -private ConsumerTestBuilder testBuilder; private Time time; private Timer pollTimer; private CoordinatorRequestManager coordinatorRequestManager; private SubscriptionState subscriptions; private Metadata metadata; private HeartbeatRequestManager heartbeatRequestManager; +private HeartbeatRequestManager heartbeatRequestManager1; private MembershipManager membershipManager; +private MembershipManager membershipManager1; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState1; private HeartbeatRequestManager.HeartbeatState heartbeatState; private final String memberId = "member-id"; private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private RequestManagers requestManagers; +private 
LogContext logContext; +private ConsumerConfig config; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} - -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.time = new MockTime(); +Metrics metrics = new Metrics(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatRequestState = mock(HeartbeatRequestState.class); +this.heartbeatState = mock(HeartbeatState.class); Review Comment: Makes sense, I will remove ```heartbeatRequestState1``` and make ```heartbeatRequestState``` into a spy.
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1636933032 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -86,91 +79,169 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - public class HeartbeatRequestManagerTest { private static final String DEFAULT_GROUP_ID = "groupId"; -private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics"; +private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform"; +private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id"; +private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 1; +private static final long DEFAULT_RETRY_BACKOFF_MS = 80; +private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; +private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; -private ConsumerTestBuilder testBuilder; private Time time; private Timer pollTimer; private CoordinatorRequestManager coordinatorRequestManager; private SubscriptionState subscriptions; private Metadata metadata; private HeartbeatRequestManager heartbeatRequestManager; +private HeartbeatRequestManager heartbeatRequestManager1; private MembershipManager membershipManager; +private MembershipManager membershipManager1; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState1; private HeartbeatRequestManager.HeartbeatState heartbeatState; private final String memberId = "member-id"; private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private RequestManagers requestManagers; +private LogContext 
logContext; +private ConsumerConfig config; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} - -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.time = new MockTime(); +Metrics metrics = new Metrics(); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatRequestState = mock(HeartbeatRequestState.class); +this.heartbeatState = mock(HeartbeatState.class); Review Comment: Turning this into a mock requires a little more work, I would expect. Several tests assert on this request state (because it is mainly the internal component that dictates the timing of the heartbeats). I also notice we end up having this mock and another request state that holds an actual instance (heartbeatRequestState1). So I wonder whether this HeartbeatRequestState alone might be a sensible use of a spy in this test?
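The mock-versus-spy distinction raised in this comment can be sketched without Mockito. In this rough illustration, `RequestStateSketch`, `StubRequestState`, and `RecordingRequestState` are hypothetical stand-ins, not the real `HeartbeatRequestState`; the timing rule is a simplification assumed for the example. A mock-like object returns canned answers, so the timing logic never runs, whereas a spy-like object records calls while still delegating to the real behaviour.

```java
// Hypothetical stand-in for a timing component; NOT the real Kafka HeartbeatRequestState.
class RequestStateSketch {
    private final long intervalMs;
    private long lastSentMs = -1; // -1 means nothing has been sent yet

    RequestStateSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // A request may be sent if none was sent yet or the interval has elapsed.
    boolean canSendRequest(long nowMs) {
        return lastSentMs < 0 || nowMs - lastSentMs >= intervalMs;
    }

    void onSendAttempt(long nowMs) {
        lastSentMs = nowMs;
    }
}

// Mock-like: every answer is canned, so the real timing logic is never exercised.
class StubRequestState extends RequestStateSketch {
    StubRequestState() {
        super(0);
    }

    @Override
    boolean canSendRequest(long nowMs) {
        return false; // canned default, regardless of the clock
    }

    @Override
    void onSendAttempt(long nowMs) {
        // intentionally does nothing
    }
}

// Spy-like: records interactions but delegates to the real behaviour.
class RecordingRequestState extends RequestStateSketch {
    int sendAttempts = 0;

    RecordingRequestState(long intervalMs) {
        super(intervalMs);
    }

    @Override
    void onSendAttempt(long nowMs) {
        sendAttempts++;
        super.onSendAttempt(nowMs);
    }
}

class SpyVersusMockSketch {
    public static void main(String[] args) {
        RecordingRequestState spyLike = new RecordingRequestState(1000);
        spyLike.onSendAttempt(0);
        // The real timing rule still applies, and the call count can be verified too.
        System.out.println(spyLike.canSendRequest(500));
        System.out.println(spyLike.canSendRequest(1000));
        System.out.println(spyLike.sendAttempts);

        // The stub answers the same way no matter how much time has passed.
        System.out.println(new StubRequestState().canSendRequest(5000));
    }
}
```

Tests that assert on heartbeat timing need the delegating variant; with the canned variant, each test would have to re-specify the timing answers by hand.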