Re: [PR] KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2166728664 @mjsax thank you for the review! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
mjsax commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2166718530 Thanks for the PR @brenden20! Merged to `trunk`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
mjsax merged PR #16140: URL: https://github.com/apache/kafka/pull/16140 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1637062738 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.time = new MockTime(); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +assertTrue(consumerNetworkThread.isRunning(), +"ConsumerNetworkThread should start running when created"); + +consumerNetworkThread.close(); +assertFalse(consumerNetworkThread.isRunning(), +"close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)"); +} + +@ParameterizedTest +@ValueSource(longs = {Cons
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1637062738 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.time = new MockTime(); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +assertTrue(consumerNetworkThread.isRunning(), +"ConsumerNetworkThread should start running when created"); + +consumerNetworkThread.close(); +assertFalse(consumerNetworkThread.isRunning(), +"close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)"); +} + +@ParameterizedTest +@ValueSource(longs = {Cons
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2163090508 @mjsax can you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1632136201 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -211,178 +220,52 @@ public void testResetPositionsProcessFailureIsIgnored() { } @Test -public void testValidatePositionsEventIsProcessed() { -ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); -assertTrue(applicationEventsQueue.isEmpty()); -} - -@Test -public void testAssignmentChangeEvent() { -HashMap offset = mockTopicPartitionOffset(); - -final long currentTimeMs = time.milliseconds(); -ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); -applicationEventsQueue.add(e); - -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); -// Assignment change should generate an async commit (not retried). -verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); -} - -@Test -void testFetchTopicMetadata() { -applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); -} - -@Test -void testMaximumTimeToWait() { +public void testMaximumTimeToWait() { +final int defaultHeartbeatIntervalMs = 1000; // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); -consumerNetworkThread.runOnce(); -// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager -assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); -} -@Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) defaultHeartbeatIntervalMs); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} - -@Test -void testEnsureEventsAreCompleted() { -// Mimic the logic of CompletableEventReaper.reap(Collection): -doAnswer(__ -> { -Iterator i = applicationEventsQueue.iterator(); - -while (i.hasNext()) { -ApplicationEvent event = i.next(); - -if (event instanceof CompletableEvent) -((CompletableEvent) event).future().completeExceptionally(new TimeoutException()); - -i.remove(); -} - -return null; -}).when(applicationEventReaper).reap(any(Collection.class)); - -Node node = metadata.fetch().nodes().get(0); -coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); -prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); -CompletableApplicationEvent event1 = spy(new AsyncCommitEvent(Collections.emptyMap())); -ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap()); -CompletableFuture future = new CompletableFuture<>(); -when(event1.future()).thenReturn(future); -applicationEventsQueue.add(event1); -applicationEventsQueue.add(event2); -assertFalse(future.isDone()); -assertFalse(applicationEventsQueue.isEmpty()); -consumerNetworkThread.cleanup(); -assertTrue(future.isCompletedExceptionally()); -assertTrue(applicationEventsQueue.isEmpty()); +
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631774198 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestsTransferFromManagersToClientOnThreadRun() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { Review Comment: Refer to previous comment for all event tests that were removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631774020 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { Review Comment: Yes, this test as well as many of the other application event tests were rewritten into a single parameterized test. It gets the parameters from ```applicationEvents()```, where it supplies a stream of different kinds of events. This was done to condense the code a bit. And ```testRequestsTransferFromManagersToClientOnThreadRun()``` is a new test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631772919 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.time = new MockTime(); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +assertTrue(consumerNetworkThread.isRunning(), +"ConsumerNetworkThread should start running when created"); + +consumerNetworkThread.close(); +assertFalse(consumerNetworkThread.isRunning(), +"close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)"); +} + +@ParameterizedTest +@ValueSource(longs = {Co
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631771648 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.time = new MockTime(); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { Review Comment: Yes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631771779 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.time = new MockTime(); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +assertTrue(consumerNetworkThread.isRunning(), +"ConsumerNetworkThread should start running when created"); + +consumerNetworkThread.close(); +assertFalse(consumerNetworkThread.isRunning(), +"close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)"); +} + +@ParameterizedTest +@ValueSource(longs = {Co
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
mjsax commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631768784 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestsTransferFromManagersToClientOnThreadRun() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { Review Comment: Seems this test was effectively removed? Why? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -190,15 +208,6 @@ public void testListOffsetsEventIsProcessed(boolean requireTimestamp) { assertTrue(applicationEventsQueue.isEmpty()); } -@Test -public void testResetPositionsEventIsProcessed() { Review Comment: Seems this test was effectively removed? Why? Same for more test below ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager =
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631701125 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -211,178 +219,52 @@ public void testResetPositionsProcessFailureIsIgnored() { } @Test -public void testValidatePositionsEventIsProcessed() { -ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); -assertTrue(applicationEventsQueue.isEmpty()); -} - -@Test -public void testAssignmentChangeEvent() { -HashMap offset = mockTopicPartitionOffset(); - -final long currentTimeMs = time.milliseconds(); -ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); -applicationEventsQueue.add(e); - -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); -// Assignment change should generate an async commit (not retried). -verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); -} - -@Test -void testFetchTopicMetadata() { -applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); -} - -@Test -void testMaximumTimeToWait() { +public void testMaximumTimeToWait() { +final int defaultHeartbeatIntervalMs = 1000; // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); -consumerNetworkThread.runOnce(); -// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager -assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); -} -@Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) defaultHeartbeatIntervalMs); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} - -@Test -void testEnsureEventsAreCompleted() { -// Mimic the logic of CompletableEventReaper.reap(Collection): -doAnswer(__ -> { -Iterator i = applicationEventsQueue.iterator(); - -while (i.hasNext()) { -ApplicationEvent event = i.next(); - -if (event instanceof CompletableEvent) -((CompletableEvent) event).future().completeExceptionally(new TimeoutException()); - -i.remove(); -} - -return null; -}).when(applicationEventReaper).reap(any(Collection.class)); - -Node node = metadata.fetch().nodes().get(0); -coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); -prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); -CompletableApplicationEvent event1 = spy(new AsyncCommitEvent(Collections.emptyMap())); -ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap()); -CompletableFuture future = new CompletableFuture<>(); -when(event1.future()).thenReturn(future); -applicationEventsQueue.add(event1); -applicationEventsQueue.add(event2); -assertFalse(future.isDone()); -assertFalse(applicationEventsQueue.isEmpty()); -consumerNetworkThread.cleanup(); -assertTrue(future.isCompletedExceptionally()); -assertTrue(applicationEventsQueue.isEmpty()); +
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631698362 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -211,178 +219,52 @@ public void testResetPositionsProcessFailureIsIgnored() { } @Test -public void testValidatePositionsEventIsProcessed() { -ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); -assertTrue(applicationEventsQueue.isEmpty()); -} - -@Test -public void testAssignmentChangeEvent() { -HashMap offset = mockTopicPartitionOffset(); - -final long currentTimeMs = time.milliseconds(); -ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); -applicationEventsQueue.add(e); - -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); -// Assignment change should generate an async commit (not retried). -verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); -} - -@Test -void testFetchTopicMetadata() { -applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); -} - -@Test -void testMaximumTimeToWait() { +public void testMaximumTimeToWait() { +final int defaultHeartbeatIntervalMs = 1000; // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); -consumerNetworkThread.runOnce(); -// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager -assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); -} -@Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) defaultHeartbeatIntervalMs); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} - -@Test -void testEnsureEventsAreCompleted() { -// Mimic the logic of CompletableEventReaper.reap(Collection): -doAnswer(__ -> { -Iterator i = applicationEventsQueue.iterator(); - -while (i.hasNext()) { -ApplicationEvent event = i.next(); - -if (event instanceof CompletableEvent) -((CompletableEvent) event).future().completeExceptionally(new TimeoutException()); - -i.remove(); -} - -return null; -}).when(applicationEventReaper).reap(any(Collection.class)); - -Node node = metadata.fetch().nodes().get(0); -coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); -prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); -CompletableApplicationEvent event1 = spy(new AsyncCommitEvent(Collections.emptyMap())); -ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap()); -CompletableFuture future = new CompletableFuture<>(); -when(event1.future()).thenReturn(future); -applicationEventsQueue.add(event1); -applicationEventsQueue.add(event2); -assertFalse(future.isDone()); -assertFalse(applicationEventsQueue.isEmpty()); -consumerNetworkThread.cleanup(); -assertTrue(future.isCompletedExceptionally()); -assertTrue(applicationEventsQueue.isEmpty()); +
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2155534482 @lianetm I implemented your new suggestions, let me know if it looks good -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631653829 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,99 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final long MAX_POLL_TIMEOUT_MS = 5000; + +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.time = new MockTime(); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +assertTrue(consumerNetworkThread.isRunning(), +"ConsumerNetworkThread should start running when created"); + +consumerNetworkThread.close(); +assertFalse(consumerNetworkThread.isRunning(), +"close() should make consumerNetworkThread.running fals
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631646232 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,99 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final long MAX_POLL_TIMEOUT_MS = 5000; Review Comment: I mean to remove this and reference ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS when needed (like it's already done here for some tests) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631647886 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,99 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final long MAX_POLL_TIMEOUT_MS = 5000; + +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.time = new MockTime(); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +assertTrue(consumerNetworkThread.isRunning(), +"ConsumerNetworkThread should start running when created"); + +consumerNetworkThread.close(); +assertFalse(consumerNetworkThread.isRunning(), +"close() should make consumerNetworkThread.running fals
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631646852 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,99 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; Review Comment: this seems to be only used in `testMaximumTimeToWait` right? if so we better keep it as a local var -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631646232 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,99 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final long MAX_POLL_TIMEOUT_MS = 5000; Review Comment: I mean to remove this and reference ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS (like it's already done here for some tests) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631645522 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,99 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final long MAX_POLL_TIMEOUT_MS = 5000; Review Comment: let's better use the constant defined in ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS so that we don't have to maintain this one (and avoid a test break if that one changes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2155384516 @lianetm implemented your suggestions on the description and testConsumerNetworkThreadPollTimeComputations(). Let me know how it looks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2155322585 Regarding the PR description, I would suggest to update: > Removed testEnsureMetadataUpdateOnPoll() since it was doing integration testing~~, could not get a unit test to work for that one~~ You did just great removing it, as it was testing something that it's not the responsibility of this component, and that's exactly why a real unit test is not doable ;) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631549092 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -72,68 +64,98 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { - -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; + +private final Time time; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final NetworkClientDelegate networkClientDelegate; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.time = new MockTime(); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +assertTrue(consumerNetworkThread.isRunning(), +"ConsumerNetworkThread should start running when created"); + +consumerNetworkThread.close(); +assertFalse(consumerNetworkThread.isRunning(), +"close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)"); +
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631524427 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -211,178 +232,80 @@ public void testResetPositionsProcessFailureIsIgnored() { } @Test -public void testValidatePositionsEventIsProcessed() { -ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); -assertTrue(applicationEventsQueue.isEmpty()); -} - -@Test -public void testAssignmentChangeEvent() { -HashMap offset = mockTopicPartitionOffset(); - -final long currentTimeMs = time.milliseconds(); -ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); -applicationEventsQueue.add(e); - -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); -// Assignment change should generate an async commit (not retried). -verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); -} +public void testPollResultTimer() { +NetworkClientDelegate networkClientDelegate = new NetworkClientDelegate( +time, +config, +logContext, +client +); -@Test -void testFetchTopicMetadata() { -applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); +NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest( +new FindCoordinatorRequest.Builder( +new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) +.setKey("foobar")), +Optional.empty()); +req.setTimer(time, DEFAULT_REQUEST_TIMEOUT_MS); + +// purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE upon success +NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( +10, +Collections.singletonList(req)); +assertEquals(10, networkClientDelegate.addAll(success)); + +NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( +10, +new ArrayList<>()); +assertEquals(10, networkClientDelegate.addAll(failure)); Review Comment: It sems some of my changes were overwritten when I merged, I am fixing that up -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631518633 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -211,178 +232,80 @@ public void testResetPositionsProcessFailureIsIgnored() { } @Test -public void testValidatePositionsEventIsProcessed() { -ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); -assertTrue(applicationEventsQueue.isEmpty()); -} - -@Test -public void testAssignmentChangeEvent() { -HashMap offset = mockTopicPartitionOffset(); - -final long currentTimeMs = time.milliseconds(); -ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); -applicationEventsQueue.add(e); - -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); -// Assignment change should generate an async commit (not retried). -verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); -} +public void testPollResultTimer() { +NetworkClientDelegate networkClientDelegate = new NetworkClientDelegate( +time, +config, +logContext, +client +); -@Test -void testFetchTopicMetadata() { -applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); +NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest( +new FindCoordinatorRequest.Builder( +new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) +.setKey("foobar")), +Optional.empty()); +req.setTimer(time, DEFAULT_REQUEST_TIMEOUT_MS); + +// purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE upon success +NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( +10, +Collections.singletonList(req)); +assertEquals(10, networkClientDelegate.addAll(success)); + +NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( +10, +new ArrayList<>()); +assertEquals(10, networkClientDelegate.addAll(failure)); Review Comment: this seems like a duplicate, not aligned with the comment, what's the intention? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2155226928 @lianetm implemented suggestions and merged with trunk for no conflicts -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631457443 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -258,155 +252,66 @@ void testPollResultTimer() { NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( 10, Collections.singletonList(req)); -assertEquals(10, networkClient.addAll(success)); +assertEquals(10, networkClientDelegate.addAll(success)); NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( 10, new ArrayList<>()); -assertEquals(10, networkClient.addAll(failure)); +assertEquals(10, networkClientDelegate.addAll(failure)); } @Test -void testMaximumTimeToWait() { +public void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); -consumerNetworkThread.runOnce(); -// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager -assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); -} -@Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} - -@Test -void testEnsureEventsAreCompleted() { -// Mimic the logic of CompletableEventReaper.reap(Collection): -doAnswer(__ -> { -Iterator i = applicationEventsQueue.iterator(); - -while (i.hasNext()) { -ApplicationEvent event = i.next(); - -if (event instanceof CompletableEvent) -((CompletableEvent) event).future().completeExceptionally(new TimeoutException()); - -i.remove(); -} - -return null; -}).when(applicationEventReaper).reap(any(Collection.class)); - -Node node = metadata.fetch().nodes().get(0); -coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); -prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); -CompletableApplicationEvent event1 = spy(new AsyncCommitEvent(Collections.emptyMap())); -ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap()); -CompletableFuture future = new CompletableFuture<>(); -when(event1.future()).thenReturn(future); -applicationEventsQueue.add(event1); -applicationEventsQueue.add(event2); -assertFalse(future.isDone()); -assertFalse(applicationEventsQueue.isEmpty()); -consumerNetworkThread.cleanup(); -assertTrue(future.isCompletedExceptionally()); -assertTrue(applicationEventsQueue.isEmpty()); +// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager +assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testCleanupInvokesReaper() { +public void testCleanupInvokesReaper() { +LinkedList queue = new LinkedList<>(); +when(networkClientDelegate.unsentRequests()).thenReturn(queue); consumerNetworkThread.cleanup(); verify(applicationEventReaper).reap(applicationEventsQueue); } @Test -void testRunOnceInvokesReaper() { +public void testRunOnceInvokesReaper() { consumerNetworkThread.runOnce(); verify(applicationEventReaper).reap(any(Long.class)); } @Test -void testSendUnsentRequest() { -String groupId = "group-id"; -NetworkClientDelegate.UnsentRequest request = new NetworkClientD
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2154902452 Hey @brenden20 , thanks for the update, just 2 minor comments left. Could you please get trunk latests changes and solve the conflicts? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631259791 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -258,155 +252,66 @@ void testPollResultTimer() { NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( 10, Collections.singletonList(req)); -assertEquals(10, networkClient.addAll(success)); +assertEquals(10, networkClientDelegate.addAll(success)); NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( 10, new ArrayList<>()); -assertEquals(10, networkClient.addAll(failure)); +assertEquals(10, networkClientDelegate.addAll(failure)); } @Test -void testMaximumTimeToWait() { +public void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); -consumerNetworkThread.runOnce(); -// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager -assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); -} -@Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} - -@Test -void testEnsureEventsAreCompleted() { -// Mimic the logic of CompletableEventReaper.reap(Collection): -doAnswer(__ -> { -Iterator i = applicationEventsQueue.iterator(); - -while (i.hasNext()) { -ApplicationEvent event = i.next(); - -if (event instanceof CompletableEvent) -((CompletableEvent) event).future().completeExceptionally(new TimeoutException()); - -i.remove(); -} - -return null; -}).when(applicationEventReaper).reap(any(Collection.class)); - -Node node = metadata.fetch().nodes().get(0); -coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); -prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); -CompletableApplicationEvent event1 = spy(new AsyncCommitEvent(Collections.emptyMap())); -ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap()); -CompletableFuture future = new CompletableFuture<>(); -when(event1.future()).thenReturn(future); -applicationEventsQueue.add(event1); -applicationEventsQueue.add(event2); -assertFalse(future.isDone()); -assertFalse(applicationEventsQueue.isEmpty()); -consumerNetworkThread.cleanup(); -assertTrue(future.isCompletedExceptionally()); -assertTrue(applicationEventsQueue.isEmpty()); +// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager +assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testCleanupInvokesReaper() { +public void testCleanupInvokesReaper() { +LinkedList queue = new LinkedList<>(); +when(networkClientDelegate.unsentRequests()).thenReturn(queue); consumerNetworkThread.cleanup(); verify(applicationEventReaper).reap(applicationEventsQueue); } @Test -void testRunOnceInvokesReaper() { +public void testRunOnceInvokesReaper() { consumerNetworkThread.runOnce(); verify(applicationEventReaper).reap(any(Long.class)); } @Test -void testSendUnsentRequest() { -String groupId = "group-id"; -NetworkClientDelegate.UnsentRequest request = new NetworkClientDel
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1631257210 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -258,155 +252,66 @@ void testPollResultTimer() { NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( 10, Collections.singletonList(req)); -assertEquals(10, networkClient.addAll(success)); +assertEquals(10, networkClientDelegate.addAll(success)); NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( 10, new ArrayList<>()); -assertEquals(10, networkClient.addAll(failure)); +assertEquals(10, networkClientDelegate.addAll(failure)); } @Test -void testMaximumTimeToWait() { +public void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); -consumerNetworkThread.runOnce(); -// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager -assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); -} -@Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} - -@Test -void testEnsureEventsAreCompleted() { -// Mimic the logic of CompletableEventReaper.reap(Collection): -doAnswer(__ -> { -Iterator i = applicationEventsQueue.iterator(); - -while (i.hasNext()) { -ApplicationEvent event = i.next(); - -if (event instanceof CompletableEvent) -((CompletableEvent) event).future().completeExceptionally(new TimeoutException()); - -i.remove(); -} - -return null; -}).when(applicationEventReaper).reap(any(Collection.class)); - -Node node = metadata.fetch().nodes().get(0); -coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); -prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); -CompletableApplicationEvent event1 = spy(new AsyncCommitEvent(Collections.emptyMap())); -ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap()); -CompletableFuture future = new CompletableFuture<>(); -when(event1.future()).thenReturn(future); -applicationEventsQueue.add(event1); -applicationEventsQueue.add(event2); -assertFalse(future.isDone()); -assertFalse(applicationEventsQueue.isEmpty()); -consumerNetworkThread.cleanup(); -assertTrue(future.isCompletedExceptionally()); -assertTrue(applicationEventsQueue.isEmpty()); +// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager +assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testCleanupInvokesReaper() { +public void testCleanupInvokesReaper() { +LinkedList queue = new LinkedList<>(); +when(networkClientDelegate.unsentRequests()).thenReturn(queue); consumerNetworkThread.cleanup(); verify(applicationEventReaper).reap(applicationEventsQueue); } @Test -void testRunOnceInvokesReaper() { +public void testRunOnceInvokesReaper() { consumerNetworkThread.runOnce(); verify(applicationEventReaper).reap(any(Long.class)); } @Test -void testSendUnsentRequest() { -String groupId = "group-id"; -NetworkClientDelegate.UnsentRequest request = new NetworkClientDel
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630153025 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -341,6 +309,24 @@ void testRunOnceInvokesReaper() { @Test void testSendUnsentRequest() { +NetworkClientDelegate networkClientDelegate = new NetworkClientDelegate( Review Comment: Got that done! Thank you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630086160 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -212,40 +251,15 @@ public void testResetPositionsProcessFailureIsIgnored() { verify(applicationEventProcessor).process(any(ResetPositionsEvent.class)); } -@Test -public void testValidatePositionsEventIsProcessed() { -ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); -assertTrue(applicationEventsQueue.isEmpty()); -} - -@Test -public void testAssignmentChangeEvent() { -HashMap offset = mockTopicPartitionOffset(); - -final long currentTimeMs = time.milliseconds(); -ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); -applicationEventsQueue.add(e); - -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); -// Assignment change should generate an async commit (not retried). -verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); -} - -@Test -void testFetchTopicMetadata() { -applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); -} - @Test void testPollResultTimer() { Review Comment: Good point, I will make a separate ticket and PR to handle this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2153227747 I completed another pass @brenden20 , left some comments to consider getting rid of some instances that we couldn't still mock but seems possible. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630082187 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +184,49 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestsTransferFromManagersToClientOnThreadRun() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); +@ParameterizedTest +@MethodSource("applicationEvents") +public void testApplicationEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(metadata).requestUpdateForNewTopics(); -} -@Test -public void testAsyncCommitEvent() { -ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); +if (e instanceof CompletableEvent) +verify(applicationEventReaper).add((CompletableEvent) e); + +verify(applicationEventProcessor).process(any(e.getClass())); +assertTrue(applicationEventsQueue.isEmpty()); } -@Test -public void testSyncCommitEvent() { -ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(SyncCommitEvent.class)); +private static Stream applicationEvents() { +Time time1 = new MockTime(); +Map offset = mockTopicPartitionOffset(); +final long currentTimeMs = time1.milliseconds(); + +// use 500 for deadlineMs Review Comment: Mistake leaving that there, fixed now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630075479 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -341,6 +309,24 @@ void testRunOnceInvokesReaper() { @Test void testSendUnsentRequest() { +NetworkClientDelegate networkClientDelegate = new NetworkClientDelegate( Review Comment: This could all be greatly simplified. What about something like: ``` when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false); consumerNetworkThread.cleanup(); verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong()); ``` The responsibility of the component is only to poll the network client as long as there are pending requests, so with the above we verify that (no need to get into creating an actual network client and assert on it, etc). Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630075479 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -341,6 +309,24 @@ void testRunOnceInvokesReaper() { @Test void testSendUnsentRequest() { +NetworkClientDelegate networkClientDelegate = new NetworkClientDelegate( Review Comment: This could all be greatly simplified. What about replacing the whole test with something like: ``` when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false); consumerNetworkThread.cleanup(); verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong()); ``` The responsibility of the component is only to poll the network client as long as there are pending requests, so with the above we verify that (no need to get into creating an actual network client and assert on it, etc). Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630075479 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -341,6 +309,24 @@ void testRunOnceInvokesReaper() { @Test void testSendUnsentRequest() { +NetworkClientDelegate networkClientDelegate = new NetworkClientDelegate( Review Comment: This could all be greatly simplified. What about something like: ``` when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false); consumerNetworkThread.cleanup(); verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong()); ``` The responsibility of the component is only to poll the network client as long as there are pending requests, so with the above we verify that (no need to get into creating an actual network client and assert on it, etc). Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630033370 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -212,40 +251,15 @@ public void testResetPositionsProcessFailureIsIgnored() { verify(applicationEventProcessor).process(any(ResetPositionsEvent.class)); } -@Test -public void testValidatePositionsEventIsProcessed() { -ValidatePositionsEvent e = new ValidatePositionsEvent(calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); -assertTrue(applicationEventsQueue.isEmpty()); -} - -@Test -public void testAssignmentChangeEvent() { -HashMap offset = mockTopicPartitionOffset(); - -final long currentTimeMs = time.milliseconds(); -ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs); -applicationEventsQueue.add(e); - -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); -// Assignment change should generate an async commit (not retried). -verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); -} - -@Test -void testFetchTopicMetadata() { -applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); -consumerNetworkThread.runOnce(); - verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); -} - @Test void testPollResultTimer() { Review Comment: This test is all about the `NetworkClientDelegate`, no interaction at all with the `ConsumerNetworkThread` right? Should be completely moved to `NetorkClientDelegateTest` I would say (this PR or separately, as you wish) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630022846 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -258,77 +272,31 @@ void testPollResultTimer() { NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( 10, Collections.singletonList(req)); -assertEquals(10, networkClient.addAll(success)); +assertEquals(10, networkClientDelegate.addAll(success)); NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( 10, new ArrayList<>()); -assertEquals(10, networkClient.addAll(failure)); +assertEquals(10, networkClientDelegate.addAll(failure)); } @Test void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); -consumerNetworkThread.runOnce(); -// After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager -assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); -} -@Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} - -@Test -void testEnsureEventsAreCompleted() { Review Comment: yay! thanks for removing this! it's been hanging around for too long without value -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1630020941 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +184,49 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestsTransferFromManagersToClientOnThreadRun() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); +@ParameterizedTest +@MethodSource("applicationEvents") +public void testApplicationEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(metadata).requestUpdateForNewTopics(); -} -@Test -public void testAsyncCommitEvent() { -ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); +if (e instanceof CompletableEvent) +verify(applicationEventReaper).add((CompletableEvent) e); + +verify(applicationEventProcessor).process(any(e.getClass())); +assertTrue(applicationEventsQueue.isEmpty()); } -@Test -public void testSyncCommitEvent() { -ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(SyncCommitEvent.class)); +private static Stream applicationEvents() { +Time time1 = new MockTime(); +Map offset = mockTopicPartitionOffset(); +final long currentTimeMs = time1.milliseconds(); + +// use 500 for deadlineMs Review Comment: what's this comment for? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1629578263 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { Review Comment: Yes, this part we're testing gets the requests from the managers and adds them to the network client (that's the transfer I was referring to). But whatever name we prefer, the point is to reflect what we're testing (which is not "managersPolledOnce") -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628618091 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +79,113 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); Review Comment: for aesthetic purposes, can we group mock instances and non-mock ones? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628611058 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +79,113 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, Review Comment: you created two networkClientDelegate instances here - you should pick one to use. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.o
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628610328 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -341,6 +356,17 @@ void testRunOnceInvokesReaper() { @Test void testSendUnsentRequest() { +ConsumerNetworkThread consumerNetworkThread1 = new ConsumerNetworkThread( +new LogContext(), +time, +applicationEventsQueue, +applicationEventReaper, +() -> applicationEventProcessor, +() -> networkClient, +() -> requestManagers +); +consumerNetworkThread1.initializeResources(); Review Comment: you can also remove this line. since it's in the setup() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628598367 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -341,6 +356,17 @@ void testRunOnceInvokesReaper() { @Test void testSendUnsentRequest() { +ConsumerNetworkThread consumerNetworkThread1 = new ConsumerNetworkThread( Review Comment: can you just use `this.consumerNetworkThread` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628596559 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(metadata).requestUpdateForNewTopics(); -} - -@Test -public void testAsyncCommitEvent() { -ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); +@ParameterizedTest +@MethodSource("appEvents") +public void testEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); +verify(applicationEventProcessor).process(any(e.getClass())); +assertTrue(applicationEventsQueue.isEmpty()); } -@Test -public void testSyncCommitEvent() { -ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(SyncCommitEvent.class)); +private static Stream appEvents() { +Time time1 = new MockTime(); Review Comment: you can just this.time ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628596024 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(metadata).requestUpdateForNewTopics(); -} - -@Test -public void testAsyncCommitEvent() { -ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); +@ParameterizedTest +@MethodSource("appEvents") +public void testEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); +verify(applicationEventProcessor).process(any(e.getClass())); +assertTrue(applicationEventsQueue.isEmpty()); } -@Test -public void testSyncCommitEvent() { -ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(SyncCommitEvent.class)); +private static Stream appEvents() { +Time time1 = new MockTime(); Review Comment: can we just call this `time`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628596024 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(metadata).requestUpdateForNewTopics(); -} - -@Test -public void testAsyncCommitEvent() { -ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); +@ParameterizedTest +@MethodSource("appEvents") +public void testEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); +verify(applicationEventProcessor).process(any(e.getClass())); +assertTrue(applicationEventsQueue.isEmpty()); } -@Test -public void testSyncCommitEvent() { -ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), calculateDeadlineMs(time, 100)); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(SyncCommitEvent.class)); +private static Stream appEvents() { +Time time1 = new MockTime(); Review Comment: can we just call this `time`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628595462 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(metadata).requestUpdateForNewTopics(); -} - -@Test -public void testAsyncCommitEvent() { -ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); +@ParameterizedTest +@MethodSource("appEvents") +public void testEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); +verify(applicationEventProcessor).process(any(e.getClass())); Review Comment: also need to check this part: ``` if (event instanceof CompletableEvent) applicationEventReaper.add((CompletableEvent) event); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628593481 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(metadata).requestUpdateForNewTopics(); -} - -@Test -public void testAsyncCommitEvent() { -ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); +@ParameterizedTest +@MethodSource("appEvents") +public void testEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); +verify(applicationEventProcessor).process(any(e.getClass())); Review Comment: this is not right, the method call is `applicationEventProcessor.process();` in runOnce() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628593481 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { +List> list = new ArrayList<>(); +list.add(Optional.of(coordinatorRequestManager)); +list.add(Optional.of(heartbeatRequestManager)); +list.add(Optional.of(offsetsRequestManager)); + +when(requestManagers.entries()).thenReturn(list); + when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class)); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); +verify(networkClientDelegate).poll(anyLong(), anyLong()); } -@Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); -applicationEventsQueue.add(e); -consumerNetworkThread.runOnce(); -verify(metadata).requestUpdateForNewTopics(); -} - -@Test -public void testAsyncCommitEvent() { -ApplicationEvent e = new AsyncCommitEvent(new HashMap<>()); +@ParameterizedTest +@MethodSource("appEvents") +public void testEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor).process(any(AsyncCommitEvent.class)); +verify(applicationEventProcessor).process(any(e.getClass())); Review Comment: this is not right, the method call is `applicationEventProcessor.process();` in runOnce() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628593280 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +79,113 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Durat
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628589378 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { Review Comment: i don't quite get the TransferFromManagersToClient part, perhaps you mean by ` verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));` ? i agree that the test verifies a lot more calls than just verifying rms are `PolledOnce`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628502331 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -78,64 +81,120 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = mock(MockClient.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +// consumerNet
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2150972168 Hey @brenden20 , thanks for the updates! Left some minor comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628443660 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -353,13 +379,22 @@ void testSendUnsentRequest() { assertTrue(networkClient.hasAnyPendingRequests()); assertFalse(networkClient.unsentRequests().isEmpty()); assertFalse(client.hasInFlightRequests()); -consumerNetworkThread.cleanup(); +consumerNetworkThread1.cleanup(); Review Comment: nit: up to you but we really don't need this change, and could keep the var name just as `consumerNetworkThread` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628434821 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +52,105 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +LogContext logContext = new LogContext(); +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); + +this.networkClient = new NetworkClientDelegate( Review Comment: this is a good point, what was the resolution? I expect we should be able to mock everything except the ConsumerNetworkThread (but I do see how we have tests asserting on the network client so it would require untangling that, is that the reason why we're still not mocking the NetworkClientDelegate?). OK for me to leave it as it is and do a follow-up btw, just want to understand. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628418739 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,35 +200,44 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +public void testRequestManagersArePolledOnce() { Review Comment: nit: this name has diverged from the test content I would say. What about `testRequestsTransferFromManagersToClientOnThreadRun` or something like it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628408348 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +79,113 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duratio
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628406670 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +79,113 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duratio
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628374297 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { Review Comment: I went ahead and removed this test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628152712 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -78,64 +81,120 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = mock(MockClient.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +// consumerNetwo
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628152712 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -78,64 +81,120 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = mock(MockClient.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +// consumerNetwo
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628131861 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); } @Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); +public void testApplicationEvent() { Review Comment: I had something like this in mind: ``` @ParameterizedTest @MethodSource("appEvents") public void testEventIsProcessed(ApplicationEvent e) { applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); verify(applicationEventProcessor).process(any(e.getClass())); assertTrue(applicationEventsQueue.isEmpty()); } private static Stream appEvents() { return Stream.of( Arguments.of(new PollEvent(100)), Arguments.of(new NewTopicsMetadataUpdateRequestEvent())); } ``` (extending `appEvents` with all the event instances that are individually created in each testxxxIsProcessed so we can replace them all with this single parametrized test) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628070196 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); } @Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); +public void testApplicationEvent() { Review Comment: I tried this out, it seems like parameterizing the tests into one will not work, it keeps giving me an error saying that the parameters must be constant. I did some research, it looks like it can be done, but would require an enum class to get it to work. Not sure if this is something we would want to do just for these few tests, but let me know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1628008988 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -78,64 +81,120 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = mock(MockClient.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +// consumerNet
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1627478606 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -292,7 +292,8 @@ private void closeInternal(final Duration timeout) { /** * Check the unsent queue one last time and poll until all requests are sent or the timer runs out. */ -private void sendUnsentRequests(final Timer timer) { +// Visible for testing +protected void sendUnsentRequests(final Timer timer) { Review Comment: i was wondering if we could just test cleanup() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1627474682 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,27 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { Review Comment: let's just make this public void for consistency. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); } @Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); +public void testApplicationEvent() { Review Comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1627471906 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -78,64 +81,120 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = mock(MockClient.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +// consumerNet
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1627235270 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -78,64 +81,120 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = mock(MockClient.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +LogContext logContext = new LogContext(); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Duration.ZERO); -} +if (consumerNetworkThread != null) +consumerNetworkThread.close(); +} + +@Test +public void testEnsureCloseStopsRunningThread() { +// consumerNet
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1627218743 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -78,64 +81,120 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = mock(MockClient.class); Review Comment: new MockClient() ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1626607635 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() { @Test void testCleanupInvokesReaper() { +LinkedList queue = new LinkedList<>(); +when(networkClientDelegate.unsentRequests()).thenReturn(queue); Review Comment: I will get that done, thank you for letting me know! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1626606071 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() { @Test void testCleanupInvokesReaper() { +LinkedList queue = new LinkedList<>(); +when(networkClientDelegate.unsentRequests()).thenReturn(queue); Review Comment: it just got merged, so worth rebasing and updating the test as needed ;) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2148211980 @lianetm @kirktrue @philipnee, I have implemented all suggestions except for test removals since I think if we do that, it should go on another ticket as to not bog this ticket down. All tests are passing as well, so let me know if there is anything else I should take care of or if this PR is ready for review. Thank you all for the reviews! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1626467806 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() { @Test void testCleanupInvokesReaper() { +LinkedList queue = new LinkedList<>(); +when(networkClientDelegate.unsentRequests()).thenReturn(queue); Review Comment: Thanks for the heads up, I will keep that in mind -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1626465679 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -289,10 +289,11 @@ private void closeInternal(final Duration timeout) { } } +// Add test to see if poll() is run once with timer of 0 Review Comment: I forgot to remove that, removed now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1626464002 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); Review Comment: Thank you for the feedback, I have implemented this suggestion as well as the previous one for this test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { Review Comment: Not introduced by this PR but since we're improving here, I find that this test does not bring any value, because it's testing something that it's not the responsibility of the `ConsumerNetworkThread`, so we end up testing something we're mocking ourselves with the `doAnswer`. What do you think? I had suggested to remove it completely. See my original comment about it on this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877). If you agree I would say we remove it instead of keep having to maintain it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { Review Comment: Not introduced by this PR but since we're improving here, I find that this test does not bring any value, because it's testing something that it's not the responsibility of the `ConsumerNetworkThread`, so we end up testing something we're mocking ourselves with the `doAnswer`. What do you think? I had suggested to remove it completely. See my original comment about it on this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16140: URL: https://github.com/apache/kafka/pull/16140#issuecomment-2146155510 Hey @brenden20, very nice improvement! Left a few comments. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625068181 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { Review Comment: Not introduced by this PR but since we're improving here, I find that this test does not bring any value, because it's testing something that it's not the responsibility of the `ConsumerNetworkThread`, so in the end we end up testing something we're mocking ourselves with the `doAnswer`. What do you think? I had suggested to remove it completely. See my original comment about it on this other [PR](https://github.com/apache/kafka/pull/15640/files#r1608897877). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625060556 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -270,30 +326,27 @@ void testPollResultTimer() { void testMaximumTimeToWait() { // Initial value before runOnce has been called assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait()); + + when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager))); + when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long) DEFAULT_HEARTBEAT_INTERVAL_MS); + consumerNetworkThread.runOnce(); // After runOnce has been called, it takes the default heartbeat interval from the heartbeat request manager assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, consumerNetworkThread.maximumTimeToWait()); } @Test -void testRequestManagersArePolledOnce() { -consumerNetworkThread.runOnce(); -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; -testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -} +void testEnsureEventsAreCompleted() { +Cluster cluster = mock(Cluster.class); +when(metadata.fetch()).thenReturn(cluster); -@Test -void testEnsureMetadataUpdateOnPoll() { -MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); -client.prepareMetadataUpdate(metadataResponse); -metadata.requestUpdate(false); -consumerNetworkThread.runOnce(); -verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); -} +List list = new ArrayList<>(); +list.add(new Node(0, "host", 0)); +when(cluster.nodes()).thenReturn(list); Review Comment: what about simplifying to a single `when(cluster.nodes()).thenReturn(Collections.singletonList(new Node(0, "host", 0)));` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625059147 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -289,10 +289,11 @@ private void closeInternal(final Duration timeout) { } } +// Add test to see if poll() is run once with timer of 0 Review Comment: is this something you intend to do on this PR or separately? Either way, I would suggest we remove the comment and address it in this PR or have a jira to follow-up separately if needed please. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625054713 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -329,6 +382,8 @@ void testEnsureEventsAreCompleted() { @Test void testCleanupInvokesReaper() { +LinkedList queue = new LinkedList<>(); +when(networkClientDelegate.unsentRequests()).thenReturn(queue); Review Comment: totally needed, but heads-up, there is another [PR](https://github.com/apache/kafka/pull/16156) in-flight where we're trying to change the check behind this, so we'll need to update this expectation here if that goes in first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625048146 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); } @Test -public void testMetadataUpdateEvent() { -ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); +public void testApplicationEvent() { Review Comment: Not introduced by this PR, but reviewing this one I noticed that we have lots of similar tests for checking that runOnce processes the events (ex. testResetPositionsEventIsProcessed, testSyncCommitEvent, testAsyncCommitEvent...), all doing the same (add event, run once, check is processed). Could we parametrize this and have a single test maybe? (same shape as this one but receiving the event as param). Should allow us to remove lots that, in the end this component does not care about the specific events, it's just responsible for processing whatever event is added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625034602 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); Review Comment: This test seems to leave out one of the core actions of the `runOnce`: 1. poll managers (this generates the requests) -> covered by the test 2. add newly generated requests are added to the network client -> not covered 3. poll network client to send the requests -> covered by the test Step 3 wouldn't have the effect we want (send the request) if step 2 does not happen, so what about adding a `verify(networkClientDelegate).addAll(..` before verifying the network client poll, to make sure we cover the whole flow of how requests move from the managers to the network layer on run once? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1625032096 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -150,19 +209,19 @@ public void testStartupAndTearDown() throws InterruptedException { } @Test -public void testApplicationEvent() { -ApplicationEvent e = new PollEvent(100); -applicationEventsQueue.add(e); +void testRequestManagersArePolledOnce() { consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm).poll(anyLong(; Review Comment: I think we're missing the expectations for the requestManagers.entries() to make sure there are some managers and we actually verify something, so we need `when(requestManagers.entries()).thenReturn(..` with some managers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1624586673 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws InterruptedException { "The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms"); } +@Test +void testRequestManagersArePolledOnce() { +consumerNetworkThread.runOnce(); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); +} + @Test public void testApplicationEvent() { ApplicationEvent e = new PollEvent(100); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +verify(applicationEventProcessor).process(e); } @Test public void testMetadataUpdateEvent() { Review Comment: Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1624585963 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws InterruptedException { "The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms"); } +@Test +void testRequestManagersArePolledOnce() { +consumerNetworkThread.runOnce(); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; Review Comment: It is, removing instances of times(1) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623649872 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -339,6 +358,15 @@ void testRunOnceInvokesReaper() { verify(applicationEventReaper).reap(any(Long.class)); } +private HashMap mockTopicPartitionOffset() { +final TopicPartition t0 = new TopicPartition("t0", 2); +final TopicPartition t1 = new TopicPartition("t0", 3); +HashMap topicPartitionOffsets = new HashMap<>(); Review Comment: 1. this can be final as well 2. let's use Map on the left side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623649732 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -339,6 +358,15 @@ void testRunOnceInvokesReaper() { verify(applicationEventReaper).reap(any(Long.class)); } +private HashMap mockTopicPartitionOffset() { Review Comment: try to use Map instead of HashMap -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648674 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -231,10 +254,7 @@ public void testAssignmentChangeEvent() { consumerNetworkThread.runOnce(); verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class)); -verify(networkClient, times(1)).poll(anyLong(), anyLong()); -verify(commitRequestManager, times(1)).updateAutoCommitTimer(currentTimeMs); -// Assignment change should generate an async commit (not retried). -verify(commitRequestManager, times(1)).maybeAutoCommitAsync(); +verify(networkClientDelegate, times(1)).poll(anyLong(), anyLong()); Review Comment: can we be consistent at using times(1) here? above you removed times(1) but it seems to be inconsistently applied elsewhere. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648578 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -207,7 +230,7 @@ public void testResetPositionsProcessFailureIsIgnored() { ResetPositionsEvent event = new ResetPositionsEvent(calculateDeadlineMs(time, 100)); applicationEventsQueue.add(event); -assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); +assertDoesNotThrow(consumerNetworkThread::runOnce); Review Comment: can we revert the irrelevant change? also I think we can remove this test and see if this is covered elsewhere. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648474 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws InterruptedException { "The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms"); } +@Test +void testRequestManagersArePolledOnce() { +consumerNetworkThread.runOnce(); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).maximumTimeToWait(anyLong(; +verify(networkClientDelegate).poll(anyLong(), anyLong()); +} + @Test public void testApplicationEvent() { ApplicationEvent e = new PollEvent(100); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); -verify(applicationEventProcessor, times(1)).process(e); +verify(applicationEventProcessor).process(e); } @Test public void testMetadataUpdateEvent() { Review Comment: i think can probably remove this teset as it is doing exactly the same thing as `testApplicationEvent`. We should probably try to find a similar test to ensure the code path is covered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648058 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws InterruptedException { "The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms"); } +@Test +void testRequestManagersArePolledOnce() { +consumerNetworkThread.runOnce(); +requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong(; Review Comment: i believe times(1) is the default, could you verify this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647935 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +52,105 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +LogContext logContext = new LogContext(); +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); + +this.networkClient = new NetworkClientDelegate( +time, +config, +logContext, +client +); -private ConsumerTestBuilder testBuilder; -private Time time; -private ConsumerMetadata metadata; -private NetworkClientDelegate networkClient; -private BlockingQueue applicationEventsQueue; -private ApplicationEventProcessor applicationEventProcessor; -private OffsetsRequestManager offsetsRequestManager; -private CommitRequestManager commitRequestManager; -private CoordinatorRequestManager coordinatorRequestManager; -private ConsumerNetworkThread consumerNetworkThread; -private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); -private MockClient client; - -@BeforeEach -public void setup() { -testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation()); -time = testBuilder.time; -metadata = testBuilder.metadata; -networkClient = testBuilder.networkClientDelegate; -client = testBuilder.client; -applicationEventsQueue = testBuilder.applicationEventQueue; -applicationEventProcessor = testBuilder.applicationEventProcessor; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -offsetsRequestManager = testBuilder.offsetsRequestManager; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -consumerNetworkThread = new ConsumerNetworkThread( -testBuilder.logContext, +this.consumerNetworkThread = new ConsumerNetworkThread( +logContext, time, -testBuilder.applicationEventQueue, +applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, -() -> testBuilder.networkClientDelegate, -() -> testBuilder.requestManagers +() -> networkClientDelegate, +() -> requestManagers ); +} + +@BeforeEach +public void setup() { consumerNetworkThread.initializeResources(); } @AfterEach public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -consumerNetworkThread.close(Durat
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647643 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +52,105 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +LogContext logContext = new LogContext(); +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); +this.metadata = mock(ConsumerMetadata.class); +this.applicationEventProcessor = mock(ApplicationEventProcessor.class); +this.applicationEventReaper = mock(CompletableEventReaper.class); +this.client = new MockClient(time); + +this.networkClient = new NetworkClientDelegate( Review Comment: i think i might have asked you to use MockClient here. since we aren't necessary testing request sending etc., can we just mock the networkClientDelegate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]
philipnee commented on code in PR #16140: URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647362 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -74,68 +52,105 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerNetworkThreadTest { +static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; +static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; + +private final Time time; +private final ConsumerMetadata metadata; +private final BlockingQueue applicationEventsQueue; +private final ApplicationEventProcessor applicationEventProcessor; +private final OffsetsRequestManager offsetsRequestManager; +private final HeartbeatRequestManager heartbeatRequestManager; +private final CoordinatorRequestManager coordinatorRequestManager; +private final ConsumerNetworkThread consumerNetworkThread; +private final MockClient client; +private final NetworkClientDelegate networkClientDelegate; +private final NetworkClientDelegate networkClient; +private final RequestManagers requestManagers; +private final CompletableEventReaper applicationEventReaper; + +ConsumerNetworkThreadTest() { +LogContext logContext = new LogContext(); +ConsumerConfig config = mock(ConsumerConfig.class); +this.time = new MockTime(); +this.networkClientDelegate = mock(NetworkClientDelegate.class); +this.requestManagers = mock(RequestManagers.class); +this.offsetsRequestManager = mock(OffsetsRequestManager.class); +this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.applicationEventsQueue = new LinkedBlockingQueue<>(); Review Comment: can we mock this as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org