Re: [PR] KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-13 Thread via GitHub


brenden20 commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2166728664

   @mjsax thank you for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-13 Thread via GitHub


mjsax commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2166718530

   Thanks for the PR @brenden20! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-13 Thread via GitHub


mjsax merged PR #16140:
URL: https://github.com/apache/kafka/pull/16140


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-12 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1637062738


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,97 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.time = new MockTime();
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+assertTrue(consumerNetworkThread.isRunning(),
+"ConsumerNetworkThread should start running when created");
+
+consumerNetworkThread.close();
+assertFalse(consumerNetworkThread.isRunning(),
+"close() should make consumerNetworkThread.running false by 
calling closeInternal(Duration timeout)");
+}
+
+@ParameterizedTest
+@ValueSource(longs = {Cons

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-12 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1637062738


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,97 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.time = new MockTime();
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+assertTrue(consumerNetworkThread.isRunning(),
+"ConsumerNetworkThread should start running when created");
+
+consumerNetworkThread.close();
+assertFalse(consumerNetworkThread.isRunning(),
+"close() should make consumerNetworkThread.running false by 
calling closeInternal(Duration timeout)");
+}
+
+@ParameterizedTest
+@ValueSource(longs = {Cons

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-12 Thread via GitHub


brenden20 commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2163090508

   @mjsax can you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-08 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1632136201


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -211,178 +220,52 @@ public void testResetPositionsProcessFailureIsIgnored() {
 }
 
 @Test
-public void testValidatePositionsEventIsProcessed() {
-ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-assertTrue(applicationEventsQueue.isEmpty());
-}
-
-@Test
-public void testAssignmentChangeEvent() {
-HashMap offset = 
mockTopicPartitionOffset();
-
-final long currentTimeMs = time.milliseconds();
-ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-applicationEventsQueue.add(e);
-
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-}
-
-@Test
-void testFetchTopicMetadata() {
-applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
-}
-
-@Test
-void testMaximumTimeToWait() {
+public void testMaximumTimeToWait() {
+final int defaultHeartbeatIntervalMs = 1000;
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-consumerNetworkThread.runOnce();
-// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-}
 
-@Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong())));
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong())));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 defaultHeartbeatIntervalMs);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
 consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
-
-@Test
-void testEnsureEventsAreCompleted() {
-// Mimic the logic of CompletableEventReaper.reap(Collection):
-doAnswer(__ -> {
-Iterator i = applicationEventsQueue.iterator();
-
-while (i.hasNext()) {
-ApplicationEvent event = i.next();
-
-if (event instanceof CompletableEvent)
-((CompletableEvent) 
event).future().completeExceptionally(new TimeoutException());
-
-i.remove();
-}
-
-return null;
-}).when(applicationEventReaper).reap(any(Collection.class));
-
-Node node = metadata.fetch().nodes().get(0);
-coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-CompletableApplicationEvent event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-CompletableFuture future = new CompletableFuture<>();
-when(event1.future()).thenReturn(future);
-applicationEventsQueue.add(event1);
-applicationEventsQueue.add(event2);
-assertFalse(future.isDone());
-assertFalse(applicationEventsQueue.isEmpty());
-consumerNetworkThread.cleanup();
-assertTrue(future.isCompletedExceptionally());
-assertTrue(applicationEventsQueue.isEmpty());
+  

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631774198


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestsTransferFromManagersToClientOnThreadRun() {
+List<Optional<? extends RequestManager>> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong())));
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {

Review Comment:
   Refer to my previous comment, which covers all of the event tests that were removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631774020


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {

Review Comment:
   Yes, this test, as well as many of the other application event tests, was 
rewritten into a single parameterized test. It gets its parameters from 
`applicationEvents()`, which supplies a stream of different kinds of 
events. This was done to condense the code a bit.
   
   And `testRequestsTransferFromManagersToClientOnThreadRun()` is a new test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631772919


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,97 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.time = new MockTime();
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+assertTrue(consumerNetworkThread.isRunning(),
+"ConsumerNetworkThread should start running when created");
+
+consumerNetworkThread.close();
+assertFalse(consumerNetworkThread.isRunning(),
+"close() should make consumerNetworkThread.running false by 
calling closeInternal(Duration timeout)");
+}
+
+@ParameterizedTest
+@ValueSource(longs = {Co

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631771648


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,97 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.time = new MockTime();
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {

Review Comment:
   Yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631771779


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,97 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.time = new MockTime();
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+assertTrue(consumerNetworkThread.isRunning(),
+"ConsumerNetworkThread should start running when created");
+
+consumerNetworkThread.close();
+assertFalse(consumerNetworkThread.isRunning(),
+"close() should make consumerNetworkThread.running false by 
calling closeInternal(Duration timeout)");
+}
+
+@ParameterizedTest
+@ValueSource(longs = {Co

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


mjsax commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631768784


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestsTransferFromManagersToClientOnThreadRun() {
+List<Optional<? extends RequestManager>> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong())));
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {

Review Comment:
   Seems this test was effectively removed? Why?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -190,15 +208,6 @@ public void testListOffsetsEventIsProcessed(boolean 
requireTimestamp) {
 assertTrue(applicationEventsQueue.isEmpty());
 }
 
-@Test
-public void testResetPositionsEventIsProcessed() {

Review Comment:
   It seems this test was effectively removed. Why?
   
   The same applies to several more tests below.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,97 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = 

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631701125


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -211,178 +219,52 @@ public void testResetPositionsProcessFailureIsIgnored() {
 }
 
 @Test
-public void testValidatePositionsEventIsProcessed() {
-ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-assertTrue(applicationEventsQueue.isEmpty());
-}
-
-@Test
-public void testAssignmentChangeEvent() {
-HashMap offset = 
mockTopicPartitionOffset();
-
-final long currentTimeMs = time.milliseconds();
-ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-applicationEventsQueue.add(e);
-
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-}
-
-@Test
-void testFetchTopicMetadata() {
-applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
-}
-
-@Test
-void testMaximumTimeToWait() {
+public void testMaximumTimeToWait() {
+final int defaultHeartbeatIntervalMs = 1000;
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-consumerNetworkThread.runOnce();
-// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-}
 
-@Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong())));
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong())));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 defaultHeartbeatIntervalMs);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
 consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
-
-@Test
-void testEnsureEventsAreCompleted() {
-// Mimic the logic of CompletableEventReaper.reap(Collection):
-doAnswer(__ -> {
-Iterator i = applicationEventsQueue.iterator();
-
-while (i.hasNext()) {
-ApplicationEvent event = i.next();
-
-if (event instanceof CompletableEvent)
-((CompletableEvent) 
event).future().completeExceptionally(new TimeoutException());
-
-i.remove();
-}
-
-return null;
-}).when(applicationEventReaper).reap(any(Collection.class));
-
-Node node = metadata.fetch().nodes().get(0);
-coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-CompletableApplicationEvent event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-CompletableFuture future = new CompletableFuture<>();
-when(event1.future()).thenReturn(future);
-applicationEventsQueue.add(event1);
-applicationEventsQueue.add(event2);
-assertFalse(future.isDone());
-assertFalse(applicationEventsQueue.isEmpty());
-consumerNetworkThread.cleanup();
-assertTrue(future.isCompletedExceptionally());
-assertTrue(applicationEventsQueue.isEmpty());
+  

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631698362


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -211,178 +219,52 @@ public void testResetPositionsProcessFailureIsIgnored() {
 }
 
 @Test
-public void testValidatePositionsEventIsProcessed() {
-ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-assertTrue(applicationEventsQueue.isEmpty());
-}
-
-@Test
-public void testAssignmentChangeEvent() {
-HashMap offset = 
mockTopicPartitionOffset();
-
-final long currentTimeMs = time.milliseconds();
-ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-applicationEventsQueue.add(e);
-
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-}
-
-@Test
-void testFetchTopicMetadata() {
-applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
-}
-
-@Test
-void testMaximumTimeToWait() {
+public void testMaximumTimeToWait() {
+final int defaultHeartbeatIntervalMs = 1000;
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-consumerNetworkThread.runOnce();
-// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-}
 
-@Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong())));
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong())));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 defaultHeartbeatIntervalMs);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
 consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
-
-@Test
-void testEnsureEventsAreCompleted() {
-// Mimic the logic of CompletableEventReaper.reap(Collection):
-doAnswer(__ -> {
-Iterator i = applicationEventsQueue.iterator();
-
-while (i.hasNext()) {
-ApplicationEvent event = i.next();
-
-if (event instanceof CompletableEvent)
-((CompletableEvent) 
event).future().completeExceptionally(new TimeoutException());
-
-i.remove();
-}
-
-return null;
-}).when(applicationEventReaper).reap(any(Collection.class));
-
-Node node = metadata.fetch().nodes().get(0);
-coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-CompletableApplicationEvent event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-CompletableFuture future = new CompletableFuture<>();
-when(event1.future()).thenReturn(future);
-applicationEventsQueue.add(event1);
-applicationEventsQueue.add(event2);
-assertFalse(future.isDone());
-assertFalse(applicationEventsQueue.isEmpty());
-consumerNetworkThread.cleanup();
-assertTrue(future.isCompletedExceptionally());
-assertTrue(applicationEventsQueue.isEmpty());
+

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2155534482

   @lianetm I implemented your new suggestions; let me know if they look good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631653829


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,99 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final long MAX_POLL_TIMEOUT_MS = 5000;
+
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.time = new MockTime();
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+assertTrue(consumerNetworkThread.isRunning(),
+"ConsumerNetworkThread should start running when created");
+
+consumerNetworkThread.close();
+assertFalse(consumerNetworkThread.isRunning(),
+"close() should make consumerNetworkThread.running fals

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631646232


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,99 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final long MAX_POLL_TIMEOUT_MS = 5000;

Review Comment:
   What I mean is: remove this constant and reference 
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS where needed (as is already done 
here for some tests).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631647886


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,99 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final long MAX_POLL_TIMEOUT_MS = 5000;
+
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.time = new MockTime();
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+assertTrue(consumerNetworkThread.isRunning(),
+"ConsumerNetworkThread should start running when created");
+
+consumerNetworkThread.close();
+assertFalse(consumerNetworkThread.isRunning(),
+"close() should make consumerNetworkThread.running fals

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631646852


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,99 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;

Review Comment:
   This seems to be used only in `testMaximumTimeToWait`, right? If so, we had 
better keep it as a local variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631646232


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,99 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final long MAX_POLL_TIMEOUT_MS = 5000;

Review Comment:
   What I mean is: remove this constant and reference 
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS (as is already done here for some 
tests).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631645522


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,99 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final long MAX_POLL_TIMEOUT_MS = 5000;

Review Comment:
   Let's use the existing constant 
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS instead, so that we don't have to 
maintain this one (and avoid breaking the test if that one changes)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2155384516

   @lianetm implemented your suggestions on the description and 
testConsumerNetworkThreadPollTimeComputations(). Let me know how it looks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2155322585

   Regarding the PR description, I would suggest to update:
   
   > Removed testEnsureMetadataUpdateOnPoll() since it was doing integration 
testing~~, could not get a unit test to work for that one~~
   
   You did great removing it, as it was testing something that is not 
the responsibility of this component, and that's exactly why a real unit test 
is not doable ;)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631549092


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -72,68 +64,98 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+
+private final Time time;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final NetworkClientDelegate networkClientDelegate;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.time = new MockTime();
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+assertTrue(consumerNetworkThread.isRunning(),
+"ConsumerNetworkThread should start running when created");
+
+consumerNetworkThread.close();
+assertFalse(consumerNetworkThread.isRunning(),
+"close() should make consumerNetworkThread.running false by 
calling closeInternal(Duration timeout)");
+ 

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631524427


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -211,178 +232,80 @@ public void testResetPositionsProcessFailureIsIgnored() {
 }
 
 @Test
-public void testValidatePositionsEventIsProcessed() {
-ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-assertTrue(applicationEventsQueue.isEmpty());
-}
-
-@Test
-public void testAssignmentChangeEvent() {
-HashMap offset = 
mockTopicPartitionOffset();
-
-final long currentTimeMs = time.milliseconds();
-ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-applicationEventsQueue.add(e);
-
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-}
+public void testPollResultTimer() {
+NetworkClientDelegate networkClientDelegate = new 
NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-@Test
-void testFetchTopicMetadata() {
-applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
+NetworkClientDelegate.UnsentRequest req = new 
NetworkClientDelegate.UnsentRequest(
+new FindCoordinatorRequest.Builder(
+new FindCoordinatorRequestData()
+
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
+.setKey("foobar")),
+Optional.empty());
+req.setTimer(time, DEFAULT_REQUEST_TIMEOUT_MS);
+
+// purposely setting a non-MAX time to ensure it is returning 
Long.MAX_VALUE upon success
+NetworkClientDelegate.PollResult success = new 
NetworkClientDelegate.PollResult(
+10,
+Collections.singletonList(req));
+assertEquals(10, networkClientDelegate.addAll(success));
+
+NetworkClientDelegate.PollResult failure = new 
NetworkClientDelegate.PollResult(
+10,
+new ArrayList<>());
+assertEquals(10, networkClientDelegate.addAll(failure));

Review Comment:
   It seems some of my changes were overwritten when I merged; I am fixing that 
up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631518633


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -211,178 +232,80 @@ public void testResetPositionsProcessFailureIsIgnored() {
 }
 
 @Test
-public void testValidatePositionsEventIsProcessed() {
-ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-assertTrue(applicationEventsQueue.isEmpty());
-}
-
-@Test
-public void testAssignmentChangeEvent() {
-HashMap offset = 
mockTopicPartitionOffset();
-
-final long currentTimeMs = time.milliseconds();
-ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-applicationEventsQueue.add(e);
-
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-}
+public void testPollResultTimer() {
+NetworkClientDelegate networkClientDelegate = new 
NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-@Test
-void testFetchTopicMetadata() {
-applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
+NetworkClientDelegate.UnsentRequest req = new 
NetworkClientDelegate.UnsentRequest(
+new FindCoordinatorRequest.Builder(
+new FindCoordinatorRequestData()
+
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
+.setKey("foobar")),
+Optional.empty());
+req.setTimer(time, DEFAULT_REQUEST_TIMEOUT_MS);
+
+// purposely setting a non-MAX time to ensure it is returning 
Long.MAX_VALUE upon success
+NetworkClientDelegate.PollResult success = new 
NetworkClientDelegate.PollResult(
+10,
+Collections.singletonList(req));
+assertEquals(10, networkClientDelegate.addAll(success));
+
+NetworkClientDelegate.PollResult failure = new 
NetworkClientDelegate.PollResult(
+10,
+new ArrayList<>());
+assertEquals(10, networkClientDelegate.addAll(failure));

Review Comment:
   This seems like a duplicate and is not aligned with the comment. What's the 
intention? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2155226928

   @lianetm implemented suggestions and merged with trunk for no conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631457443


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -258,155 +252,66 @@ void testPollResultTimer() {
 NetworkClientDelegate.PollResult success = new 
NetworkClientDelegate.PollResult(
 10,
 Collections.singletonList(req));
-assertEquals(10, networkClient.addAll(success));
+assertEquals(10, networkClientDelegate.addAll(success));
 
 NetworkClientDelegate.PollResult failure = new 
NetworkClientDelegate.PollResult(
 10,
 new ArrayList<>());
-assertEquals(10, networkClient.addAll(failure));
+assertEquals(10, networkClientDelegate.addAll(failure));
 }
 
 @Test
-void testMaximumTimeToWait() {
+public void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-consumerNetworkThread.runOnce();
-// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-}
 
-@Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
 consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
-
-@Test
-void testEnsureEventsAreCompleted() {
-// Mimic the logic of CompletableEventReaper.reap(Collection):
-doAnswer(__ -> {
-Iterator i = applicationEventsQueue.iterator();
-
-while (i.hasNext()) {
-ApplicationEvent event = i.next();
-
-if (event instanceof CompletableEvent)
-((CompletableEvent) 
event).future().completeExceptionally(new TimeoutException());
-
-i.remove();
-}
-
-return null;
-}).when(applicationEventReaper).reap(any(Collection.class));
-
-Node node = metadata.fetch().nodes().get(0);
-coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-CompletableApplicationEvent event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-CompletableFuture future = new CompletableFuture<>();
-when(event1.future()).thenReturn(future);
-applicationEventsQueue.add(event1);
-applicationEventsQueue.add(event2);
-assertFalse(future.isDone());
-assertFalse(applicationEventsQueue.isEmpty());
-consumerNetworkThread.cleanup();
-assertTrue(future.isCompletedExceptionally());
-assertTrue(applicationEventsQueue.isEmpty());
+// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
+assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testCleanupInvokesReaper() {
+public void testCleanupInvokesReaper() {
+LinkedList queue = new 
LinkedList<>();
+when(networkClientDelegate.unsentRequests()).thenReturn(queue);
 consumerNetworkThread.cleanup();
 verify(applicationEventReaper).reap(applicationEventsQueue);
 }
 
 @Test
-void testRunOnceInvokesReaper() {
+public void testRunOnceInvokesReaper() {
 consumerNetworkThread.runOnce();
 verify(applicationEventReaper).reap(any(Long.class));
 }
 
 @Test
-void testSendUnsentRequest() {
-String groupId = "group-id";
-NetworkClientDelegate.UnsentRequest request = new 
NetworkClientD

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2154902452

   Hey @brenden20, thanks for the update, just 2 minor comments left. Could 
you please pull the latest changes from trunk and resolve the conflicts? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631259791


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -258,155 +252,66 @@ void testPollResultTimer() {
 NetworkClientDelegate.PollResult success = new 
NetworkClientDelegate.PollResult(
 10,
 Collections.singletonList(req));
-assertEquals(10, networkClient.addAll(success));
+assertEquals(10, networkClientDelegate.addAll(success));
 
 NetworkClientDelegate.PollResult failure = new 
NetworkClientDelegate.PollResult(
 10,
 new ArrayList<>());
-assertEquals(10, networkClient.addAll(failure));
+assertEquals(10, networkClientDelegate.addAll(failure));
 }
 
 @Test
-void testMaximumTimeToWait() {
+public void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-consumerNetworkThread.runOnce();
-// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-}
 
-@Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
 consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
-
-@Test
-void testEnsureEventsAreCompleted() {
-// Mimic the logic of CompletableEventReaper.reap(Collection):
-doAnswer(__ -> {
-Iterator i = applicationEventsQueue.iterator();
-
-while (i.hasNext()) {
-ApplicationEvent event = i.next();
-
-if (event instanceof CompletableEvent)
-((CompletableEvent) 
event).future().completeExceptionally(new TimeoutException());
-
-i.remove();
-}
-
-return null;
-}).when(applicationEventReaper).reap(any(Collection.class));
-
-Node node = metadata.fetch().nodes().get(0);
-coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-CompletableApplicationEvent event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-CompletableFuture future = new CompletableFuture<>();
-when(event1.future()).thenReturn(future);
-applicationEventsQueue.add(event1);
-applicationEventsQueue.add(event2);
-assertFalse(future.isDone());
-assertFalse(applicationEventsQueue.isEmpty());
-consumerNetworkThread.cleanup();
-assertTrue(future.isCompletedExceptionally());
-assertTrue(applicationEventsQueue.isEmpty());
+// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
+assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testCleanupInvokesReaper() {
+public void testCleanupInvokesReaper() {
+LinkedList queue = new 
LinkedList<>();
+when(networkClientDelegate.unsentRequests()).thenReturn(queue);
 consumerNetworkThread.cleanup();
 verify(applicationEventReaper).reap(applicationEventsQueue);
 }
 
 @Test
-void testRunOnceInvokesReaper() {
+public void testRunOnceInvokesReaper() {
 consumerNetworkThread.runOnce();
 verify(applicationEventReaper).reap(any(Long.class));
 }
 
 @Test
-void testSendUnsentRequest() {
-String groupId = "group-id";
-NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDel

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631257210


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -258,155 +252,66 @@ void testPollResultTimer() {
 NetworkClientDelegate.PollResult success = new 
NetworkClientDelegate.PollResult(
 10,
 Collections.singletonList(req));
-assertEquals(10, networkClient.addAll(success));
+assertEquals(10, networkClientDelegate.addAll(success));
 
 NetworkClientDelegate.PollResult failure = new 
NetworkClientDelegate.PollResult(
 10,
 new ArrayList<>());
-assertEquals(10, networkClient.addAll(failure));
+assertEquals(10, networkClientDelegate.addAll(failure));
 }
 
 @Test
-void testMaximumTimeToWait() {
+public void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-consumerNetworkThread.runOnce();
-// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-}
 
-@Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
 consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
-
-@Test
-void testEnsureEventsAreCompleted() {
-// Mimic the logic of CompletableEventReaper.reap(Collection):
-doAnswer(__ -> {
-Iterator i = applicationEventsQueue.iterator();
-
-while (i.hasNext()) {
-ApplicationEvent event = i.next();
-
-if (event instanceof CompletableEvent)
-((CompletableEvent) 
event).future().completeExceptionally(new TimeoutException());
-
-i.remove();
-}
-
-return null;
-}).when(applicationEventReaper).reap(any(Collection.class));
-
-Node node = metadata.fetch().nodes().get(0);
-coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-CompletableApplicationEvent event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-CompletableFuture future = new CompletableFuture<>();
-when(event1.future()).thenReturn(future);
-applicationEventsQueue.add(event1);
-applicationEventsQueue.add(event2);
-assertFalse(future.isDone());
-assertFalse(applicationEventsQueue.isEmpty());
-consumerNetworkThread.cleanup();
-assertTrue(future.isCompletedExceptionally());
-assertTrue(applicationEventsQueue.isEmpty());
+// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
+assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testCleanupInvokesReaper() {
+public void testCleanupInvokesReaper() {
+LinkedList queue = new 
LinkedList<>();
+when(networkClientDelegate.unsentRequests()).thenReturn(queue);
 consumerNetworkThread.cleanup();
 verify(applicationEventReaper).reap(applicationEventsQueue);
 }
 
 @Test
-void testRunOnceInvokesReaper() {
+public void testRunOnceInvokesReaper() {
 consumerNetworkThread.runOnce();
 verify(applicationEventReaper).reap(any(Long.class));
 }
 
 @Test
-void testSendUnsentRequest() {
-String groupId = "group-id";
-NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDel

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630153025


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -341,6 +309,24 @@ void testRunOnceInvokesReaper() {
 
 @Test
 void testSendUnsentRequest() {
+NetworkClientDelegate networkClientDelegate = new 
NetworkClientDelegate(

Review Comment:
   Got that done! Thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630086160


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -212,40 +251,15 @@ public void testResetPositionsProcessFailureIsIgnored() {
 
verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
 }
 
-@Test
-public void testValidatePositionsEventIsProcessed() {
-ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-assertTrue(applicationEventsQueue.isEmpty());
-}
-
-@Test
-public void testAssignmentChangeEvent() {
-HashMap offset = 
mockTopicPartitionOffset();
-
-final long currentTimeMs = time.milliseconds();
-ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-applicationEventsQueue.add(e);
-
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-}
-
-@Test
-void testFetchTopicMetadata() {
-applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
-}
-
 @Test
 void testPollResultTimer() {

Review Comment:
   Good point, I will make a separate ticket and PR to handle this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2153227747

   I completed another pass @brenden20 and left some comments suggesting we get 
rid of some instances that we have not mocked yet but that seem possible to 
mock. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630082187


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +184,49 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestsTransferFromManagersToClientOnThreadRun() {
+List> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+@ParameterizedTest
+@MethodSource("applicationEvents")
+public void testApplicationEventIsProcessed(ApplicationEvent e) {
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(metadata).requestUpdateForNewTopics();
-}
 
-@Test
-public void testAsyncCommitEvent() {
-ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
+if (e instanceof CompletableEvent)
+verify(applicationEventReaper).add((CompletableEvent) e);
+
+verify(applicationEventProcessor).process(any(e.getClass()));
+assertTrue(applicationEventsQueue.isEmpty());
 }
 
-@Test
-public void testSyncCommitEvent() {
-ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
+private static Stream applicationEvents() {
+Time time1 = new MockTime();
+Map offset = 
mockTopicPartitionOffset();
+final long currentTimeMs = time1.milliseconds();
+
+// use 500 for deadlineMs

Review Comment:
   Mistake leaving that there, fixed now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630075479


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -341,6 +309,24 @@ void testRunOnceInvokesReaper() {
 
 @Test
 void testSendUnsentRequest() {
+NetworkClientDelegate networkClientDelegate = new 
NetworkClientDelegate(

Review Comment:
   This could all be greatly simplified. What about something like:
   
   ```
   
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
   consumerNetworkThread.cleanup();
   verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
   ```
   
   The responsibility of the component is only to poll the network client as 
long as there are pending requests, so with the above we verify that (no need 
to get into creating an actual network client and assert on it, etc). Makes 
sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630075479


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -341,6 +309,24 @@ void testRunOnceInvokesReaper() {
 
 @Test
 void testSendUnsentRequest() {
+NetworkClientDelegate networkClientDelegate = new 
NetworkClientDelegate(

Review Comment:
   This could all be greatly simplified. What about replacing the whole test 
with something like:
   
   ```
   
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
   consumerNetworkThread.cleanup();
   verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
   ```
   
   The responsibility of the component is only to poll the network client as 
long as there are pending requests, so with the above we verify that (no need 
to get into creating an actual network client and assert on it, etc). Makes 
sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630075479


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -341,6 +309,24 @@ void testRunOnceInvokesReaper() {
 
 @Test
 void testSendUnsentRequest() {
+NetworkClientDelegate networkClientDelegate = new 
NetworkClientDelegate(

Review Comment:
   This could all be greatly simplified. What about something like:
   
   ```
   
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
   consumerNetworkThread.cleanup();
   verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
   ```
   
   The responsibility of the component is only to poll the network client as 
long as there are pending requests, so with the above we verify that (no need 
to get into creating an actual network client and assert on it, etc). Makes 
sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630033370


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -212,40 +251,15 @@ public void testResetPositionsProcessFailureIsIgnored() {
 
verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
 }
 
-@Test
-public void testValidatePositionsEventIsProcessed() {
-ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-assertTrue(applicationEventsQueue.isEmpty());
-}
-
-@Test
-public void testAssignmentChangeEvent() {
-HashMap offset = 
mockTopicPartitionOffset();
-
-final long currentTimeMs = time.milliseconds();
-ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-applicationEventsQueue.add(e);
-
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-}
-
-@Test
-void testFetchTopicMetadata() {
-applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
-}
-
 @Test
 void testPollResultTimer() {

Review Comment:
   This test is all about the `NetworkClientDelegate`, with no interaction at all 
with the `ConsumerNetworkThread`, right? It should be moved entirely to 
`NetworkClientDelegateTest`, I would say (in this PR or separately, as you wish).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630022846


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -258,77 +272,31 @@ void testPollResultTimer() {
 NetworkClientDelegate.PollResult success = new 
NetworkClientDelegate.PollResult(
 10,
 Collections.singletonList(req));
-assertEquals(10, networkClient.addAll(success));
+assertEquals(10, networkClientDelegate.addAll(success));
 
 NetworkClientDelegate.PollResult failure = new 
NetworkClientDelegate.PollResult(
 10,
 new ArrayList<>());
-assertEquals(10, networkClient.addAll(failure));
+assertEquals(10, networkClientDelegate.addAll(failure));
 }
 
 @Test
 void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-consumerNetworkThread.runOnce();
-// After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-}
 
-@Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
 consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
-
-@Test
-void testEnsureEventsAreCompleted() {

Review Comment:
   yay! thanks for removing this! it's been hanging around for too long without 
value



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1630020941


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +184,49 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestsTransferFromManagersToClientOnThreadRun() {
+List> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+@ParameterizedTest
+@MethodSource("applicationEvents")
+public void testApplicationEventIsProcessed(ApplicationEvent e) {
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(metadata).requestUpdateForNewTopics();
-}
 
-@Test
-public void testAsyncCommitEvent() {
-ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
+if (e instanceof CompletableEvent)
+verify(applicationEventReaper).add((CompletableEvent) e);
+
+verify(applicationEventProcessor).process(any(e.getClass()));
+assertTrue(applicationEventsQueue.isEmpty());
 }
 
-@Test
-public void testSyncCommitEvent() {
-ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
+private static Stream applicationEvents() {
+Time time1 = new MockTime();
+Map offset = 
mockTopicPartitionOffset();
+final long currentTimeMs = time1.milliseconds();
+
+// use 500 for deadlineMs

Review Comment:
   what's this comment for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1629578263


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {

Review Comment:
   Yes, this part we're testing gets the requests from the managers and adds 
them to the network client (that's the transfer I was referring to). But 
whatever name we prefer, the point is to reflect what we're testing (which is 
not "managersPolledOnce")



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628618091


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +79,113 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();

Review Comment:
   for aesthetic purposes, can we group mock instances and non-mock ones?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628611058


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +79,113 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,

Review Comment:
   you created two networkClientDelegate instances here - you should pick one 
to use.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628610328


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -341,6 +356,17 @@ void testRunOnceInvokesReaper() {
 
 @Test
 void testSendUnsentRequest() {
+ConsumerNetworkThread consumerNetworkThread1 = new 
ConsumerNetworkThread(
+new LogContext(),
+time,
+applicationEventsQueue,
+applicationEventReaper,
+() -> applicationEventProcessor,
+() -> networkClient,
+() -> requestManagers
+);
+consumerNetworkThread1.initializeResources();

Review Comment:
   You can also remove this line, since it's in `setup()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628598367


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -341,6 +356,17 @@ void testRunOnceInvokesReaper() {
 
 @Test
 void testSendUnsentRequest() {
+ConsumerNetworkThread consumerNetworkThread1 = new 
ConsumerNetworkThread(

Review Comment:
   can you just use `this.consumerNetworkThread` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628596559


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {
+List> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(metadata).requestUpdateForNewTopics();
-}
-
-@Test
-public void testAsyncCommitEvent() {
-ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
+@ParameterizedTest
+@MethodSource("appEvents")
+public void testEventIsProcessed(ApplicationEvent e) {
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
+verify(applicationEventProcessor).process(any(e.getClass()));
+assertTrue(applicationEventsQueue.isEmpty());
 }
 
-@Test
-public void testSyncCommitEvent() {
-ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
+private static Stream appEvents() {
+Time time1 = new MockTime();

Review Comment:
   Can you just use `this.time`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628596024


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {
+List> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(metadata).requestUpdateForNewTopics();
-}
-
-@Test
-public void testAsyncCommitEvent() {
-ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
+@ParameterizedTest
+@MethodSource("appEvents")
+public void testEventIsProcessed(ApplicationEvent e) {
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
+verify(applicationEventProcessor).process(any(e.getClass()));
+assertTrue(applicationEventsQueue.isEmpty());
 }
 
-@Test
-public void testSyncCommitEvent() {
-ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
+private static Stream appEvents() {
+Time time1 = new MockTime();

Review Comment:
   can we just call this `time`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628596024


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {
+List> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(metadata).requestUpdateForNewTopics();
-}
-
-@Test
-public void testAsyncCommitEvent() {
-ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
+@ParameterizedTest
+@MethodSource("appEvents")
+public void testEventIsProcessed(ApplicationEvent e) {
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
+verify(applicationEventProcessor).process(any(e.getClass()));
+assertTrue(applicationEventsQueue.isEmpty());
 }
 
-@Test
-public void testSyncCommitEvent() {
-ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, 100));
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
+private static Stream appEvents() {
+Time time1 = new MockTime();

Review Comment:
   can we just call this `time`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628595462


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {
+List> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(metadata).requestUpdateForNewTopics();
-}
-
-@Test
-public void testAsyncCommitEvent() {
-ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
+@ParameterizedTest
+@MethodSource("appEvents")
+public void testEventIsProcessed(ApplicationEvent e) {
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
+verify(applicationEventProcessor).process(any(e.getClass()));

Review Comment:
   We also need to check this part:
   ```
   if (event instanceof CompletableEvent)
       applicationEventReaper.add((CompletableEvent) event);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628593481


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {
+List> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(metadata).requestUpdateForNewTopics();
-}
-
-@Test
-public void testAsyncCommitEvent() {
-ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
+@ParameterizedTest
+@MethodSource("appEvents")
+public void testEventIsProcessed(ApplicationEvent e) {
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
+verify(applicationEventProcessor).process(any(e.getClass()));

Review Comment:
   This is not right: the method called in `runOnce()` is 
`applicationEventProcessor.process();`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628593481


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {
+List> list = new ArrayList<>();
+list.add(Optional.of(coordinatorRequestManager));
+list.add(Optional.of(heartbeatRequestManager));
+list.add(Optional.of(offsetsRequestManager));
+
+when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
-@Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
-applicationEventsQueue.add(e);
-consumerNetworkThread.runOnce();
-verify(metadata).requestUpdateForNewTopics();
-}
-
-@Test
-public void testAsyncCommitEvent() {
-ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
+@ParameterizedTest
+@MethodSource("appEvents")
+public void testEventIsProcessed(ApplicationEvent e) {
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
+verify(applicationEventProcessor).process(any(e.getClass()));

Review Comment:
   This is not right: the method called in `runOnce()` is 
`applicationEventProcessor.process();`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628593280


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +79,113 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Durat

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628589378


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {

Review Comment:
   I don't quite get the `TransferFromManagersToClient` part. Perhaps you mean 
`verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));`?
   
   I agree that the test verifies a lot more calls than just verifying that the 
request managers are `PolledOnce`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628502331


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -78,64 +81,120 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = mock(MockClient.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+// consumerNet

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2150972168

   Hey @brenden20 , thanks for the updates! Left some minor comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628443660


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -353,13 +379,22 @@ void testSendUnsentRequest() {
 assertTrue(networkClient.hasAnyPendingRequests());
 assertFalse(networkClient.unsentRequests().isEmpty());
 assertFalse(client.hasInFlightRequests());
-consumerNetworkThread.cleanup();
+consumerNetworkThread1.cleanup();

Review Comment:
   nit: up to you, but we really don't need this change and could keep the 
variable name as just `consumerNetworkThread`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628434821


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +52,105 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+
+this.networkClient = new NetworkClientDelegate(

Review Comment:
   This is a good point; what was the resolution? I expect we should be able to 
mock everything except the ConsumerNetworkThread (but I do see that we have 
tests asserting on the network client, so it would require untangling that; is 
that the reason why we're still not mocking the NetworkClientDelegate?). It's OK 
for me to leave it as it is and do a follow-up, by the way; I just want to understand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628418739


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+public void testRequestManagersArePolledOnce() {

Review Comment:
   nit: this name has diverged from the test content I would say. What about 
`testRequestsTransferFromManagersToClientOnThreadRun` or something like it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628408348


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +79,113 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duratio

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628406670


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +79,113 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duratio

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628374297


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -270,30 +326,27 @@ void testPollResultTimer() {
 void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
+
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
+
 consumerNetworkThread.runOnce();
 // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong())));
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong())));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+void testEnsureEventsAreCompleted() {

Review Comment:
   I went ahead and removed this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628152712


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -78,64 +81,120 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = mock(MockClient.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+// consumerNetwo

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628152712


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -78,64 +81,120 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = mock(MockClient.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+// consumerNetwo

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628131861


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong())));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
 @Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+public void testApplicationEvent() {

Review Comment:
   I had something like this in mind:
   ```
   @ParameterizedTest
   @MethodSource("appEvents")
   public void testEventIsProcessed(ApplicationEvent e) {
   applicationEventsQueue.add(e);
   consumerNetworkThread.runOnce();
   verify(applicationEventProcessor).process(any(e.getClass()));
   assertTrue(applicationEventsQueue.isEmpty());
   }
   
   private static Stream<Arguments> appEvents() {
   return Stream.of(
   Arguments.of(new PollEvent(100)),
   Arguments.of(new NewTopicsMetadataUpdateRequestEvent()));
   }
   ```
   (extending `appEvents` with all the event instances that are individually 
created in each `testXxxIsProcessed` test, so we can replace them all with this 
single parameterized test)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628070196


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong())));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
 @Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+public void testApplicationEvent() {

Review Comment:
   I tried this out, and it seems that combining the tests into one parameterized 
test will not work: it keeps giving me an error saying that the parameters must 
be constant. I did some research, and it looks like it can be done, but it would 
require an enum class to get it to work. I'm not sure this is something we would 
want to do just for these few tests, but let me know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628008988


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -78,64 +81,120 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = mock(MockClient.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+// consumerNet

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1627478606


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -292,7 +292,8 @@ private void closeInternal(final Duration timeout) {
 /**
  * Check the unsent queue one last time and poll until all requests are 
sent or the timer runs out.
  */
-private void sendUnsentRequests(final Timer timer) {
+// Visible for testing
+protected void sendUnsentRequests(final Timer timer) {

Review Comment:
   I was wondering if we could just test `cleanup()` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1627474682


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,27 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {

Review Comment:
   Let's just make this `public void` for consistency.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong())));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
 @Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+public void testApplicationEvent() {

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1627471906


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -78,64 +81,120 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = mock(MockClient.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+// consumerNet

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1627235270


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -78,64 +81,120 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = mock(MockClient.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+LogContext logContext = new LogContext();
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Duration.ZERO);
-}
+if (consumerNetworkThread != null)
+consumerNetworkThread.close();
+}
+
+@Test
+public void testEnsureCloseStopsRunningThread() {
+// consumerNet

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1627218743


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -78,64 +81,120 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = mock(MockClient.class);

Review Comment:
   new MockClient() ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-04 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1626607635


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() {
 
 @Test
 void testCleanupInvokesReaper() {
+LinkedList queue = new 
LinkedList<>();
+when(networkClientDelegate.unsentRequests()).thenReturn(queue);

Review Comment:
   I will get that done, thank you for letting me know!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-04 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1626606071


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() {
 
 @Test
 void testCleanupInvokesReaper() {
+LinkedList queue = new 
LinkedList<>();
+when(networkClientDelegate.unsentRequests()).thenReturn(queue);

Review Comment:
   it just got merged, so worth rebasing and updating the test as needed ;)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-04 Thread via GitHub


brenden20 commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2148211980

   @lianetm @kirktrue @philipnee, I have implemented all suggestions except for 
test removals since I think if we do that, it should go on another ticket so as 
not to bog this ticket down. All tests are passing as well, so let me know if 
there is anything else I should take care of or if this PR is ready for review. 
Thank you all for the reviews!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-04 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1626467806


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() {
 
 @Test
 void testCleanupInvokesReaper() {
+LinkedList queue = new 
LinkedList<>();
+when(networkClientDelegate.unsentRequests()).thenReturn(queue);

Review Comment:
   Thanks for the heads up, I will keep that in mind



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-04 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1626465679


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -289,10 +289,11 @@ private void closeInternal(final Duration timeout) {
 }
 }
 
+// Add test to see if poll() is run once with timer of 0

Review Comment:
   I forgot to remove that, removed now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-04 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1626464002


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+verify(networkClientDelegate).poll(anyLong(), anyLong());

Review Comment:
   Thank you for the feedback, I have implemented this suggestion as well as 
the previous one for this test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -270,30 +326,27 @@ void testPollResultTimer() {
 void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
+
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
+
 consumerNetworkThread.runOnce();
 // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+void testEnsureEventsAreCompleted() {

Review Comment:
   Not introduced by this PR but since we're improving here, I find that this 
test does not bring any value, because it's testing something that is not the 
responsibility of the `ConsumerNetworkThread`, so we end up testing something 
we're mocking ourselves with the `doAnswer`. What do you think?
   
   I had suggested to remove it completely. See my original comment about it on 
this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877). 
If you agree, I would say we remove it instead of having to keep maintaining it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -270,30 +326,27 @@ void testPollResultTimer() {
 void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
+
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
+
 consumerNetworkThread.runOnce();
 // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+void testEnsureEventsAreCompleted() {

Review Comment:
   Not introduced by this PR but since we're improving here, I find that this 
test does not bring any value, because it's testing something that is not the 
responsibility of the `ConsumerNetworkThread`, so we end up testing something 
we're mocking ourselves with the `doAnswer`. What do you think?
   
   I had suggested to remove it completely. See my original comment about it on 
this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2146155510

   Hey @brenden20, very nice improvement! Left a few comments. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -270,30 +326,27 @@ void testPollResultTimer() {
 void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
+
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
+
 consumerNetworkThread.runOnce();
 // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+void testEnsureEventsAreCompleted() {

Review Comment:
   Not introduced by this PR but since we're improving here, I find that this 
test does not bring any value, because it's testing something that is not the 
responsibility of the `ConsumerNetworkThread`, so in the end we end up testing 
something we're mocking ourselves with the `doAnswer`. What do you think?
   
   I had suggested to remove it completely. See my original comment about it on 
this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625060556


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -270,30 +326,27 @@ void testPollResultTimer() {
 void testMaximumTimeToWait() {
 // Initial value before runOnce has been called
 assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
+
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 DEFAULT_HEARTBEAT_INTERVAL_MS);
+
 consumerNetworkThread.runOnce();
 // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
 }
 
 @Test
-void testRequestManagersArePolledOnce() {
-consumerNetworkThread.runOnce();
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong(;
-testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong(;
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-}
+void testEnsureEventsAreCompleted() {
+Cluster cluster = mock(Cluster.class);
+when(metadata.fetch()).thenReturn(cluster);
 
-@Test
-void testEnsureMetadataUpdateOnPoll() {
-MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-client.prepareMetadataUpdate(metadataResponse);
-metadata.requestUpdate(false);
-consumerNetworkThread.runOnce();
-verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-}
+List list = new ArrayList<>();
+list.add(new Node(0, "host", 0));
+when(cluster.nodes()).thenReturn(list);

Review Comment:
   what about simplifying to a single 
`when(cluster.nodes()).thenReturn(Collections.singletonList(new Node(0, "host", 
0)));` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625059147


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -289,10 +289,11 @@ private void closeInternal(final Duration timeout) {
 }
 }
 
+// Add test to see if poll() is run once with timer of 0

Review Comment:
   is this something you intend to do on this PR or separately? Either way, I 
would suggest we remove the comment and address it in this PR or have a jira to 
follow-up separately if needed please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625054713


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() {
 
 @Test
 void testCleanupInvokesReaper() {
+LinkedList queue = new 
LinkedList<>();
+when(networkClientDelegate.unsentRequests()).thenReturn(queue);

Review Comment:
   totally needed, but heads-up, there is another 
[PR](https://github.com/apache/kafka/pull/16156) in-flight where we're trying 
to change the check behind this, so we'll need to update this expectation here 
if that goes in first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625048146


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+verify(networkClientDelegate).poll(anyLong(), anyLong());
 }
 
 @Test
-public void testMetadataUpdateEvent() {
-ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+public void testApplicationEvent() {

Review Comment:
   Not introduced by this PR, but reviewing this one I noticed that we have 
lots of similar tests for checking that runOnce processes the events (ex. 
testResetPositionsEventIsProcessed, testSyncCommitEvent, 
testAsyncCommitEvent...), all doing the same (add event, run once, check is 
processed). Could we parametrize this and have a single test maybe? (same shape 
as this one but receiving the event as param). That should allow us to remove lots 
of them since, in the end, this component does not care about the specific events; it's 
just responsible for processing whatever event is added. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625034602


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong(;
+verify(networkClientDelegate).poll(anyLong(), anyLong());

Review Comment:
   This test seems to leave out one of the core actions of the `runOnce`:
   
   1. poll managers (this generates the requests) -> covered by the test
   2. newly generated requests are added to the network client -> not 
covered
   3. poll network client to send the requests -> covered by the test
   
   Step 3 wouldn't have the effect we want (send the request) if step 2 does 
not happen, so what about adding a `verify(networkClientDelegate).addAll(..` 
before verifying the network client poll, to make sure we cover the whole flow 
of how requests move from the managers to the network layer on run once? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


lianetm commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1625032096


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 }
 
 @Test
-public void testApplicationEvent() {
-ApplicationEvent e = new PollEvent(100);
-applicationEventsQueue.add(e);
+void testRequestManagersArePolledOnce() {
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong(;

Review Comment:
   I think we're missing the expectations for the requestManagers.entries() to 
make sure there are some managers and we actually verify something, so we need  
`when(requestManagers.entries()).thenReturn(..` with some managers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1624586673


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
 }
 
+@Test
+void testRequestManagersArePolledOnce() {
+consumerNetworkThread.runOnce();
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).poll(anyLong(;
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).maximumTimeToWait(anyLong(;
+verify(networkClientDelegate).poll(anyLong(), anyLong());
+}
+
 @Test
 public void testApplicationEvent() {
 ApplicationEvent e = new PollEvent(100);
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+verify(applicationEventProcessor).process(e);
 }
 
 @Test
 public void testMetadataUpdateEvent() {

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-03 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1624585963


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
 }
 
+@Test
+void testRequestManagersArePolledOnce() {
+consumerNetworkThread.runOnce();
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).poll(anyLong())));

Review Comment:
   It is; removing instances of times(1).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623649872


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -339,6 +358,15 @@ void testRunOnceInvokesReaper() {
 verify(applicationEventReaper).reap(any(Long.class));
 }
 
+private HashMap 
mockTopicPartitionOffset() {
+final TopicPartition t0 = new TopicPartition("t0", 2);
+final TopicPartition t1 = new TopicPartition("t0", 3);
+HashMap topicPartitionOffsets = new 
HashMap<>();

Review Comment:
   1. this can be final as well
   2. let's use Map on the left side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623649732


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -339,6 +358,15 @@ void testRunOnceInvokesReaper() {
 verify(applicationEventReaper).reap(any(Long.class));
 }
 
+private HashMap 
mockTopicPartitionOffset() {

Review Comment:
   try to use Map instead of HashMap



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648674


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -231,10 +254,7 @@ public void testAssignmentChangeEvent() {
 
 consumerNetworkThread.runOnce();
 
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
+verify(networkClientDelegate, times(1)).poll(anyLong(), anyLong());

Review Comment:
   Can we be consistent in using times(1) here? Above you removed times(1), but 
it seems to be inconsistently applied elsewhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648578


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -207,7 +230,7 @@ public void testResetPositionsProcessFailureIsIgnored() {
 
 ResetPositionsEvent event = new 
ResetPositionsEvent(calculateDeadlineMs(time, 100));
 applicationEventsQueue.add(event);
-assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
+assertDoesNotThrow(consumerNetworkThread::runOnce);

Review Comment:
   Can we revert this unrelated change? Also, I think we can remove this test 
and check whether it is covered elsewhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648474


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
 }
 
+@Test
+void testRequestManagersArePolledOnce() {
+consumerNetworkThread.runOnce();
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).maximumTimeToWait(anyLong())));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
+}
+
 @Test
 public void testApplicationEvent() {
 ApplicationEvent e = new PollEvent(100);
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+verify(applicationEventProcessor).process(e);
 }
 
 @Test
 public void testMetadataUpdateEvent() {

Review Comment:
   I think we can probably remove this test, as it is doing exactly the same thing 
as `testApplicationEvent`. We should probably try to find a similar test to 
ensure the code path is covered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648058


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
 }
 
+@Test
+void testRequestManagersArePolledOnce() {
+consumerNetworkThread.runOnce();
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).poll(anyLong())));

Review Comment:
   I believe times(1) is the default; could you verify this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647935


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +52,105 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Durat

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647643


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +52,105 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+
+this.networkClient = new NetworkClientDelegate(

Review Comment:
   I think I might have asked you to use MockClient here. Since we aren't 
necessarily testing request sending etc., can we just mock the 
networkClientDelegate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647362


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +52,105 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();

Review Comment:
   can we mock this as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >