Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-24 Thread via GitHub


chia7712 merged PR #16200:
URL: https://github.com/apache/kafka/pull/16200


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-24 Thread via GitHub


brenden20 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2248600964

   @chia7712 fixed that small issue with the name. Let me know if there is 
anything else!





Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-24 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1690216933


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,60 +99,85 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mock(Node.class)));
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatRequestStateWithZeroHeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-setUp(Optional.of(gi));
+this.heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+private void createHeartbeatStateandRequestManager() {

Review Comment:
   Whoops, missed that. Fixed now.






Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-24 Thread via GitHub


chia7712 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1690141224


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,60 +99,85 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mock(Node.class)));
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatRequestStateWithZeroHeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-setUp(Optional.of(gi));
+this.heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+private void createHeartbeatStateandRequestManager() {

Review Comment:
   nit: `createHeartbeatStateandRequestManager` -> 
`createHeartbeatStateAndRequestManager`






Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-24 Thread via GitHub


brenden20 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2248441396

   @chia7712 thank you for your feedback! I have addressed your comment by 
moving those object creations into a helper method. Let me know how it looks!





Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-24 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1690112360


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -360,57 +371,68 @@ public void testNoCoordinator() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testValidateConsumerGroupHeartbeatRequest(final short version) 
{
+heartbeatState = new HeartbeatState(

Review Comment:
   Good point! I have moved those object creations into a helper method now. 
Let me know what you think.






Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-24 Thread via GitHub


chia7712 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1689639268


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -360,57 +371,68 @@ public void testNoCoordinator() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testValidateConsumerGroupHeartbeatRequest(final short version) 
{
+heartbeatState = new HeartbeatState(

Review Comment:
   I guess those creations can be moved to a helper to be shared by 
`testValidateConsumerGroupHeartbeatRequestAssignmentSentWhenLocalEpochChanges`, 
`testValidateConsumerGroupHeartbeatRequest` and 
`testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments`
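
   A minimal sketch of the refactoring suggested above, assuming the three tests build the same objects before exercising them. The class, the `State` and `Manager` stand-ins, and the helper name `createStateAndManager` are hypothetical and only illustrate the shape; they are not the Kafka classes or the helper that ended up in the PR.

```java
/** Hypothetical stand-in for a test class; names are illustrative, not Kafka's. */
final class SharedFixtureSketch {
    // Stand-ins for the real collaborators that each test used to construct inline.
    static final class State {
        final String groupId;
        State(String groupId) { this.groupId = groupId; }
    }

    static final class Manager {
        final State state;
        Manager(State state) { this.state = state; }
        String heartbeat() { return "heartbeat:" + state.groupId; }
    }

    private State state;
    private Manager manager;

    // One helper replaces the construction code that was repeated in each test.
    private void createStateAndManager() {
        state = new State("group-id");
        manager = new Manager(state);
    }

    // Each "test" now starts with a single call to the shared helper.
    String testFirstHeartbeat() {
        createStateAndManager();
        return manager.heartbeat();
    }

    String testValidateRequest() {
        createStateAndManager();
        return manager.state.groupId;
    }

    public static void main(String[] args) {
        SharedFixtureSketch tests = new SharedFixtureSketch();
        System.out.println(tests.testFirstHeartbeat());
        System.out.println(tests.testValidateRequest());
    }
}
```

   Keeping the construction in one place means a constructor change only has to be applied once, and each test still gets fresh objects because the helper runs at the start of every test.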






Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-23 Thread via GitHub


chia7712 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2246117642

   I will take a look today!





Re: [PR] KAFKA-15999: Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-23 Thread via GitHub


lianetm commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2246108930

   Hey @chia7712, would you maybe have some time to take a look at this? Nice 
improvement @brenden20 did on the HeartbeatRequestManager test. Thanks!





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-19 Thread via GitHub


brenden20 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2239426940

   @lianetm thank you for the review! I addressed the comments you left. Let me 
know if it looks good.





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-19 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1684512734


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +837,30 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(String instanceId) {

Review Comment:
   I see, good catch! Changed now






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-18 Thread via GitHub


lianetm commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2237383499

   Hey @brenden20, thanks for the updates! Just a few minor comments left.





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-18 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683379819


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +837,30 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(String instanceId) {
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId));
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+
when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
+}
+
+private void mockFencedToJoiningMemberData() {
+when(membershipManager.state()).thenReturn(MemberState.JOINING);
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+}
+
+private void mockDefaultMemberData(String instanceId) {

Review Comment:
   nit: `mockStableMemberData` maybe?






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-18 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683363391


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +837,30 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(String instanceId) {

Review Comment:
   I realize now that we have to properly set the expected state here, because 
it is considered when building the request data to determine whether it should 
be a full heartbeat or not (HeartbeatRequestManager 
[ln#566](https://github.com/apache/kafka/blob/f595802cc752ed01dc74e9ab932209fe25a9d10b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L566)),
 so I would say that this should include 
`when(membershipManager.state()).thenReturn(MemberState.JOINING)`. The test is 
probably passing anyway because of the way this function is used (first HB, 
where all previous sentFields are null), but setting the state keeps the mock 
true to how this will happen in real-life scenarios.
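
   To illustrate the reasoning above, here is a simplified sketch, assuming the request builder sends every field when the member is JOINING and otherwise sends a field only when it differs from the last value sent. `FullHeartbeatSketch`, its field names, and the `buildRequest` signature are hypothetical and do not reproduce the actual `HeartbeatRequestManager` code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified, hypothetical model of state-dependent heartbeat building; not Kafka's code. */
final class FullHeartbeatSketch {
    enum MemberState { JOINING, STABLE }

    // Last value sent to the coordinator; null until the first heartbeat goes out.
    private String sentInstanceId = null;

    /** Returns the fields that would be included in the next heartbeat. */
    Map<String, String> buildRequest(MemberState state, String instanceId) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("groupId", "group-id"); // always sent in this sketch

        // Assumption: a JOINING member sends every field (a "full" heartbeat).
        boolean sendAllFields = state == MemberState.JOINING;

        // Otherwise a field is sent only when it differs from what was last sent.
        if (sendAllFields || !Objects.equals(instanceId, sentInstanceId)) {
            fields.put("instanceId", instanceId);
            sentInstanceId = instanceId;
        }
        return fields;
    }

    public static void main(String[] args) {
        FullHeartbeatSketch hb = new FullHeartbeatSketch();
        // First heartbeat: nothing was sent before, so the field is included
        // even though the state is not JOINING.
        System.out.println(hb.buildRequest(MemberState.STABLE, "instance-1"));
        // Later heartbeat, unchanged value, not JOINING: the field is omitted.
        System.out.println(hb.buildRequest(MemberState.STABLE, "instance-1"));
        // Later heartbeat while JOINING: the field is included again.
        System.out.println(hb.buildRequest(MemberState.JOINING, "instance-1"));
    }
}
```

   Under these assumptions the first heartbeat looks identical with or without the JOINING state, which would explain why the test passes either way; the difference only shows up on later heartbeats.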






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-18 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683375014


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +837,30 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(String instanceId) {
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId));
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+
when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
+}
+
+private void mockFencedToJoiningMemberData() {

Review Comment:
   nit: `mockRejoiningMemberData` is maybe better (fencing is just one of 
the ways to get to rejoining, and is not specifically used here)






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-18 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683354934


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -659,95 +665,50 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
 @Test
 public void testisExpiredByUsedForLogging() {
-Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-heartbeatRequestManager = new HeartbeatRequestManager(new 
LogContext(), pollTimer, config(),
-coordinatorRequestManager, membershipManager, heartbeatState, 
heartbeatRequestState,
-backgroundEventHandler, metrics);
 when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
 int exceededTimeMs = 5;
 time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
 
+when(membershipManager.isLeavingGroup()).thenReturn(false);
 NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, pollResult.unsentRequests.size());
 verify(membershipManager).transitionToSendingLeaveGroup(true);
 verify(pollTimer, never()).isExpiredBy();
-assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
 
 clearInvocations(pollTimer);
 heartbeatRequestManager.resetPollTimer(time.milliseconds());
 verify(pollTimer).isExpiredBy();
 }
 
-@Test
-public void testHeartbeatMetrics() {
-// setup
-coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-membershipManager = mock(MembershipManager.class);
-heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
-time = new MockTime();
-metrics = new Metrics(time);
-heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
-new LogContext(),
-time,
-0, // This initial interval should be 0 to ensure heartbeat on the 
clock
-DEFAULT_RETRY_BACKOFF_MS,
-DEFAULT_RETRY_BACKOFF_MAX_MS,
-0);
-backgroundEventHandler = mock(BackgroundEventHandler.class);
-heartbeatRequestManager = createHeartbeatRequestManager(
-coordinatorRequestManager,
-membershipManager,
-heartbeatState,
-heartbeatRequestState,
-backgroundEventHandler);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
-when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
-assertNotNull(getMetric("heartbeat-response-time-max"));
-assertNotNull(getMetric("heartbeat-rate"));
-assertNotNull(getMetric("heartbeat-total"));
-assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
-// test poll
-assertHeartbeat(heartbeatRequestManager, 0);
-time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
-assertEquals(1.0, getMetric("heartbeat-total").metricValue());
-assertEquals((double) 
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), 
getMetric("last-heartbeat-seconds-ago").metricValue());
-
-assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
-assertEquals(0.06d, (double) 
getMetric("heartbeat-rate").metricValue(), 0.005d);
-assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
-// Randomly sleep for some time
-Random rand = new Random();
-int randomSleepS = rand.nextInt(11);
-time.sleep(randomSleepS * 1000);
-assertEquals((double) randomSleepS, 
getMetric("last-heartbeat-seconds-ago").metricValue());
-}
-
 @Test
 public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
-mockStableMember();
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 
 // Receive HB response fencing member
 when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
-doNothing().when(membershipManager).transitionToFenced();
 ClientResponse response = 
createHeartbeatResponse(result.unsentRequests.get(0), 
Errors.FENCED_MEMBER_EPOCH);
 result.unsentRequests.get(0).handler().onComplete(response);
 
 verify(membershipManager).transitionToFenced();
 verify(heartbeatRequestState).onFailedAttempt(anyLong());
 verify(heartbeatRequestState).reset();
 
+when(membershipManager.shouldSkipHeartbeat()).t

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-18 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683354345



Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-18 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1683353250


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -621,7 +628,6 @@ public void testPollTimerExpiration() {
 assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
 verify(membershipManager).transitionToSendingLeaveGroup(true);
 verify(heartbeatState).reset();
-verify(heartbeatRequestState).reset();
 verify(membershipManager).onHeartbeatRequestSent();
 
 when(membershipManager.state()).thenReturn(MemberState.STALE);

Review Comment:
   maybe not needed anymore? (the following expectation on `shouldSkipHeartbeat` 
is probably all we need) 






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


brenden20 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2234881749

   @lianetm thank you again for the review and helpful comments, I have 
implemented all suggestions!





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681916086


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +854,42 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(boolean instanceId) {
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());

Review Comment:
   I do like this implementation better, thank you!






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681913213


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +854,42 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(boolean instanceId) {
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform"));
+when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);

Review Comment:
   Makes sense, changed now






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2234017232

   Thanks for the updates @brenden20. This is looking very good; I left some 
comments just to keep improving. 





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681566194


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -580,39 +617,26 @@ public void testHeartbeatState() {
 ConsumerGroupHeartbeatResponseData.Assignment assignmentTopic1 =
 new ConsumerGroupHeartbeatResponseData.Assignment();
 
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
-ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+rs1 = new ConsumerGroupHeartbeatResponse(new 
ConsumerGroupHeartbeatResponseData()
 .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
+.setMemberId(DEFAULT_MEMBER_ID)
 .setMemberEpoch(1)
 .setAssignment(assignmentTopic1));
 
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
"topic1"));
-membershipManager.onHeartbeatSuccess(rs1.data());
+mockReconcilingState();
 
 // We remain in RECONCILING state, as the assignment will be 
reconciled on the next poll
 assertEquals(MemberState.RECONCILING, membershipManager.state());

Review Comment:
   ditto (and the `mockReconcilingState` could be completely removed)






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681573819


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -40,13 +38,13 @@
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;

Review Comment:
   let's remove. This is making the build 
[fail](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16200/59/pipeline/)
 with unused import






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681564234


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long 
expectedTimeToNextHeartbeatMs) {
 
 @Test
 public void testHeartbeatState() {
+CommitRequestManager commitRequestManager = 
mock(CommitRequestManager.class);
+mockJoiningMemberData(false);
+
+heartbeatState = new HeartbeatState(
+subscriptions,
+membershipManager,
+DEFAULT_MAX_POLL_INTERVAL_MS
+);
+
+createHeartbeatStateWithZeroHeartbeatInterval();
+
 // The initial ConsumerGroupHeartbeatRequest sets most fields to their 
initial empty values
 ConsumerGroupHeartbeatRequestData data = 
heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
 assertEquals("", data.memberId());
 assertEquals(0, data.memberEpoch());
 assertNull(data.instanceId());
-assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, 
data.rebalanceTimeoutMs());
+assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
 assertEquals(Collections.emptyList(), data.subscribedTopicNames());
-assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, 
data.serverAssignor());
+assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
 assertEquals(Collections.emptyList(), data.topicPartitions());
-membershipManager.onHeartbeatRequestSent();
 assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
 
 // Mock a response from the group coordinator, that supplies the 
member ID and a new epoch
-mockStableMember();
+when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+when(subscriptions.rebalanceListener()).thenReturn(Optional.empty());
+ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(DEFAULT_MEMBER_ID)
+.setMemberEpoch(DEFAULT_MEMBER_EPOCH)
+.setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment())
+);
+
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
+mockDefaultMemberData(false);
 data = heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
-assertEquals(memberId, data.memberId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_MEMBER_ID, data.memberId());
 assertEquals(1, data.memberEpoch());
 assertNull(data.instanceId());
 assertEquals(-1, data.rebalanceTimeoutMs());
 assertNull(data.subscribedTopicNames());
 assertNull(data.serverAssignor());
-assertEquals(data.topicPartitions(), Collections.emptyList());
-membershipManager.onHeartbeatRequestSent();
+assertEquals(Collections.emptyList(), data.topicPartitions());
 assertEquals(MemberState.STABLE, membershipManager.state());
 
 // Join the group and subscribe to a topic, but the response has not 
yet been received
 String topic = "topic1";
 subscriptions.subscribe(Collections.singleton(topic), 
Optional.empty());
-membershipManager.onSubscriptionUpdated();
-membershipManager.transitionToFenced(); // And indirect way of moving 
to JOINING state
+
when(subscriptions.subscription()).thenReturn(Collections.singleton(topic));
+mockFencedToJoiningMemberData();
 data = heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
-assertEquals(memberId, data.memberId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_MEMBER_ID, data.memberId());
 assertEquals(0, data.memberEpoch());
 assertNull(data.instanceId());
 assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
 assertEquals(Collections.singletonList(topic), 
data.subscribedTopicNames());
-assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, 
data.serverAssignor());
+assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
 assertEquals(Collections.emptyList(), data.topicPartitions());
-membershipManager.onHeartbeatRequestSent();
 assertEquals(MemberState.JOINING, membershipManager.state());

Review Comment:
   similar, no value anymore. We should remove this (and the expectation we set 
with the value on `m

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681569580


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long 
expectedTimeToNextHeartbeatMs) {
 
 @Test
 public void testHeartbeatState() {
+CommitRequestManager commitRequestManager = 
mock(CommitRequestManager.class);
+mockJoiningMemberData(false);
+
+heartbeatState = new HeartbeatState(
+subscriptions,
+membershipManager,
+DEFAULT_MAX_POLL_INTERVAL_MS
+);
+
+createHeartbeatStateWithZeroHeartbeatInterval();
+
 // The initial ConsumerGroupHeartbeatRequest sets most fields to their 
initial empty values
 ConsumerGroupHeartbeatRequestData data = 
heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
 assertEquals("", data.memberId());
 assertEquals(0, data.memberEpoch());
 assertNull(data.instanceId());
-assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, 
data.rebalanceTimeoutMs());
+assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
 assertEquals(Collections.emptyList(), data.subscribedTopicNames());
-assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, 
data.serverAssignor());
+assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
 assertEquals(Collections.emptyList(), data.topicPartitions());
-membershipManager.onHeartbeatRequestSent();
 assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

Review Comment:
   same as comment 
[here](https://github.com/apache/kafka/pull/16200/files#r1681562408)






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681564234


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long 
expectedTimeToNextHeartbeatMs) {
 
 @Test
 public void testHeartbeatState() {
+CommitRequestManager commitRequestManager = 
mock(CommitRequestManager.class);
+mockJoiningMemberData(false);
+
+heartbeatState = new HeartbeatState(
+subscriptions,
+membershipManager,
+DEFAULT_MAX_POLL_INTERVAL_MS
+);
+
+createHeartbeatStateWithZeroHeartbeatInterval();
+
 // The initial ConsumerGroupHeartbeatRequest sets most fields to their 
initial empty values
 ConsumerGroupHeartbeatRequestData data = 
heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
 assertEquals("", data.memberId());
 assertEquals(0, data.memberEpoch());
 assertNull(data.instanceId());
-assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, 
data.rebalanceTimeoutMs());
+assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
 assertEquals(Collections.emptyList(), data.subscribedTopicNames());
-assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, 
data.serverAssignor());
+assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
 assertEquals(Collections.emptyList(), data.topicPartitions());
-membershipManager.onHeartbeatRequestSent();
 assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
 
 // Mock a response from the group coordinator, that supplies the 
member ID and a new epoch
-mockStableMember();
+when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+when(subscriptions.rebalanceListener()).thenReturn(Optional.empty());
+ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(DEFAULT_MEMBER_ID)
+.setMemberEpoch(DEFAULT_MEMBER_EPOCH)
+.setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment())
+);
+
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
+mockDefaultMemberData(false);
 data = heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
-assertEquals(memberId, data.memberId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_MEMBER_ID, data.memberId());
 assertEquals(1, data.memberEpoch());
 assertNull(data.instanceId());
 assertEquals(-1, data.rebalanceTimeoutMs());
 assertNull(data.subscribedTopicNames());
 assertNull(data.serverAssignor());
-assertEquals(data.topicPartitions(), Collections.emptyList());
-membershipManager.onHeartbeatRequestSent();
+assertEquals(Collections.emptyList(), data.topicPartitions());
 assertEquals(MemberState.STABLE, membershipManager.state());
 
 // Join the group and subscribe to a topic, but the response has not 
yet been received
 String topic = "topic1";
 subscriptions.subscribe(Collections.singleton(topic), 
Optional.empty());
-membershipManager.onSubscriptionUpdated();
-membershipManager.transitionToFenced(); // And indirect way of moving 
to JOINING state
+
when(subscriptions.subscription()).thenReturn(Collections.singleton(topic));
+mockFencedToJoiningMemberData();
 data = heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
-assertEquals(memberId, data.memberId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_MEMBER_ID, data.memberId());
 assertEquals(0, data.memberEpoch());
 assertNull(data.instanceId());
 assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
 assertEquals(Collections.singletonList(topic), 
data.subscribedTopicNames());
-assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, 
data.serverAssignor());
+assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
 assertEquals(Collections.emptyList(), data.topicPartitions());
-membershipManager.onHeartbeatRequestSent();
 assertEquals(MemberState.JOINING, membershipManager.state());

Review Comment:
   similar, no value, we should remove this (and the expectation we set with 
the value on `mockFence

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681562408


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long 
expectedTimeToNextHeartbeatMs) {
 
 @Test
 public void testHeartbeatState() {
+CommitRequestManager commitRequestManager = 
mock(CommitRequestManager.class);
+mockJoiningMemberData(false);
+
+heartbeatState = new HeartbeatState(
+subscriptions,
+membershipManager,
+DEFAULT_MAX_POLL_INTERVAL_MS
+);
+
+createHeartbeatStateWithZeroHeartbeatInterval();
+
 // The initial ConsumerGroupHeartbeatRequest sets most fields to their 
initial empty values
 ConsumerGroupHeartbeatRequestData data = 
heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
 assertEquals("", data.memberId());
 assertEquals(0, data.memberEpoch());
 assertNull(data.instanceId());
-assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, 
data.rebalanceTimeoutMs());
+assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
 assertEquals(Collections.emptyList(), data.subscribedTopicNames());
-assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, 
data.serverAssignor());
+assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
 assertEquals(Collections.emptyList(), data.topicPartitions());
-membershipManager.onHeartbeatRequestSent();
 assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
 
 // Mock a response from the group coordinator, that supplies the 
member ID and a new epoch
-mockStableMember();
+when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+when(subscriptions.rebalanceListener()).thenReturn(Optional.empty());
+ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(DEFAULT_MEMBER_ID)
+.setMemberEpoch(DEFAULT_MEMBER_EPOCH)
+.setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment())
+);
+
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
+mockDefaultMemberData(false);
 data = heartbeatState.buildRequestData();
-assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
-assertEquals(memberId, data.memberId());
+assertEquals(DEFAULT_GROUP_ID, data.groupId());
+assertEquals(DEFAULT_MEMBER_ID, data.memberId());
 assertEquals(1, data.memberEpoch());
 assertNull(data.instanceId());
 assertEquals(-1, data.rebalanceTimeoutMs());
 assertNull(data.subscribedTopicNames());
 assertNull(data.serverAssignor());
-assertEquals(data.topicPartitions(), Collections.emptyList());
-membershipManager.onHeartbeatRequestSent();
+assertEquals(Collections.emptyList(), data.topicPartitions());
 assertEquals(MemberState.STABLE, membershipManager.state());

Review Comment:
   I think we should remove this now that the `membershipMgr` is a mock, as it 
should be :) (it has no value since it's passing only because we set the 
expectation ourselves on `mockDefaultMemberData`)
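
   To illustrate the point, here is a minimal plain-Java sketch (no Mockito; 
`StubAssertionSketch`, `buildRequest` and the lambda stubs are hypothetical 
stand-ins, not code from this PR). An assertion on a stubbed value can only 
echo what the test configured, whereas an assertion on the output of the code 
under test actually exercises it:

```java
import java.util.function.Supplier;

public class StubAssertionSketch {
    enum MemberState { UNSUBSCRIBED, JOINING, STABLE }

    // Hypothetical stand-in for the class under test: it builds a request
    // string from data supplied by its (stubbed) collaborator.
    static String buildRequest(Supplier<String> memberId, Supplier<Integer> epoch) {
        return memberId.get() + "@" + epoch.get();
    }

    public static void main(String[] args) {
        // Hand-rolled stubs playing the role of when(...).thenReturn(...).
        Supplier<MemberState> state = () -> MemberState.STABLE;
        Supplier<String> memberId = () -> "member-1";
        Supplier<Integer> epoch = () -> 1;

        // Tautological check: true only because the stub was told to return STABLE.
        boolean tautology = state.get() == MemberState.STABLE;

        // Meaningful check: verifies what the code under test produced
        // from the stubbed inputs.
        boolean meaningful = buildRequest(memberId, epoch).equals("member-1@1");

        System.out.println(tautology + " " + meaningful);
    }
}
```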






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681559941


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -514,61 +536,76 @@ private void assertNextHeartbeatTiming(long 
expectedTimeToNextHeartbeatMs) {
 
 @Test
 public void testHeartbeatState() {
+CommitRequestManager commitRequestManager = 
mock(CommitRequestManager.class);

Review Comment:
   I would expect we don't need anything related to the commitRequestManager, 
because the `HBManager` we're testing knows nothing about it. It's the 
`MembershipMgr` that commits offsets before revoking partitions. Could 
you try removing this and the expectation on ln 571? 






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681548025


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +854,42 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(boolean instanceId) {
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());

Review Comment:
   what about simplifying this to:
   ```
   when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId));
   ```
   Passing a String instanceId as param. I find it's also clearer to have the 
string param with the explicit `instanceId` to use (or null).
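
   As a rough sketch of why the two forms are equivalent (plain Java, with 
hypothetical names `InstanceIdSketch`, `fromFlag` and `fromNullable` that are 
not part of the PR), `Optional.ofNullable` collapses the if/else into a single 
expression, with `null` standing for "no instance id":

```java
import java.util.Optional;

public class InstanceIdSketch {
    // Boolean-flag version, mirroring the if/else in the diff above.
    static Optional<String> fromFlag(boolean useInstanceId, String defaultId) {
        if (useInstanceId)
            return Optional.of(defaultId);
        else
            return Optional.empty();
    }

    // String-parameter version suggested in the review:
    // a null argument yields an empty Optional.
    static Optional<String> fromNullable(String instanceId) {
        return Optional.ofNullable(instanceId);
    }

    public static void main(String[] args) {
        System.out.println(fromNullable("group-instance-1")); // Optional[group-instance-1]
        System.out.println(fromNullable(null));               // Optional.empty
    }
}
```

   With this shape, a caller would pass either the instance id to use or 
`null`, instead of a boolean that selects a fixed constant.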






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681537376


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +854,42 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(boolean instanceId) {
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform"));
+when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
+}
+
+private void mockFencedToJoiningMemberData() {
+when(membershipManager.state()).thenReturn(MemberState.JOINING);
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+}
+
+private void mockDefaultMemberData(boolean instanceId) {
+when(membershipManager.currentAssignment()).thenReturn(new 
LocalAssignment(0, Collections.emptyMap()));
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
+when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform"));

Review Comment:
   DEFAULT_REMOTE_ASSIGNOR ?






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681535352


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +854,42 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(boolean instanceId) {
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform"));

Review Comment:
   Should we use the constant we already have, `DEFAULT_REMOTE_ASSIGNOR`?






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681534114


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +854,42 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(boolean instanceId) {
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform"));
+when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);
+}
+
+private void mockFencedToJoiningMemberData() {
+when(membershipManager.state()).thenReturn(MemberState.JOINING);
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+}
+
+private void mockDefaultMemberData(boolean instanceId) {
+when(membershipManager.currentAssignment()).thenReturn(new 
LocalAssignment(0, Collections.emptyMap()));
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
+when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform"));
+when(membershipManager.state()).thenReturn(MemberState.STABLE);

Review Comment:
   Similar to the previous comment: if this stub was only needed to unblock 
asserts on `membershipMgr.state()`, we should remove both those assertions and 
this stub, since asserting on an expectation we set ourselves adds no value.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681524660


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +854,42 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());
+}
+
+private void mockJoiningMemberData(boolean instanceId) {
+when(membershipManager.memberId()).thenReturn("");
+when(membershipManager.memberEpoch()).thenReturn(0);
+when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
+if (instanceId)
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+else
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
+
when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
+
when(membershipManager.serverAssignor()).thenReturn(Optional.of("uniform"));
+when(membershipManager.state()).thenReturn(MemberState.UNSUBSCRIBED);

Review Comment:
   This is a bit confusing because it's not what happens in practice. A member 
that is about to join (i.e. send its first HB) is in the JOINING state, not 
UNSUBSCRIBED. I guess you needed this because the existing `testHeartbeatState` 
uses this func and expects the state to be UNSUBSCRIBED; I would say that 
expectation is what is actually wrong. I suggest we remove the 
`assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());` on ln 560, 
and this stub here too (we're only asserting on something we set ourselves).
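
   To illustrate the "asserting on something we set ourselves" point, here is a 
minimal, self-contained sketch. It is not code from the PR: `MembershipView`, 
`stub` and `epochForNextHeartbeat` are hypothetical stand-ins for the mocked 
`MembershipManager` and the code under test, and a hand-written stub replaces 
Mockito so the snippet runs on its own.

```java
// Hypothetical demo, not part of Kafka: shows why reading back a stubbed value tests nothing.
final class StubAssertionDemo {
    enum MemberState { UNSUBSCRIBED, JOINING, STABLE }

    // Trimmed-down, made-up stand-in for the mocked membership manager.
    interface MembershipView {
        MemberState state();
        int memberEpoch();
    }

    // Made-up "code under test": picks the epoch carried by the next heartbeat.
    static int epochForNextHeartbeat(MembershipView membership) {
        return membership.memberEpoch();
    }

    // A stub that simply returns whatever the test configured.
    static MembershipView stub(MemberState state, int epoch) {
        return new MembershipView() {
            @Override public MemberState state() { return state; }
            @Override public int memberEpoch() { return epoch; }
        };
    }

    public static void main(String[] args) {
        MembershipView membership = stub(MemberState.UNSUBSCRIBED, 0);

        // Tautology: this only reads back the value configured on the line above,
        // so it can never fail because of a bug in the code under test.
        boolean tautology = membership.state() == MemberState.UNSUBSCRIBED;

        // Meaningful: this goes through the code under test.
        boolean meaningful = epochForNextHeartbeat(membership) == 0;

        System.out.println(tautology + " " + meaningful);
    }
}
```

   The first check passes no matter what the production code does, which is why 
dropping both the stub and the assertion loses no coverage.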






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-17 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1681489990


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -447,7 +505,8 @@ private ConsumerGroupHeartbeatRequest 
getHeartbeatRequest(HeartbeatRequestManage
 @ParameterizedTest
 @MethodSource("errorProvider")
 public void testHeartbeatResponseOnErrorHandling(final Errors error, final 
boolean isFatal) {
-mockStableMember();
+if (isFatal)
+when(membershipManager.state()).thenReturn(MemberState.FATAL);

Review Comment:
   great, that's what I was expecting. 






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-15 Thread via GitHub


brenden20 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2229328830

   @lianetm thank you for the review, I have taken your suggestions and 
implemented them. Let me know if there is anything else.





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-15 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1678342328


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -447,7 +505,8 @@ private ConsumerGroupHeartbeatRequest 
getHeartbeatRequest(HeartbeatRequestManage
 @ParameterizedTest
 @MethodSource("errorProvider")
 public void testHeartbeatResponseOnErrorHandling(final Errors error, final 
boolean isFatal) {
-mockStableMember();
+if (isFatal)
+when(membershipManager.state()).thenReturn(MemberState.FATAL);

Review Comment:
   Looks like the if statement was not necessary; the tests pass regardless. 
With both the assert and the stubbing removed, the test still works!






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-15 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1678333602


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void 
testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short 
version) {
-resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+membershipManager = new MembershipManagerImpl(
+DEFAULT_GROUP_ID,
+Optional.of(DEFAULT_GROUP_INSTANCE_ID),
+0,
+Optional.of(""),
+subscriptions,
+mock(CommitRequestManager.class),
+(ConsumerMetadata) metadata,
+logContext,
+Optional.of(mock(ClientTelemetryReporter.class)),
+backgroundEventHandler,
+time,
+new Metrics()
+);
+membershipManager.transitionToJoining();

Review Comment:
   I was able to get this working with this kind of approach, let me know what 
you think






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-15 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1678232950


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -659,95 +749,50 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
 @Test
 public void testisExpiredByUsedForLogging() {
-Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-heartbeatRequestManager = new HeartbeatRequestManager(new 
LogContext(), pollTimer, config(),
-coordinatorRequestManager, membershipManager, heartbeatState, 
heartbeatRequestState,
-backgroundEventHandler, metrics);
 when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
 int exceededTimeMs = 5;
 time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
 
+when(membershipManager.isLeavingGroup()).thenReturn(false);
 NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, pollResult.unsentRequests.size());
 verify(membershipManager).transitionToSendingLeaveGroup(true);
 verify(pollTimer, never()).isExpiredBy();
-assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
 
 clearInvocations(pollTimer);
 heartbeatRequestManager.resetPollTimer(time.milliseconds());
 verify(pollTimer).isExpiredBy();
 }
 
-@Test
-public void testHeartbeatMetrics() {

Review Comment:
   Yes, you are correct






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674318250


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -447,7 +505,8 @@ private ConsumerGroupHeartbeatRequest 
getHeartbeatRequest(HeartbeatRequestManage
 @ParameterizedTest
 @MethodSource("errorProvider")
 public void testHeartbeatResponseOnErrorHandling(final Errors error, final 
boolean isFatal) {
-mockStableMember();
+if (isFatal)
+when(membershipManager.state()).thenReturn(MemberState.FATAL);

Review Comment:
   Trying to understand exactly why we need this, I ended up in 
`ensureHeartbeatStopped`, which asserts that the `membershipMgr` state is 
fatal. Was that the reason? I would say we should remove that assert on the 
mock, since it brings no value (and then probably this stub too, if it's not 
required for anything else).






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674309494


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -425,6 +482,7 @@ topicId, mkSortedSet(0)
 assertEquals(Collections.singletonList(expectedTopicPartitions), 
heartbeatRequest1.data().topicPartitions());
 
 // Assignment did not change, so no assignment should be sent

Review Comment:
   nit: I noticed that the `getHeartbeatRequest` used here contains a copy of 
this comment that is totally out of place (ln 497). Could you please remove it?






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674210447


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void 
testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short 
version) {
-resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+membershipManager = new MembershipManagerImpl(
+DEFAULT_GROUP_ID,
+Optional.of(DEFAULT_GROUP_INSTANCE_ID),
+0,
+Optional.of(""),
+subscriptions,
+mock(CommitRequestManager.class),
+(ConsumerMetadata) metadata,
+logContext,
+Optional.of(mock(ClientTelemetryReporter.class)),
+backgroundEventHandler,
+time,
+new Metrics()
+);
+membershipManager.transitionToJoining();

Review Comment:
   btw, if this suggestion works, please consider it for the other tests on the 
heartbeat state; maybe we can simplify there too (`testHeartbeatState` and 
`testValidateConsumerGroupHeartbeatRequest`).






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674207651


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void 
testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short 
version) {
-resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+membershipManager = new MembershipManagerImpl(
+DEFAULT_GROUP_ID,
+Optional.of(DEFAULT_GROUP_INSTANCE_ID),
+0,
+Optional.of(""),
+subscriptions,
+mock(CommitRequestManager.class),
+(ConsumerMetadata) metadata,
+logContext,
+Optional.of(mock(ClientTelemetryReporter.class)),
+backgroundEventHandler,
+time,
+new Metrics()
+);
+membershipManager.transitionToJoining();

Review Comment:
   Actually, I wonder if we really need an instance of the `membershipMgr` here 
(and all the explicit calls on it). Since we're testing how the HBMgr generates 
the data, we may only need to mock the `membershipMgr` funcs the HBMgr relies 
on. Maybe something like this, removing the membershipMgr instance and all the 
calls on it, which were only trying to reach the state where these values would 
be returned:
   ```
   private void mockJoiningMemberData() {
       when(membershipManager.memberId()).thenReturn("");
       when(membershipManager.memberEpoch()).thenReturn(0);
       when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
       when(membershipManager.groupInstanceId()).thenReturn(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
       when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE);
   }
   ```






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674174436


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -659,95 +749,50 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
 @Test
 public void testisExpiredByUsedForLogging() {
-Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-heartbeatRequestManager = new HeartbeatRequestManager(new 
LogContext(), pollTimer, config(),
-coordinatorRequestManager, membershipManager, heartbeatState, 
heartbeatRequestState,
-backgroundEventHandler, metrics);
 when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
 int exceededTimeMs = 5;
 time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
 
+when(membershipManager.isLeavingGroup()).thenReturn(false);
 NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, pollResult.unsentRequests.size());
 verify(membershipManager).transitionToSendingLeaveGroup(true);
 verify(pollTimer, never()).isExpiredBy();
-assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
 
 clearInvocations(pollTimer);
 heartbeatRequestManager.resetPollTimer(time.milliseconds());
 verify(pollTimer).isExpiredBy();
 }
 
-@Test
-public void testHeartbeatMetrics() {

Review Comment:
   Just to double check: we're removing this because it's covered in 
`HeartbeatMetricsManagerTest`, right?






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674165119


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -756,18 +801,19 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final 
short version) {
-mockStableMember();
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a previous one is in-flight");
 
-membershipManager.leaveGroup();
-
+when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+when(heartbeatState.buildRequestData()).thenReturn(new 
ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1));
 ConsumerGroupHeartbeatRequest heartbeatToLeave = 
getHeartbeatRequest(heartbeatRequestManager, version);
 assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, 
heartbeatToLeave.data().memberEpoch());
 
+
//when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

Review Comment:
   let's remove this






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674150218


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
 
 // Member in state where it should not send Heartbeat anymore
 when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
 membershipManager.transitionToFatal();

Review Comment:
   Uhm, that's weird. The only check that should happen there is 
`shouldSkipHeartbeat`, and we're mocking it. We do the same in other tests 
(e.g. `testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin`), where 
just mocking `shouldSkipHeartbeat` leads to no HB as expected (no actual 
transition). Could you please double check by removing just ln 298 again?






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674142299


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void 
testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short 
version) {
-resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+membershipManager = new MembershipManagerImpl(
+DEFAULT_GROUP_ID,
+Optional.of(DEFAULT_GROUP_INSTANCE_ID),
+0,
+Optional.of(""),
+subscriptions,
+mock(CommitRequestManager.class),
+(ConsumerMetadata) metadata,
+logContext,
+Optional.of(mock(ClientTelemetryReporter.class)),
+backgroundEventHandler,
+time,
+new Metrics()
+);
+membershipManager.transitionToJoining();
+
+heartbeatState = new HeartbeatState(
+subscriptions,
+membershipManager,
+DEFAULT_MAX_POLL_INTERVAL_MS
+);
+
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler
+);
+createHeartbeatStateWithZeroHeartbeatInterval();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 String topic = "topic1";
+// Make a singleton set
+HashSet set = new HashSet<>();
+set.add(topic);

Review Comment:
   Indeed, let's build the set with `Collections.singleton` (and remove the 
comment, and reuse the var on ln 241 too).
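
   As a quick sanity check on that suggestion, here is a small self-contained 
sketch (not code from the PR; `SingletonSetDemo` and its method names are made 
up for illustration) showing that the one-line form produces a set equal to the 
three-line `HashSet` version:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class, only meant to compare the two ways of building the set.
final class SingletonSetDemo {
    // The multi-line approach from the diff, with the element type spelled out.
    static Set<String> viaHashSet(String topic) {
        Set<String> set = new HashSet<>();
        set.add(topic);
        return set;
    }

    // The suggested one-liner; the returned set is immutable.
    static Set<String> viaSingleton(String topic) {
        return Collections.singleton(topic);
    }

    public static void main(String[] args) {
        String topic = "topic1";
        // Set equality is defined by contents, so both forms compare equal.
        System.out.println(viaHashSet(topic).equals(viaSingleton(topic)));
    }
}
```

   One design note: `Collections.singleton` returns an immutable set, which is 
fine for a test fixture that is never modified afterwards.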






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-11 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1674138238


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -201,8 +202,42 @@ public void testSuccessfulHeartbeatTiming() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void 
testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short 
version) {
-resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
+membershipManager = new MembershipManagerImpl(
+DEFAULT_GROUP_ID,
+Optional.of(DEFAULT_GROUP_INSTANCE_ID),
+0,
+Optional.of(""),

Review Comment:
   This won't affect the test, but for the sake of correctness: if we don't 
want an assignor, we should pass `Optional.empty()`.
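
   The difference is easy to see in isolation. The sketch below is not from the 
PR, and `hasAssignor` is a hypothetical helper; it only shows how any code that 
checks `isPresent()` would treat the two values:

```java
import java.util.Optional;

// Hypothetical demo class, only meant to illustrate Optional semantics.
final class AssignorOptionalDemo {
    // Returns true when a server assignor name was supplied, even an empty one.
    static boolean hasAssignor(Optional<String> serverAssignor) {
        return serverAssignor.isPresent();
    }

    public static void main(String[] args) {
        // An empty string is still a present value.
        System.out.println(hasAssignor(Optional.of("")));
        // Optional.empty() is the way to express "no assignor".
        System.out.println(hasAssignor(Optional.empty()));
    }
}
```

   So `Optional.of("")` reads as "an assignor whose name is the empty string", 
which is not the same as "no assignor configured".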






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2215943716

   @lianetm @philipnee thank you both for the feedback! I have addressed all 
comments and left some replies as well. Let me know what you think; I made some 
pretty major improvements over the last iteration.





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669499524


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);

Review Comment:
   That line is now removed






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669499273


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -349,6 +408,10 @@ public void testNoCoordinator() {
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 
+when(pollTimer.isExpired()).thenReturn(false);
+when(pollTimer.remainingMs()).thenReturn(Long.MAX_VALUE);
+
when(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(1000L);

Review Comment:
   These are now removed






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669498721


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +967,6 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());

Review Comment:
   Yes, but if you use the same metrics for different objects it breaks 
everything
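
   The comment does not say what exactly breaks. One plausible cause, stated 
here purely as an assumption, is that a metrics registry rejects a second 
registration under an already used name, so two managers built on one shared 
instance collide. The sketch below uses a hypothetical `ToyRegistry` and 
`ToyManager` (neither is the Kafka `Metrics` class or a real manager) to show 
that failure mode and why a fresh registry per object avoids it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry used for illustration only; not the Kafka Metrics class.
final class ToyRegistry {
    private final Map<String, Double> metrics = new HashMap<>();

    // Rejects a name that is already present (assumed behavior for this sketch).
    void register(String name) {
        if (metrics.containsKey(name)) {
            throw new IllegalArgumentException("metric already registered: " + name);
        }
        metrics.put(name, 0.0);
    }
}

// Hypothetical manager that registers one metric when it is constructed.
final class ToyManager {
    ToyManager(ToyRegistry registry) {
        registry.register("heartbeat-total"); // hypothetical metric name
    }
}

final class SharedRegistrySketch {
    // Returns true when constructing the second manager throws.
    static boolean secondManagerFails(boolean shareRegistry) {
        ToyRegistry first = new ToyRegistry();
        new ToyManager(first);
        try {
            new ToyManager(shareRegistry ? first : new ToyRegistry());
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondManagerFails(true));
        System.out.println(secondManagerFails(false));
    }
}
```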






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669497625


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -756,18 +846,18 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final 
short version) {
-mockStableMember();
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a previous one is in-flight");
 
-membershipManager.leaveGroup();
-
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+when(heartbeatState.buildRequestData()).thenReturn(new 
ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1));
 ConsumerGroupHeartbeatRequest heartbeatToLeave = 
getHeartbeatRequest(heartbeatRequestManager, version);
 assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, 
heartbeatToLeave.data().memberEpoch());
 
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

Review Comment:
   Yes, I see. I appreciate the explanations on these as well; they definitely 
give me a better understanding of how everything works together. Changed now.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669495115


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -756,18 +846,18 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final 
short version) {
-mockStableMember();
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a previous one is in-flight");
 
-membershipManager.leaveGroup();
-
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);

Review Comment:
   You're right, replacing that now, thank you






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669465217


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() {
 @Test
 public void testNetworkTimeout() {
 // The initial heartbeatInterval is set to 0
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 // Mimic network timeout
 result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), 
new TimeoutException("timeout"));
 
+time.sleep(1);

Review Comment:
   Yeah, removing it changes nothing in the test, so I am removing it.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669460104


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
 
 // Member in state where it should not send Heartbeat anymore
 when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
 membershipManager.transitionToFatal();

Review Comment:
   It seems that I do actually need it. I tried removing it and found that 
```shouldSkipHeartbeat``` does not work properly without it. Even stubbing the 
method to return ```MemberState.FATAL``` does not work for some reason. This 
seems to be the only way to return an empty poll result.
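
   One possible explanation, offered as an assumption rather than a diagnosis 
of this test: on a mock, every method is stubbed independently and no real 
logic runs, so stubbing a state getter cannot change what 
```shouldSkipHeartbeat``` returns. The sketch below shows this with 
hypothetical stand-ins (`MemberStateLike`, `RealMember`, `MockLikeMember`), 
written without Mockito.

```java
// Hypothetical stand-ins; not the Kafka MembershipManager API.
enum MemberStateLike { STABLE, FATAL }

// A real implementation derives the skip decision from its state.
class RealMember {
    MemberStateLike state() { return MemberStateLike.STABLE; }
    boolean shouldSkipHeartbeat() { return state() == MemberStateLike.FATAL; }
}

// Mimics a mock: each method returns its own canned value, no real logic runs.
final class MockLikeMember extends RealMember {
    MemberStateLike cannedState = null; // unstubbed object-returning method yields null
    boolean cannedSkip = false;         // unstubbed boolean method yields false

    @Override MemberStateLike state() { return cannedState; }
    @Override boolean shouldSkipHeartbeat() { return cannedSkip; }
}

final class IndependentStubsSketch {
    public static void main(String[] args) {
        MockLikeMember member = new MockLikeMember();

        member.cannedState = MemberStateLike.FATAL;       // like stubbing state()
        System.out.println(member.shouldSkipHeartbeat()); // unaffected by the state stub

        member.cannedSkip = true;                         // like stubbing shouldSkipHeartbeat()
        System.out.println(member.shouldSkipHeartbeat());
    }
}
```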






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669445851


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
 
 // Member in state where it should not send Heartbeat anymore
 when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);

Review Comment:
   I can do this if I turn ```pollTimer``` into a spy. I tried that out and I 
think we should keep it. It makes the tests stricter on timing, since a mock 
would have to be stubbed to get the desired behavior. It also allows us to keep 
our ```verify()``` invocations on ```pollTimer```. Let me know what you think.
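
   To illustrate what a spy offers over a plain mock, here is a hand-rolled 
sketch without Mockito: the object keeps its real timing behavior and also 
records calls so they can be checked afterwards. `SimpleTimer` and 
`RecordingTimer` are hypothetical names and are not the Kafka `Timer` class.

```java
// Hypothetical stand-in for a timer with real behavior.
class SimpleTimer {
    private final long deadlineMs;
    private long nowMs;

    SimpleTimer(long startMs, long timeoutMs) {
        this.nowMs = startMs;
        this.deadlineMs = startMs + timeoutMs;
    }

    void update(long currentMs) { this.nowMs = currentMs; }

    long remainingMs() { return Math.max(0L, deadlineMs - nowMs); }
}

// Hand-rolled "spy": delegates to the real behavior and counts update() calls.
final class RecordingTimer extends SimpleTimer {
    int updateCalls = 0;

    RecordingTimer(long startMs, long timeoutMs) { super(startMs, timeoutMs); }

    @Override
    void update(long currentMs) {
        updateCalls++;
        super.update(currentMs);
    }
}

final class SpySketch {
    public static void main(String[] args) {
        RecordingTimer timer = new RecordingTimer(0L, 1000L);
        timer.update(100L);
        // Real timing is preserved, so no stubbing of remainingMs() is needed.
        System.out.println(timer.remainingMs());
        // The recorded call count plays the role of a verify() check.
        System.out.println(timer.updateCalls);
    }
}
```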






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669445851


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
 
 // Member in state where it should not send Heartbeat anymore
 when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);

Review Comment:
   I can do this if I turn ```pollTimer``` into a spy. I tried that out and I 
think we should keep it. It makes the tests stricter on timing, since a mock 
would have to be stubbed to get the desired behavior. Let me know what you think.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669428550


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
 
 // Member in state where it should not send Heartbeat anymore
 when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);

Review Comment:
   It is actually needed for this test. ```maximumTimeToWait()``` will always 
return 0 if ```pollTimer``` is a mock. I just did some testing, turning 
```pollTimer``` from a mock into a spy, and it worked out pretty well. All tests 
are passing. I think it may be beneficial to keep ```pollTimer``` a spy, since 
I notice that in many places we ```verify``` on ```pollTimer```. Let me know 
what you think; my changes are pushed.
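
   A minimal sketch of why an unstubbed mock timer drives the wait time to 0. 
It assumes that ```maximumTimeToWait()``` returns the smaller of a 
timer-derived bound and the time until the next heartbeat; that shape is an 
assumption of this sketch, and `PollTimerLike` and `MaxWaitSketch` are 
hypothetical names. The only Mockito behavior relied on is that an unstubbed 
method returning `long` yields 0.

```java
// Hypothetical stand-in; not the Kafka Timer class.
interface PollTimerLike {
    long remainingMs();
}

final class MaxWaitSketch {
    // Mimics an unstubbed mock: a method returning long yields 0 by default.
    static final PollTimerLike UNSTUBBED_MOCK = () -> 0L;

    // Mimics a spy on a real timer: the real remaining time is reported.
    static PollTimerLike realTimer(long remainingMs) {
        return () -> remainingMs;
    }

    // Assumed shape of the calculation: the smaller of the timer bound
    // and the time until the next heartbeat is due.
    static long maximumTimeToWait(PollTimerLike pollTimer, long timeToNextHeartbeatMs) {
        return Math.min(pollTimer.remainingMs(), timeToNextHeartbeatMs);
    }

    public static void main(String[] args) {
        long timeToNextHeartbeatMs = 900L;
        // With the unstubbed mock, the minimum is always 0.
        System.out.println(maximumTimeToWait(UNSTUBBED_MOCK, timeToNextHeartbeatMs));
        // With real remaining time, the heartbeat delay becomes the bound.
        System.out.println(maximumTimeToWait(realTimer(1800L), timeToNextHeartbeatMs));
    }
}
```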






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669143238


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatStateWith0HeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
-
-setUp(Optional.of(gi));
-}
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
 @Test
 public void testHeartbeatOnStartup() {
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size());
 
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 assertEquals(0, 
heartbeatRequestManager.maximumTimeToWait(time.mil

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669139534


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatStateWith0HeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
-
-setUp(Optional.of(gi));
-}
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
 @Test
 public void testHeartbeatOnStartup() {
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

Review Comment:
   I have made the change you suggested, let me know what you think.




Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669122743


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatStateWith0HeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
-
-setUp(Optional.of(gi));
-}
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
 @Test
 public void testHeartbeatOnStartup() {
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size());
 
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 assertEquals(0, 
heartbeatRequestManager.maximumTimeToWait(time.mil

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669112935


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);

Review Comment:
   Removing it changed nothing, good catch!






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2214850627

   Hey @brenden20, thanks for the updates! I completed another pass and left 
some comments. Thanks!





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669063349


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -756,18 +846,18 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final 
short version) {
-mockStableMember();
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a previous one is in-flight");
 
-membershipManager.leaveGroup();
-
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+when(heartbeatState.buildRequestData()).thenReturn(new 
ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1));
 ConsumerGroupHeartbeatRequest heartbeatToLeave = 
getHeartbeatRequest(heartbeatRequestManager, version);
 assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, 
heartbeatToLeave.data().memberEpoch());
 
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

Review Comment:
   similar to other comments above, this will achieve the result we want (no 
HB), but not for the reason we wanted to test. The intention of the test was to 
assert that polling the HB manager after the request to leave has been sent 
will generate no request (and that's because the member transitions to 
UNSUBSCRIBED on `onHeartbeatRequestSent`). So probably here we should only set 
the expectation that `shouldSkipHeartbeat` returns true, which is what the HB mgr 
checks, and gets when the member is UNSUBSCRIBED. Makes sense?
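
   To make the intended flow concrete, here is a minimal plain-Java sketch with 
no Kafka or Mockito dependency. `SketchMemberState` and `LeaveFlowSketch` are 
hypothetical names invented for illustration, and the state handling is heavily 
simplified; only `shouldSkipHeartbeat`, `onHeartbeatRequestSent` and the 
UNSUBSCRIBED state come from the discussion above.

```java
// Hypothetical, simplified model of the leave flow; not Kafka's actual classes.
enum SketchMemberState { STABLE, LEAVING, UNSUBSCRIBED }

class LeaveFlowSketch {
    private SketchMemberState state = SketchMemberState.STABLE;

    // The member asks to leave the group.
    void leaveGroup() {
        state = SketchMemberState.LEAVING;
    }

    // Assumption: sending the leave heartbeat completes the leave.
    void onHeartbeatRequestSent() {
        if (state == SketchMemberState.LEAVING) {
            state = SketchMemberState.UNSUBSCRIBED;
        }
    }

    // The heartbeat manager consults this before building a request.
    boolean shouldSkipHeartbeat() {
        return state == SketchMemberState.UNSUBSCRIBED;
    }

    // Returns the number of heartbeat requests this poll generates (0 or 1).
    int poll() {
        if (shouldSkipHeartbeat()) {
            return 0;
        }
        onHeartbeatRequestSent();
        return 1;
    }
}
```

   In this model the second poll after `leaveGroup()` yields no request because 
of the member state, with a coordinator still available, which is the condition 
the test is meant to isolate.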






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669059780


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -756,18 +846,18 @@ public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final 
short version) {
-mockStableMember();
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a previous one is in-flight");
 
-membershipManager.leaveGroup();
-
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);

Review Comment:
   I would say we should `when(membershipManager.state()).thenReturn(MemberState.LEAVING)` 
instead, to be true to the real flow. When a member is leaving, the HB is 
generated because of the state it's in (it also considers `canSendRequest`, 
but with the state as a spy do we really need to set this expectation?)






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


philipnee commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1669013977


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -898,6 +967,6 @@ private HeartbeatRequestManager 
createHeartbeatRequestManager(
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler,
-metrics);
+new Metrics());

Review Comment:
   isn't metrics already initialized in the ctor?






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668910020


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -357,31 +420,62 @@ public void testNoCoordinator() {
 @ParameterizedTest
 @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
 public void testValidateConsumerGroupHeartbeatRequest(final short version) 
{
+membershipManager = new MembershipManagerImpl(
+DEFAULT_GROUP_ID,
+Optional.of(DEFAULT_GROUP_INSTANCE_ID),
+0,
+Optional.of("uniform"),
+subscriptions,
+mock(CommitRequestManager.class),
+(ConsumerMetadata) metadata,
+logContext,
+Optional.of(mock(ClientTelemetryReporter.class)),
+backgroundEventHandler,
+time,
+new Metrics()
+);
+membershipManager.transitionToJoining();
+
+heartbeatState = new HeartbeatState(
+subscriptions,
+membershipManager,
+DEFAULT_MAX_POLL_INTERVAL_MS
+);
+
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler
+);
+
 // The initial heartbeatInterval is set to 0, but we're testing
-resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID));
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
 List subscribedTopics = Collections.singletonList("topic");
-subscriptions.subscribe(new HashSet<>(subscribedTopics), 
Optional.empty());
+
when(subscriptions.subscription()).thenReturn(Collections.singleton("topic"));

Review Comment:
   should we reference the same `subscribedTopics` var in the `thenReturn` as 
before? 






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668904111


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -349,6 +408,10 @@ public void testNoCoordinator() {
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 
+when(pollTimer.isExpired()).thenReturn(false);
+when(pollTimer.remainingMs()).thenReturn(Long.MAX_VALUE);
+
when(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(1000L);

Review Comment:
   needed or wanted for some reason? Wondering again because the state is 
a spy on the internal component, so I would expect the real implementation 
to kick in, based on the interval defined for the HBMgr.  






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


philipnee commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668899064


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);

Review Comment:
   `1800L` isn't entirely clear to me.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668883729


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() {
 @Test
 public void testNetworkTimeout() {
 // The initial heartbeatInterval is set to 0
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 // Mimic network timeout
 result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), 
new TimeoutException("timeout"));
 
+time.sleep(1);
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+
 // Assure the manager will backoff on timeout
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1);
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size());
-
-time.sleep(1);
-result = heartbeatRequestManager.poll(time.milliseconds());
-assertEquals(1, result.unsentRequests.size());
 }
 
 @Test
 public void testFailureOnFatalException() {
 // The initial heartbeatInterval is set to 0
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
+when(membershipManager.isLeavingGroup()).thenReturn(true);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);

Review Comment:
   needed? the state is a spy, so I would expect to get the behaviour we 
want here, based on the interval that we already expired above, all from the real 
instance






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668882107


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() {
 @Test
 public void testNetworkTimeout() {
 // The initial heartbeatInterval is set to 0
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 // Mimic network timeout
 result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), 
new TimeoutException("timeout"));
 
+time.sleep(1);
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+
 // Assure the manager will backoff on timeout
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1);
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size());
-
-time.sleep(1);
-result = heartbeatRequestManager.poll(time.milliseconds());
-assertEquals(1, result.unsentRequests.size());
 }
 
 @Test
 public void testFailureOnFatalException() {
 // The initial heartbeatInterval is set to 0
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
+when(membershipManager.isLeavingGroup()).thenReturn(true);

Review Comment:
   do we need this? (it's for the expired poll timer flow, which is unrelated)






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668876259


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() {
 @Test
 public void testNetworkTimeout() {
 // The initial heartbeatInterval is set to 0
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 // Mimic network timeout
 result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), 
new TimeoutException("timeout"));
 
+time.sleep(1);
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+
 // Assure the manager will backoff on timeout
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1);
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size());
-
-time.sleep(1);
-result = heartbeatRequestManager.poll(time.milliseconds());
-assertEquals(1, result.unsentRequests.size());
 }
 
 @Test
 public void testFailureOnFatalException() {

Review Comment:
   since we're here, could we improve the 
`verify(backgroundEventHandler).add(any())` to check it's an `ErrorEvent` 
instead of any? it's truly what we expect on a fatal response in HB






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668847329


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() {
 @Test
 public void testNetworkTimeout() {
 // The initial heartbeatInterval is set to 0
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 // Mimic network timeout
 result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), 
new TimeoutException("timeout"));
 
+time.sleep(1);

Review Comment:
   not needed right? the flow we had (and should maintain) is:
   1. poll generates HB and receives timeout as response
   2. check backoff enforced (sleep backoff - 1, no request)
   3. check new HB after backoff (sleep the 1ms that was missing to have the 
full backoff and check HB generated).
   
   Since we're mocking the membershipMgr now, we need to set expectations 
to ensure that we're not skipping the HB because membershipMgr.shouldSkipHb is 
true, and that it's really the backoff kicking in. 
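
   The three steps above can be sketched in plain Java as follows. This is only 
an illustration under stated assumptions: `BackoffSketch` is a hypothetical 
class, it uses a fixed backoff, and it is not the actual `RequestState` 
implementation, which may compute the backoff differently.

```java
// Hypothetical fixed-backoff model; not Kafka's RequestState.
class BackoffSketch {
    private final long retryBackoffMs;
    private long lastFailureMs = -1; // -1 means no failure has been recorded

    BackoffSketch(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    // Step 1: the request failed (for example with a timeout) at nowMs.
    void onFailedAttempt(long nowMs) {
        lastFailureMs = nowMs;
    }

    // Steps 2 and 3: a new request is allowed only once the full backoff elapsed.
    boolean canSendRequest(long nowMs) {
        if (lastFailureMs < 0) {
            return true;
        }
        return nowMs - lastFailureMs >= retryBackoffMs;
    }
}
```

   With a backoff of 100 ms and a failure at time 0, the model refuses a request 
at 99 ms and allows one at 100 ms, which mirrors the "sleep backoff - 1, then 
sleep 1 ms" sequence of the test.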






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668868970


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -312,31 +369,33 @@ public void testHeartbeatOutsideInterval() {
 @Test
 public void testNetworkTimeout() {
 // The initial heartbeatInterval is set to 0
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size());
 // Mimic network timeout
 result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), 
new TimeoutException("timeout"));
 
+time.sleep(1);
+result = heartbeatRequestManager.poll(time.milliseconds());
+assertEquals(1, result.unsentRequests.size());
+
 // Assure the manager will backoff on timeout
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

Review Comment:
   this changes the semantics. This will achieve the result of not generating a 
HB, but we want to test that the HB is not generated because the backoff hasn't 
expired. So we should ensure there is a coordinator, ensure that the 
membershipMgr allows HB (shouldSkipHb false), and see that it's indeed the 
backoff check 
[here](https://github.com/apache/kafka/blob/a533e246e3ed5f6f6c5be4ebf9d29ae75cab557e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java#L89)
 that prevents the HB from being generated 
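
   As a rough illustration of why the cause matters, the sketch below models a 
poll decision that reports which check suppressed the heartbeat. The names 
`SkipReasonSketch` and `PollDecisionSketch`, and the order of the checks, are 
assumptions made for this example and are not taken from the Kafka code.

```java
// Hypothetical model: several different conditions all lead to "no heartbeat".
enum SkipReasonSketch { NONE, NO_COORDINATOR, MEMBER_SKIPS_HEARTBEAT, BACKOFF_NOT_EXPIRED }

class PollDecisionSketch {
    // Returns the first condition that would suppress the heartbeat, or NONE.
    static SkipReasonSketch whyNoHeartbeat(boolean coordinatorKnown,
                                           boolean shouldSkipHeartbeat,
                                           boolean backoffExpired) {
        if (!coordinatorKnown) {
            return SkipReasonSketch.NO_COORDINATOR;
        }
        if (shouldSkipHeartbeat) {
            return SkipReasonSketch.MEMBER_SKIPS_HEARTBEAT;
        }
        if (!backoffExpired) {
            return SkipReasonSketch.BACKOFF_NOT_EXPIRED;
        }
        return SkipReasonSketch.NONE;
    }
}
```

   An assertion on "zero requests" alone cannot tell these cases apart, so the 
test setup has to hold the coordinator and member conditions favourable and 
leave the backoff as the only possible cause.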






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668844863


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -298,6 +353,8 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
 public void testHeartbeatOutsideInterval() {
 when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 when(membershipManager.shouldHeartbeatNow()).thenReturn(true);
+when(pollTimer.remainingMs()).thenReturn(Long.MAX_VALUE);
+
when(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds())).thenReturn(1000L);

Review Comment:
   is this 1000 the interval? if so let's use the constant to make it clear. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668835568


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -277,12 +332,12 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
 
 result = heartbeatRequestManager.poll(time.milliseconds());

Review Comment:
   I see down below (ln 337-340) the sequence to validate that the interval 
expired and no HB is generated because of the inflight. I truly don't see the 
value in keeping this one then (ln 333-335), which is not related to in-flight 
at all (HB interval is tested in other funcs). Makes sense? 






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668829801


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -277,12 +332,12 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
 
 result = heartbeatRequestManager.poll(time.milliseconds());

Review Comment:
   we need to sleep the interval before polling here again. If not, we're not 
getting HB because the interval hasn't expired (not because of in-flight as we 
want to assert on the following line)
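
   A small plain-Java model may help show the difference between the two causes. 
`InFlightSketch` is a hypothetical class written for this example, with an 
assumed fixed interval and a single in-flight flag; it does not reproduce the 
real heartbeat request state.

```java
// Hypothetical model of an interval timer combined with an in-flight flag.
class InFlightSketch {
    private final long intervalMs;
    private long lastSentMs;
    private boolean inFlight = false;

    InFlightSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastSentMs = startMs;
    }

    boolean intervalExpired(long nowMs) {
        return nowMs - lastSentMs >= intervalMs;
    }

    // Returns true if this poll generates a heartbeat.
    boolean poll(long nowMs) {
        if (inFlight || !intervalExpired(nowMs)) {
            return false;
        }
        inFlight = true;
        lastSentMs = nowMs;
        return true;
    }

    // The response arrived, so the request is no longer in flight.
    void onResponse() {
        inFlight = false;
    }
}
```

   Polling again immediately after the first heartbeat returns false for two 
reasons at once. Only after advancing the clock by a full interval does a false 
result prove that the in-flight request is what blocks the next heartbeat.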






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668797551


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
 
 // Member in state where it should not send Heartbeat anymore
 when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);

Review Comment:
   I would dare to say that we shouldn't need this now that we don't have a spy 
on `membershipMgr` anymore (it was there that this was used, for several 
assignment/callback checks, not at the HB mgr level)



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -251,23 +304,25 @@ public void testSkippingHeartbeat(final boolean 
shouldSkipHeartbeat) {
 
 @Test
 public void testTimerNotDue() {
-mockStableMember();
 time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
 assertEquals(0, result.unsentRequests.size());
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
result.timeUntilNextPollMs);
+
+when(pollTimer.remainingMs()).thenReturn(1800L);
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, 
heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
 
 // Member in state where it should not send Heartbeat anymore
 when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
 membershipManager.transitionToFatal();

Review Comment:
   we shouldn't need this explicit call on the mock now (it's replaced by the 
`shouldSkipHeartbeat` expectation you already set above). 






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668779441


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatStateWith0HeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
-
-setUp(Optional.of(gi));
-}
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
 @Test
 public void testHeartbeatOnStartup() {
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size());
 
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 assertEquals(0, 
heartbeatRequestManager.maximumTimeToWait(time.milli

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668775173


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatStateWith0HeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
-
-setUp(Optional.of(gi));
-}
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
 @Test
 public void testHeartbeatOnStartup() {
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

Review Comment:
   Not harmful, but this changes the path the test runs. Before this 
PR, the first poll in this test returned an empty result because the member was 
unsubscribed (shouldSkipHeartbeat true). Now it is because there is no 
coordinator. Both go down the same path in the poll implementation, but if we 
want to be true to what we had before, here we should mock 
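
   A minimal sketch of the point above, using hypothetical stand-in types rather than the real Kafka classes: if `poll` guards on both conditions in a single check, a missing coordinator and a member that should skip heartbeats produce the same empty result, so the assertion passes either way even though a different precondition is being exercised. `PollGuardSketch`, its parameters, and the `String` request type are assumptions for illustration only.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a heartbeat manager's poll guard; not the real Kafka code.
final class PollGuardSketch {

    // Returns the requests that would be sent on this poll.
    static List<String> poll(Optional<String> coordinator, boolean shouldSkipHeartbeat) {
        // Both preconditions share one early-return branch.
        if (!coordinator.isPresent() || shouldSkipHeartbeat) {
            return Collections.emptyList();
        }
        return Collections.singletonList("heartbeat to " + coordinator.get());
    }

    public static void main(String[] args) {
        // No coordinator: nothing is sent.
        System.out.println(poll(Optional.empty(), false).size());
        // Coordinator known, but the member should skip heartbeats: nothing is sent either.
        System.out.println(poll(Optional.of("localhost"), true).size());
        // Coordinator known and member active: one request is produced.
        System.out.println(poll(Optional.of("localhost"), false).size());
    }
}
```

   Because the first two calls are indistinguishable from the outside, a migrated test only documents the original scenario if it sets up the same precondition the old test relied on.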

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668745058


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatStateWith0HeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
-
-setUp(Optional.of(gi));
-}
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
 @Test
 public void testHeartbeatOnStartup() {
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
 NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size());
 
-resetWithZeroHeartbeatInterval(Optional.empty());
-mockStableMember();
+time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 assertEquals(0, 
heartbeatRequestManager.maximumTimeToWait(time.milli

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-08 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1668723357


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+Metrics metrics = new Metrics(time);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);

Review Comment:
   Do we really need to set this expectation for all tests? It is only used 
when building the request if there were partitions reconciled, so I would 
expect to need it only in very specific tests around that. Worth 
double-checking. 
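
   One way to double-check, sketched with a hand-written counting fake instead of Mockito: record how often the stubbed accessor is read, then build a request with and without reconciled partitions. All names here (`AssignmentStubSketch`, `AssignmentSource`, `buildRequest`, the `reconciled` flag) are hypothetical and only mirror the description in the comment, not the actual `HeartbeatRequestManager` code.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: the assignment is only read when partitions were reconciled.
final class AssignmentStubSketch {

    // Counting fake standing in for a mocked membership manager.
    static final class AssignmentSource {
        final AtomicInteger reads = new AtomicInteger();

        List<String> currentAssignment() {
            reads.incrementAndGet();
            return Collections.singletonList("topic-0");
        }
    }

    // Builds a request description; touches the assignment only if reconciled is true.
    static String buildRequest(AssignmentSource source, boolean reconciled) {
        if (!reconciled) {
            return "heartbeat";
        }
        return "heartbeat with " + source.currentAssignment();
    }
}
```

   If the read count stays at zero for a given test, the expectation is not exercised there and could move out of the shared `setUp()` into the tests that cover reconciliation.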






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-03 Thread via GitHub


brenden20 commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2207337445

   @philipnee @lianetm thank you both for the feedback. I have addressed all 
comments.





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-03 Thread via GitHub


philipnee commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1664632832


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -594,24 +740,13 @@ public void testHeartbeatState() {
 
 @Test
 public void testPollTimerExpiration() {
-coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-membershipManager = mock(MembershipManager.class);
-heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
-heartbeatRequestState = spy(new 
HeartbeatRequestManager.HeartbeatRequestState(
-new LogContext(),
-time,
-DEFAULT_HEARTBEAT_INTERVAL_MS,
-DEFAULT_RETRY_BACKOFF_MS,
-DEFAULT_RETRY_BACKOFF_MAX_MS,
-0));
-backgroundEventHandler = mock(BackgroundEventHandler.class);
-
 heartbeatRequestManager = createHeartbeatRequestManager(
 coordinatorRequestManager,
 membershipManager,
 heartbeatState,
 heartbeatRequestState,
 backgroundEventHandler);
+

Review Comment:
   Can we remove the extra blank line?






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-03 Thread via GitHub


philipnee commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1664631319


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -277,27 +332,29 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
 
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a " +
-"previous one is in-flight");
+"previous one is in-flight");
 
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent when the " +
-"interval expires if there is a previous HB request in-flight");
+"interval expires if there is a previous HB request 
in-flight");
 
 // Receive response for the inflight after the interval expired. The 
next HB should be sent
 // on the next poll waiting only for the minimal backoff.
 inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, 
Errors.NONE));
 time.sleep(DEFAULT_RETRY_BACKOFF_MS);
 result = heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, result.unsentRequests.size(), "A next heartbeat should 
be sent on " +
-"the first poll after receiving a response that took longer than 
the interval, " +
-"waiting only for the minimal backoff.");
+"the first poll after receiving a response that took longer 
than the interval, " +

Review Comment:
   Could you revert the changes here (as well as the ones above and below)? I 
think these are IDE indentation changes.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-07-03 Thread via GitHub


philipnee commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1664366556


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,90 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
+this.time = new MockTime();
+Metrics metrics = new Metrics(time);

Review Comment:
   For aesthetic purposes, let's group the `this` and non-`this` assignments, 
i.e., can we move `metrics` to the bottom? 






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-22 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1649440775


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,99 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
-
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.time = new MockTime();
+Metrics metrics = new Metrics(time);
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
+
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatStateWith0HeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-setUp(Optional.of(gi));
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+private void resetWithZeroHeartbeatInterval() {

Review Comment:
   It works for all but one test. For now I am keeping 
`createHeartbeatStateWith0HeartbeatInterval()` for 
`testSkippingHeartbeat(final boolean shouldSkipHeartbeat)`, since this test 
requires the initial heartbeat interval to be 0.
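
   To illustrate why the initial interval matters, here is a simplified timer model, under the assumption that a request may be sent once the interval has elapsed since the timer was last reset. `IntervalTimerSketch` is a hypothetical stand-in, not `HeartbeatRequestState` itself.

```java
// Hypothetical, simplified model of an interval-gated request state.
final class IntervalTimerSketch {
    private final long intervalMs;
    private long lastResetMs;

    IntervalTimerSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.lastResetMs = nowMs;
    }

    // A request is allowed once the full interval has elapsed since the last reset.
    boolean canSendRequest(long nowMs) {
        return nowMs - lastResetMs >= intervalMs;
    }

    // Restart the interval, e.g. after a heartbeat was sent.
    void reset(long nowMs) {
        this.lastResetMs = nowMs;
    }
}
```

   In this model, an interval of 0 lets the very first check pass without advancing the clock, whereas a non-zero interval would require the test to sleep for the interval first.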



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -275,29 +316,34 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
 assert

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-22 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1649427661


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -659,78 +753,38 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
 @Test
 public void testisExpiredByUsedForLogging() {
-Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-heartbeatRequestManager = new HeartbeatRequestManager(new 
LogContext(), pollTimer, config(),
-coordinatorRequestManager, membershipManager, heartbeatState, 
heartbeatRequestState,
-backgroundEventHandler, metrics);
 when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
 int exceededTimeMs = 5;
 time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
 
+when(membershipManager.isLeavingGroup()).thenReturn(false);
+when(pollTimer.isExpired()).thenReturn(true);
 NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, pollResult.unsentRequests.size());
 verify(membershipManager).transitionToSendingLeaveGroup(true);
 verify(pollTimer, never()).isExpiredBy();
-assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
 
 clearInvocations(pollTimer);
 heartbeatRequestManager.resetPollTimer(time.milliseconds());
 verify(pollTimer).isExpiredBy();
 }
 
 @Test
-public void testHeartbeatMetrics() {
-// setup
-coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-membershipManager = mock(MembershipManager.class);
-heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
-time = new MockTime();
-metrics = new Metrics(time);
-heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
-new LogContext(),
-time,
-0, // This initial interval should be 0 to ensure heartbeat on the 
clock
-DEFAULT_RETRY_BACKOFF_MS,
-DEFAULT_RETRY_BACKOFF_MAX_MS,
-0);
-backgroundEventHandler = mock(BackgroundEventHandler.class);
+public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
 heartbeatRequestManager = createHeartbeatRequestManager(
-coordinatorRequestManager,
-membershipManager,
-heartbeatState,
-heartbeatRequestState,
-backgroundEventHandler);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
-when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
-assertNotNull(getMetric("heartbeat-response-time-max"));
-assertNotNull(getMetric("heartbeat-rate"));
-assertNotNull(getMetric("heartbeat-total"));
-assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
-// test poll
-assertHeartbeat(heartbeatRequestManager, 0);
-time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
-assertEquals(1.0, getMetric("heartbeat-total").metricValue());
-assertEquals((double) 
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), 
getMetric("last-heartbeat-seconds-ago").metricValue());
-
-assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
-assertEquals(0.06d, (double) 
getMetric("heartbeat-rate").metricValue(), 0.005d);
-assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
-// Randomly sleep for some time
-Random rand = new Random();
-int randomSleepS = rand.nextInt(11);
-time.sleep(randomSleepS * 1000);
-assertEquals((double) randomSleepS, 
getMetric("last-heartbeat-seconds-ago").metricValue());
-}
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 
-@Test
-public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+when(membershipManager.state()).thenReturn(MemberState.STABLE);
 mockStableMember();
 
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+when(membershipManager.isLeavingGroup()).thenReturn(true);

Review Comment:
   Hmm, do we need this here? I wouldn't expect so (the membershipMgr is a mock 
now, and the HB mgr does not check isLeavingGroup to generate a HB).



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -659,78 +753,38 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
 @Test
 public void testisExpiredByUsedForLogging() {
-Timer

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-22 Thread via GitHub


lianetm commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2183475535

   Hey @brenden20, I completed another pass, left some comments. Thanks!





Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-13 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1638461100


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -86,91 +79,169 @@
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-
 public class HeartbeatRequestManagerTest {
 private static final String DEFAULT_GROUP_ID = "groupId";
-private static final String CONSUMER_COORDINATOR_METRICS = 
"consumer-coordinator-metrics";
+private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
+private static final String DEFAULT_GROUP_INSTANCE_ID = 
"group-instance-id";
+private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 1;
+private static final long DEFAULT_RETRY_BACKOFF_MS = 80;
+private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
+private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0;
 
-private ConsumerTestBuilder testBuilder;
 private Time time;
 private Timer pollTimer;
 private CoordinatorRequestManager coordinatorRequestManager;
 private SubscriptionState subscriptions;
 private Metadata metadata;
 private HeartbeatRequestManager heartbeatRequestManager;
+private HeartbeatRequestManager heartbeatRequestManager1;
 private MembershipManager membershipManager;
+private MembershipManager membershipManager1;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState1;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
 private final String memberId = "member-id";
 private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private RequestManagers requestManagers;
+private LogContext logContext;
+private ConsumerConfig config;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
-
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.time = new MockTime();
+Metrics metrics = new Metrics();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatRequestState = mock(HeartbeatRequestState.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.config = mock(ConsumerConfig.class);
+OffsetCommitCallbackInvoker offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class);
+
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+time,
+config,
+coordinatorRequestManager,
+subscriptions,
+membershipManager,
+backgroundEventHandler,
+metrics);
 
-when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", )));
-}
+this.requestManagers = new RequestManagers(

Review Comment:
   Actually no, that was an oversight on my part. Removed now.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-13 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1638492592


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -86,91 +79,169 @@
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-
 public class HeartbeatRequestManagerTest {
 private static final String DEFAULT_GROUP_ID = "groupId";
-private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics";
+private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
+private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
+private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 1;
+private static final long DEFAULT_RETRY_BACKOFF_MS = 80;
+private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
+private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0;
 
-private ConsumerTestBuilder testBuilder;
 private Time time;
 private Timer pollTimer;
 private CoordinatorRequestManager coordinatorRequestManager;
 private SubscriptionState subscriptions;
 private Metadata metadata;
 private HeartbeatRequestManager heartbeatRequestManager;
+private HeartbeatRequestManager heartbeatRequestManager1;
 private MembershipManager membershipManager;
+private MembershipManager membershipManager1;
 private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
+private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState1;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
 private final String memberId = "member-id";
 private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private RequestManagers requestManagers;
+private LogContext logContext;
+private ConsumerConfig config;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
-
-private void setUp(Optional groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.time = new MockTime();
+Metrics metrics = new Metrics();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatRequestState = mock(HeartbeatRequestState.class);
+this.heartbeatState = mock(HeartbeatState.class);

Review Comment:
   Makes sense, I will remove `heartbeatRequestState1` and make 
`heartbeatRequestState` into a spy.






Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-12 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1636933032


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -86,91 +79,169 @@
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-
 public class HeartbeatRequestManagerTest {
 private static final String DEFAULT_GROUP_ID = "groupId";
-private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics";
+private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
+private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
+private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 1;
+private static final long DEFAULT_RETRY_BACKOFF_MS = 80;
+private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
+private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0;
 
-private ConsumerTestBuilder testBuilder;
 private Time time;
 private Timer pollTimer;
 private CoordinatorRequestManager coordinatorRequestManager;
 private SubscriptionState subscriptions;
 private Metadata metadata;
 private HeartbeatRequestManager heartbeatRequestManager;
+private HeartbeatRequestManager heartbeatRequestManager1;
 private MembershipManager membershipManager;
+private MembershipManager membershipManager1;
 private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
+private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState1;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
 private final String memberId = "member-id";
 private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private RequestManagers requestManagers;
+private LogContext logContext;
+private ConsumerConfig config;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
-
-private void setUp(Optional groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.time = new MockTime();
+Metrics metrics = new Metrics();
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatRequestState = mock(HeartbeatRequestState.class);
+this.heartbeatState = mock(HeartbeatState.class);

Review Comment:
   Turning this into a mock requires a bit more work, I would expect. 
Several tests assert on this request state (it is mainly the internal 
component that dictates the timing of the heartbeats). I also notice that we 
end up with both this mock and another request state that holds an actual 
instance (`heartbeatRequestState1`). So I wonder whether 
`HeartbeatRequestState` alone might be a sensible use of a spy in this test?
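
To illustrate the mock-versus-spy trade-off raised above, here is a minimal plain-Java sketch. It does not use Mockito or any Kafka class: `RequestState`, `MockLikeRequestState`, and `SpyLikeRequestState` are hypothetical stand-ins invented for this example, and the timing rule is a simplification rather than the actual `HeartbeatRequestState` logic. The mock-like object answers every call with a canned default, so timing assertions against it say nothing about real behavior, while the spy-like object records calls and still delegates to the real logic.

```java
import java.util.ArrayList;
import java.util.List;

public class SpyVersusMockSketch {

    /** Hypothetical stand-in for a request state that decides when a heartbeat may be sent. */
    static class RequestState {
        private final long intervalMs;
        private long lastSentMs = -1; // -1 means nothing has been sent yet

        RequestState(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        boolean canSendRequest(long nowMs) {
            return lastSentMs < 0 || nowMs - lastSentMs >= intervalMs;
        }

        void onSendAttempt(long nowMs) {
            lastSentMs = nowMs;
        }
    }

    /** Mock-like: records calls but replaces all behavior with canned defaults. */
    static class MockLikeRequestState extends RequestState {
        final List<String> calls = new ArrayList<>();

        MockLikeRequestState() {
            super(0);
        }

        @Override
        boolean canSendRequest(long nowMs) {
            calls.add("canSendRequest");
            return false; // canned default; the real timing rule never runs
        }

        @Override
        void onSendAttempt(long nowMs) {
            calls.add("onSendAttempt"); // real state is never updated
        }
    }

    /** Spy-like: records calls and delegates to the real implementation. */
    static class SpyLikeRequestState extends RequestState {
        final List<String> calls = new ArrayList<>();

        SpyLikeRequestState(long intervalMs) {
            super(intervalMs);
        }

        @Override
        boolean canSendRequest(long nowMs) {
            calls.add("canSendRequest");
            return super.canSendRequest(nowMs);
        }

        @Override
        void onSendAttempt(long nowMs) {
            calls.add("onSendAttempt");
            super.onSendAttempt(nowMs);
        }
    }

    public static void main(String[] args) {
        SpyLikeRequestState spy = new SpyLikeRequestState(1000);
        spy.onSendAttempt(0);
        // The spy keeps the real timing rule, so these answers depend on the interval.
        System.out.println("spy at 500 ms:   " + spy.canSendRequest(500));
        System.out.println("spy at 1000 ms:  " + spy.canSendRequest(1000));

        MockLikeRequestState mock = new MockLikeRequestState();
        mock.onSendAttempt(0);
        // The mock-like object returns its canned default regardless of elapsed time.
        System.out.println("mock at 1000 ms: " + mock.canSendRequest(1000));
    }
}
```

With a single spy-like instance, a test can both assert on the timing state and verify interactions, which is why a second real instance alongside the mock becomes unnecessary.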
   





