brandboat commented on code in PR #18105:
URL: https://github.com/apache/kafka/pull/18105#discussion_r1884303970


##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1069,100 +1054,47 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe
         alterShareAutoOffsetReset(groupId2, "earliest");
         alterShareAutoOffsetReset(groupId3, "earliest");
 
-        ExecutorService producerExecutorService = Executors.newFixedThreadPool(producerCount);
-        ExecutorService shareGroupExecutorService1 = Executors.newFixedThreadPool(consumerCount);
-        ExecutorService shareGroupExecutorService2 = Executors.newFixedThreadPool(consumerCount);
-        ExecutorService shareGroupExecutorService3 = Executors.newFixedThreadPool(consumerCount);
-
-        CountDownLatch startSignal = new CountDownLatch(producerCount);
-
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures = new ConcurrentLinkedQueue<>();
-
+        List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
         for (int i = 0; i < producerCount; i++) {
-            producerExecutorService.submit(() -> {
-                CompletableFuture<Integer> future = produceMessages(messagesPerProducer);
-                producerFutures.add(future);
-                startSignal.countDown();
-            });
+            producerFutures.add(CompletableFuture.supplyAsync(() -> produceMessages(messagesPerProducer)));
         }
-
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new ConcurrentLinkedQueue<>();
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new ConcurrentLinkedQueue<>();
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new ConcurrentLinkedQueue<>();
-
         // Wait for the producers to run
-        try {
-            boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
-            assertTrue(signalled);
-        } catch (InterruptedException e) {
-            fail("Exception awaiting start signal");
-        }
+        assertDoesNotThrow(() -> CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+                .get(15, TimeUnit.SECONDS), "Exception awaiting produceMessages");
+        int actualMessageSent = producerFutures.stream().mapToInt(CompletableFuture::join).sum();
 
-        int maxBytes = 100000;
+        List<CompletableFuture<Integer>> consumeMessagesFutures1 = new ArrayList<>();
+        List<CompletableFuture<Integer>> consumeMessagesFutures2 = new ArrayList<>();
+        List<CompletableFuture<Integer>> consumeMessagesFutures3 = new ArrayList<>();
 
+        int maxBytes = 100000;
         for (int i = 0; i < consumerCount; i++) {

Review Comment:
   Thanks for the comment! I think consumerGroupCount might still be confusing,
since there are three consumer groups in this test. Instead of renaming,
perhaps we could remove the consumerCount variable and inline its value
directly into the for-loop.
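
   A minimal sketch of what inlining could look like, not the actual test code.
The class name, the `consumeMessages` stand-in, and the loop bound `2` are all
hypothetical; the real value of consumerCount is not visible in this hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class InlinedLoopBoundSketch {

    // Hypothetical stand-in for the test's consume helper:
    // pretends each consumer reads exactly `perConsumer` records.
    static int consumeMessages(int perConsumer) {
        return perConsumer;
    }

    static int consumeWithInlinedBound(int perConsumer) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        // The literal 2 replaces the former consumerCount variable.
        // It is a placeholder value chosen for this sketch only.
        for (int i = 0; i < 2; i++) {
            futures.add(CompletableFuture.supplyAsync(() -> consumeMessages(perConsumer)));
        }
        // Wait for every consumer task, then add up the per-consumer counts.
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
        return futures.stream().mapToInt(CompletableFuture::join).sum();
    }

    public static void main(String[] args) {
        System.out.println(consumeWithInlinedBound(5));
    }
}
```

   With the literal in the loop header, there is no shared name that could be
read as either "consumers per group" or "number of groups".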



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
