This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d3707fc815c KAFKA-19214: Clean up use of Optionals in
RequestManagers.entries() (#19609)
d3707fc815c is described below
commit d3707fc815cc89b065c255ef87269d9c75d4b4cd
Author: Kirk True <[email protected]>
AuthorDate: Wed May 7 09:18:12 2025 -0700
KAFKA-19214: Clean up use of Optionals in RequestManagers.entries() (#19609)
Change:
`public List<Optional<? extends RequestManager>> entries();`
to:
`public List<RequestManager> entries();`
and clean up the callers.
Reviewers: TengYao Chi <[email protected]>, Andrew Schofield
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../consumer/internals/ConsumerNetworkThread.java | 21 ++++++-------
.../consumer/internals/RequestManagers.java | 36 ++++++++++------------
.../internals/ConsumerNetworkThreadTest.java | 19 +++---------
.../internals/FetchRequestManagerTest.java | 2 +-
4 files changed, 32 insertions(+), 46 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index a48289919b0..de146f29e82 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -39,7 +39,6 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -154,18 +153,18 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
lastPollTimeMs = currentTimeMs;
final long pollWaitTimeMs = requestManagers.entries().stream()
- .filter(Optional::isPresent)
- .map(Optional::get)
.map(rm -> rm.poll(currentTimeMs))
- .map(networkClientDelegate::addAll)
- .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
+ .mapToLong(networkClientDelegate::addAll)
+ .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS)
+ .min()
+ .orElse(MAX_POLL_TIMEOUT_MS);
+
networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
cachedMaximumTimeToWait = requestManagers.entries().stream()
- .filter(Optional::isPresent)
- .map(Optional::get)
- .map(rm -> rm.maximumTimeToWait(currentTimeMs))
- .reduce(Long.MAX_VALUE, Math::min);
+ .mapToLong(rm -> rm.maximumTimeToWait(currentTimeMs))
+ .min()
+ .orElse(Long.MAX_VALUE);
reapExpiredApplicationEvents(currentTimeMs);
List<CompletableEvent<?>> uncompletedEvents =
applicationEventReaper.uncompletedEvents();
@@ -233,13 +232,11 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
* </ol>
*/
// Visible for testing
- static void runAtClose(final Collection<Optional<? extends
RequestManager>> requestManagers,
+ static void runAtClose(final Collection<RequestManager> requestManagers,
final NetworkClientDelegate networkClientDelegate,
final long currentTimeMs) {
// These are the optional outgoing requests at the
requestManagers.stream()
- .filter(Optional::isPresent)
- .map(Optional::get)
.map(rm -> rm.pollOnClose(currentTimeMs))
.forEach(networkClientDelegate::addAll);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index cab7d804cad..f341dc35a4a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -60,7 +60,7 @@ public class RequestManagers implements Closeable {
public final FetchRequestManager fetchRequestManager;
public final Optional<ShareConsumeRequestManager>
shareConsumeRequestManager;
public final Optional<StreamsGroupHeartbeatRequestManager>
streamsGroupHeartbeatRequestManager;
- private final List<Optional<? extends RequestManager>> entries;
+ private final List<RequestManager> entries;
private final IdempotentCloser closer = new IdempotentCloser();
public RequestManagers(LogContext logContext,
@@ -87,16 +87,16 @@ public class RequestManagers implements Closeable {
this.streamsMembershipManager = streamsMembershipManager;
this.shareMembershipManager = Optional.empty();
- List<Optional<? extends RequestManager>> list = new ArrayList<>();
- list.add(coordinatorRequestManager);
- list.add(commitRequestManager);
- list.add(heartbeatRequestManager);
- list.add(membershipManager);
- list.add(streamsGroupHeartbeatRequestManager);
- list.add(streamsMembershipManager);
- list.add(Optional.of(offsetsRequestManager));
- list.add(Optional.of(topicMetadataRequestManager));
- list.add(Optional.of(fetchRequestManager));
+ List<RequestManager> list = new ArrayList<>();
+ coordinatorRequestManager.ifPresent(list::add);
+ commitRequestManager.ifPresent(list::add);
+ heartbeatRequestManager.ifPresent(list::add);
+ membershipManager.ifPresent(list::add);
+ streamsGroupHeartbeatRequestManager.ifPresent(list::add);
+ streamsMembershipManager.ifPresent(list::add);
+ list.add(offsetsRequestManager);
+ list.add(topicMetadataRequestManager);
+ list.add(fetchRequestManager);
entries = Collections.unmodifiableList(list);
}
@@ -119,15 +119,15 @@ public class RequestManagers implements Closeable {
this.topicMetadataRequestManager = null;
this.fetchRequestManager = null;
- List<Optional<? extends RequestManager>> list = new ArrayList<>();
- list.add(coordinatorRequestManager);
- list.add(shareHeartbeatRequestManager);
- list.add(shareMembershipManager);
- list.add(Optional.of(shareConsumeRequestManager));
+ List<RequestManager> list = new ArrayList<>();
+ coordinatorRequestManager.ifPresent(list::add);
+ shareHeartbeatRequestManager.ifPresent(list::add);
+ shareMembershipManager.ifPresent(list::add);
+ list.add(shareConsumeRequestManager);
entries = Collections.unmodifiableList(list);
}
- public List<Optional<? extends RequestManager>> entries() {
+ public List<RequestManager> entries() {
return entries;
}
@@ -138,8 +138,6 @@ public class RequestManagers implements Closeable {
log.debug("Closing RequestManagers");
entries.stream()
- .filter(Optional::isPresent)
- .map(Optional::get)
.filter(rm -> rm instanceof Closeable)
.map(rm -> (Closeable) rm)
.forEach(c -> closeQuietly(c,
c.getClass().getSimpleName()));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 520279fc8d4..de0653b616f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -35,11 +35,8 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -117,10 +114,7 @@ public class ConsumerNetworkThreadTest {
@ParameterizedTest
@ValueSource(longs = {ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS - 1,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS + 1})
public void testConsumerNetworkThreadPollTimeComputations(long
exampleTime) {
- List<Optional<? extends RequestManager>> list = new ArrayList<>();
- list.add(Optional.of(coordinatorRequestManager));
- list.add(Optional.of(heartbeatRequestManager));
-
+ List<RequestManager> list = List.of(coordinatorRequestManager,
heartbeatRequestManager);
when(requestManagers.entries()).thenReturn(list);
NetworkClientDelegate.PollResult pollResult = new
NetworkClientDelegate.PollResult(exampleTime);
@@ -158,16 +152,13 @@ public class ConsumerNetworkThreadTest {
@Test
public void testRequestsTransferFromManagersToClientOnThreadRun() {
- List<Optional<? extends RequestManager>> list = new ArrayList<>();
- list.add(Optional.of(coordinatorRequestManager));
- list.add(Optional.of(heartbeatRequestManager));
- list.add(Optional.of(offsetsRequestManager));
+ List<RequestManager> list = List.of(coordinatorRequestManager,
heartbeatRequestManager, offsetsRequestManager);
when(requestManagers.entries()).thenReturn(list);
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
consumerNetworkThread.runOnce();
- requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).poll(anyLong())));
- requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).maximumTimeToWait(anyLong())));
+ requestManagers.entries().forEach(rm -> verify(rm).poll(anyLong()));
+ requestManagers.entries().forEach(rm ->
verify(rm).maximumTimeToWait(anyLong()));
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
verify(networkClientDelegate).poll(anyLong(), anyLong());
}
@@ -178,7 +169,7 @@ public class ConsumerNetworkThreadTest {
// Initial value before runOnce has been called
assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS,
consumerNetworkThread.maximumTimeToWait());
-
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(requestManagers.entries()).thenReturn(List.of(heartbeatRequestManager));
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
defaultHeartbeatIntervalMs);
consumerNetworkThread.runOnce();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index e25bccc1892..7c8547ddd88 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -382,7 +382,7 @@ public class FetchRequestManagerTest {
// NOTE: by design the FetchRequestManager doesn't perform network I/O
internally. That means that calling
// the close() method with a Timer will NOT send out the close session
requests on close. The network
// I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we
need to run that logic here.
- ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)),
networkClientDelegate, time.milliseconds());
+ ConsumerNetworkThread.runAtClose(List.of(fetcher),
networkClientDelegate, time.milliseconds());
// the network is polled during the last state of clean up.
networkClientDelegate.poll(time.timer(1));
// validate that closing the fetcher has sent a request with final
epoch. 2 requests are sent, one for the