brandboat commented on code in PR #18105:
URL: https://github.com/apache/kafka/pull/18105#discussion_r1884303970
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1069,100 +1054,47 @@ public void
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe
alterShareAutoOffsetReset(groupId2, "earliest");
alterShareAutoOffsetReset(groupId3, "earliest");
- ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
- ExecutorService shareGroupExecutorService1 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService2 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService3 =
Executors.newFixedThreadPool(consumerCount);
-
- CountDownLatch startSignal = new CountDownLatch(producerCount);
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures =
new ConcurrentLinkedQueue<>();
-
+ List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
for (int i = 0; i < producerCount; i++) {
- producerExecutorService.submit(() -> {
- CompletableFuture<Integer> future =
produceMessages(messagesPerProducer);
- producerFutures.add(future);
- startSignal.countDown();
- });
+ producerFutures.add(CompletableFuture.supplyAsync(() ->
produceMessages(messagesPerProducer)));
}
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new
ConcurrentLinkedQueue<>();
-
// Wait for the producers to run
- try {
- boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
- assertTrue(signalled);
- } catch (InterruptedException e) {
- fail("Exception awaiting start signal");
- }
+ assertDoesNotThrow(() ->
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+ .get(15, TimeUnit.SECONDS), "Exception awaiting
produceMessages");
+ int actualMessageSent =
producerFutures.stream().mapToInt(CompletableFuture::join).sum();
- int maxBytes = 100000;
+ List<CompletableFuture<Integer>> consumeMessagesFutures1 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures2 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures3 = new
ArrayList<>();
+ int maxBytes = 100000;
for (int i = 0; i < consumerCount; i++) {
Review Comment:
Thanks for the comment! I think consumerGroupCount might still be confusing
since there are three consumer groups in this test. Instead of renaming,
perhaps we could remove the consumerCount variable and inline its value
directly into the for-loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]