chia7712 commented on code in PR #18105:
URL: https://github.com/apache/kafka/pull/18105#discussion_r1881120287


##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1225,58 +1162,37 @@ public void 
testMultipleConsumersInGroupFailureConcurrentConsumption(String pers
 
         alterShareAutoOffsetReset(groupId, "earliest");
 
-        ExecutorService consumerExecutorService = 
Executors.newFixedThreadPool(consumerCount);
-        ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
-
+        List<CompletableFuture<Void>> produceMessageFutures = new 
ArrayList<>();
         for (int i = 0; i < producerCount; i++) {
-            Runnable task = () -> produceMessages(messagesPerProducer);
-            producerExecutorService.submit(task);
+            produceMessageFutures.add(CompletableFuture.runAsync(() -> 
produceMessages(messagesPerProducer)));
         }
 
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futuresSuccess = new 
ConcurrentLinkedQueue<>();
-
-        CountDownLatch startSignal = new CountDownLatch(1);
-
         int maxBytes = 1000000;
 
-        consumerExecutorService.submit(() -> {
-            // The "failing" consumer polls but immediately closes, which 
releases the records for the other consumers
-            CompletableFuture<Integer> future = new CompletableFuture<>();
-            AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
-            consumeMessages(failedMessagesConsumed, producerCount * 
messagesPerProducer, groupId, 0, 1, false, future);
-            startSignal.countDown();
-        });
+        // The "failing" consumer polls but immediately closes, which releases 
the records for the other consumers
+        AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
+        CompletableFuture<Integer> failedMessagesConsumedFuture = 
CompletableFuture.supplyAsync(
+                () -> consumeMessages(failedMessagesConsumed, producerCount * 
messagesPerProducer, groupId,
+                        0, 1, false));
 
         // Wait for the failed consumer to run
-        try {
-            boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
-            assertTrue(signalled);
-        } catch (InterruptedException e) {
-            fail("Exception awaiting start signal");
-        }
+        assertDoesNotThrow(() -> failedMessagesConsumedFuture.get(15, 
TimeUnit.SECONDS),
+                "Exception awaiting consumeMessages");
 
+        List<CompletableFuture<Integer>> consumeMessagesFutures = new 
ArrayList<>();
         for (int i = 0; i < consumerCount; i++) {
             final int consumerNumber = i + 1;
-            consumerExecutorService.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futuresSuccess.add(future);
-                consumeMessages(totalMessagesConsumed, producerCount * 
messagesPerProducer, groupId, consumerNumber, 40, true, future, maxBytes);
-            });
-        }
-        producerExecutorService.shutdown();
-        consumerExecutorService.shutdown();
-        try {
-            producerExecutorService.awaitTermination(60, TimeUnit.SECONDS); // 
Wait for all producer threads to complete
-            consumerExecutorService.awaitTermination(60, TimeUnit.SECONDS); // 
Wait for all consumer threads to complete
-            int totalSuccessResult = 0;
-            for (CompletableFuture<Integer> future : futuresSuccess) {
-                totalSuccessResult += future.get();
-            }
-            assertEquals(producerCount * messagesPerProducer, 
totalMessagesConsumed.get());
-            assertEquals(producerCount * messagesPerProducer, 
totalSuccessResult);
-        } catch (Exception e) {
-            fail("Exception occurred : " + e.getMessage());
+            consumeMessagesFutures.add(CompletableFuture.supplyAsync(
+                    () -> consumeMessages(totalMessagesConsumed, producerCount 
* messagesPerProducer,
+                            groupId, consumerNumber, 40, true, maxBytes)));
         }
+
+        
CompletableFuture.allOf(produceMessageFutures.toArray(CompletableFuture[]::new)).get(60,
 TimeUnit.SECONDS);
+        
CompletableFuture.allOf(consumeMessagesFutures.toArray(CompletableFuture[]::new)).get(60,
 TimeUnit.SECONDS);
+
+        int totalSuccessResult = 
consumeMessagesFutures.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);

Review Comment:
   ```
   int totalSuccessResult = 
consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum();
   ```



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1819,17 +1706,12 @@ private CompletableFuture<Integer> produceMessages(int 
messageCount) {
                 recordFutures[i] = producer.send(record);
             }
             for (int i = 0; i < messageCount; i++) {
-                try {
-                    recordFutures[i].get();
-                    messagesSent++;
-                } catch (Exception e) {
-                    fail("Failed to send record: " + e);
-                }
+                final int index = i;
+                assertDoesNotThrow(() -> recordFutures[index].get(), "Failed 
to send record");

Review Comment:
   it  seems we expect all messages should be succeed, and hence we can rewrite 
this function:
   ```java
       private int produceMessages(int messageCount) {
           try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
               ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
               IntStream.range(0, messageCount).forEach(__ -> 
producer.send(record));
               producer.flush();
           }
           return messageCount;
       }
   ```



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1069,100 +1055,50 @@ public void 
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe
         alterShareAutoOffsetReset(groupId2, "earliest");
         alterShareAutoOffsetReset(groupId3, "earliest");
 
-        ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
-        ExecutorService shareGroupExecutorService1 = 
Executors.newFixedThreadPool(consumerCount);
-        ExecutorService shareGroupExecutorService2 = 
Executors.newFixedThreadPool(consumerCount);
-        ExecutorService shareGroupExecutorService3 = 
Executors.newFixedThreadPool(consumerCount);
-
-        CountDownLatch startSignal = new CountDownLatch(producerCount);
-
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures = 
new ConcurrentLinkedQueue<>();
-
+        List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
         for (int i = 0; i < producerCount; i++) {
-            producerExecutorService.submit(() -> {
-                CompletableFuture<Integer> future = 
produceMessages(messagesPerProducer);
-                producerFutures.add(future);
-                startSignal.countDown();
-            });
+            producerFutures.add(CompletableFuture.supplyAsync(() -> 
produceMessages(messagesPerProducer)));
         }
-
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new 
ConcurrentLinkedQueue<>();
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new 
ConcurrentLinkedQueue<>();
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new 
ConcurrentLinkedQueue<>();
-
         // Wait for the producers to run
-        try {
-            boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
-            assertTrue(signalled);
-        } catch (InterruptedException e) {
-            fail("Exception awaiting start signal");
-        }
+        assertDoesNotThrow(() -> 
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+                .get(15, TimeUnit.SECONDS), "Exception awaiting 
produceMessages");
+        int actualMessageSent = 
producerFutures.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
 
-        int maxBytes = 100000;
+        List<CompletableFuture<Integer>> consumeMessagesFutures1 = new 
ArrayList<>();
+        List<CompletableFuture<Integer>> consumeMessagesFutures2 = new 
ArrayList<>();
+        List<CompletableFuture<Integer>> consumeMessagesFutures3 = new 
ArrayList<>();
 
+        int maxBytes = 100000;
         for (int i = 0; i < consumerCount; i++) {
             final int consumerNumber = i + 1;
-            shareGroupExecutorService1.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures1.add(future);
-                consumeMessages(totalMessagesConsumedGroup1, 
totalMessagesSent, "group1", consumerNumber, 100, true, future, maxBytes);
-            });
-            shareGroupExecutorService2.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures2.add(future);
-                consumeMessages(totalMessagesConsumedGroup2, 
totalMessagesSent, "group2", consumerNumber, 100, true, future, maxBytes);
-            });
-            shareGroupExecutorService3.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures3.add(future);
-                consumeMessages(totalMessagesConsumedGroup3, 
totalMessagesSent, "group3", consumerNumber, 100, true, future, maxBytes);
-            });
-        }
-        producerExecutorService.shutdown();
-        shareGroupExecutorService1.shutdown();
-        shareGroupExecutorService2.shutdown();
-        shareGroupExecutorService3.shutdown();
-        try {
-            shareGroupExecutorService1.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 1 to complete
-            shareGroupExecutorService2.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 2 to complete
-            shareGroupExecutorService3.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 3 to complete
-
-            int totalResult1 = 0;
-            for (CompletableFuture<Integer> future : futures1) {
-                totalResult1 += future.get();
-            }
+            consumeMessagesFutures1.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup1, 
totalMessagesSent,
+                            "group1", consumerNumber, 100, true, maxBytes)));
 
-            int totalResult2 = 0;
-            for (CompletableFuture<Integer> future : futures2) {
-                totalResult2 += future.get();
-            }
+            consumeMessagesFutures2.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup2, 
totalMessagesSent,
+                            "group2", consumerNumber, 100, true, maxBytes)));
 
-            int totalResult3 = 0;
-            for (CompletableFuture<Integer> future : futures3) {
-                totalResult3 += future.get();
-            }
+            consumeMessagesFutures3.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup3, 
totalMessagesSent,
+                            "group3", consumerNumber, 100, true, maxBytes)));
+        }
 
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup2.get());
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup3.get());
-            assertEquals(totalMessagesSent, totalResult1);
-            assertEquals(totalMessagesSent, totalResult2);
-            assertEquals(totalMessagesSent, totalResult3);
+        CompletableFuture.allOf(Stream.of(consumeMessagesFutures1.stream(), 
consumeMessagesFutures2.stream(),
+                        
consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new))
+                .get(120, TimeUnit.SECONDS);
 
-            int actualMessagesSent = 0;
-            try {
-                producerExecutorService.awaitTermination(60, 
TimeUnit.SECONDS); // Wait for all producer threads to complete
+        int totalResult1 = 
consumeMessagesFutures1.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
+        int totalResult2 = 
consumeMessagesFutures2.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
+        int totalResult3 = 
consumeMessagesFutures3.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
 
-                for (CompletableFuture<Integer> future : producerFutures) {
-                    actualMessagesSent += future.get();
-                }
-            } catch (Exception e) {
-                fail("Exception occurred : " + e.getMessage());
-            }
-            assertEquals(totalMessagesSent, actualMessagesSent);
-        } catch (Exception e) {
-            fail("Exception occurred : " + e.getMessage());
-        }
+        assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());

Review Comment:
   Do we actually need `totalMessagesConsumedGroup1`, or can we simply count 
the returned values from all `consumeMessages` instead?



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1069,100 +1055,50 @@ public void 
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe
         alterShareAutoOffsetReset(groupId2, "earliest");
         alterShareAutoOffsetReset(groupId3, "earliest");
 
-        ExecutorService producerExecutorService = 
Executors.newFixedThreadPool(producerCount);
-        ExecutorService shareGroupExecutorService1 = 
Executors.newFixedThreadPool(consumerCount);
-        ExecutorService shareGroupExecutorService2 = 
Executors.newFixedThreadPool(consumerCount);
-        ExecutorService shareGroupExecutorService3 = 
Executors.newFixedThreadPool(consumerCount);
-
-        CountDownLatch startSignal = new CountDownLatch(producerCount);
-
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures = 
new ConcurrentLinkedQueue<>();
-
+        List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
         for (int i = 0; i < producerCount; i++) {
-            producerExecutorService.submit(() -> {
-                CompletableFuture<Integer> future = 
produceMessages(messagesPerProducer);
-                producerFutures.add(future);
-                startSignal.countDown();
-            });
+            producerFutures.add(CompletableFuture.supplyAsync(() -> 
produceMessages(messagesPerProducer)));
         }
-
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new 
ConcurrentLinkedQueue<>();
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new 
ConcurrentLinkedQueue<>();
-        ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new 
ConcurrentLinkedQueue<>();
-
         // Wait for the producers to run
-        try {
-            boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
-            assertTrue(signalled);
-        } catch (InterruptedException e) {
-            fail("Exception awaiting start signal");
-        }
+        assertDoesNotThrow(() -> 
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+                .get(15, TimeUnit.SECONDS), "Exception awaiting 
produceMessages");
+        int actualMessageSent = 
producerFutures.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
 
-        int maxBytes = 100000;
+        List<CompletableFuture<Integer>> consumeMessagesFutures1 = new 
ArrayList<>();
+        List<CompletableFuture<Integer>> consumeMessagesFutures2 = new 
ArrayList<>();
+        List<CompletableFuture<Integer>> consumeMessagesFutures3 = new 
ArrayList<>();
 
+        int maxBytes = 100000;
         for (int i = 0; i < consumerCount; i++) {
             final int consumerNumber = i + 1;
-            shareGroupExecutorService1.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures1.add(future);
-                consumeMessages(totalMessagesConsumedGroup1, 
totalMessagesSent, "group1", consumerNumber, 100, true, future, maxBytes);
-            });
-            shareGroupExecutorService2.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures2.add(future);
-                consumeMessages(totalMessagesConsumedGroup2, 
totalMessagesSent, "group2", consumerNumber, 100, true, future, maxBytes);
-            });
-            shareGroupExecutorService3.submit(() -> {
-                CompletableFuture<Integer> future = new CompletableFuture<>();
-                futures3.add(future);
-                consumeMessages(totalMessagesConsumedGroup3, 
totalMessagesSent, "group3", consumerNumber, 100, true, future, maxBytes);
-            });
-        }
-        producerExecutorService.shutdown();
-        shareGroupExecutorService1.shutdown();
-        shareGroupExecutorService2.shutdown();
-        shareGroupExecutorService3.shutdown();
-        try {
-            shareGroupExecutorService1.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 1 to complete
-            shareGroupExecutorService2.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 2 to complete
-            shareGroupExecutorService3.awaitTermination(120, 
TimeUnit.SECONDS); // Wait for all consumer threads for group 3 to complete
-
-            int totalResult1 = 0;
-            for (CompletableFuture<Integer> future : futures1) {
-                totalResult1 += future.get();
-            }
+            consumeMessagesFutures1.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup1, 
totalMessagesSent,
+                            "group1", consumerNumber, 100, true, maxBytes)));
 
-            int totalResult2 = 0;
-            for (CompletableFuture<Integer> future : futures2) {
-                totalResult2 += future.get();
-            }
+            consumeMessagesFutures2.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup2, 
totalMessagesSent,
+                            "group2", consumerNumber, 100, true, maxBytes)));
 
-            int totalResult3 = 0;
-            for (CompletableFuture<Integer> future : futures3) {
-                totalResult3 += future.get();
-            }
+            consumeMessagesFutures3.add(CompletableFuture.supplyAsync(() ->
+                    consumeMessages(totalMessagesConsumedGroup3, 
totalMessagesSent,
+                            "group3", consumerNumber, 100, true, maxBytes)));
+        }
 
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup2.get());
-            assertEquals(totalMessagesSent, totalMessagesConsumedGroup3.get());
-            assertEquals(totalMessagesSent, totalResult1);
-            assertEquals(totalMessagesSent, totalResult2);
-            assertEquals(totalMessagesSent, totalResult3);
+        CompletableFuture.allOf(Stream.of(consumeMessagesFutures1.stream(), 
consumeMessagesFutures2.stream(),
+                        
consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new))
+                .get(120, TimeUnit.SECONDS);
 
-            int actualMessagesSent = 0;
-            try {
-                producerExecutorService.awaitTermination(60, 
TimeUnit.SECONDS); // Wait for all producer threads to complete
+        int totalResult1 = 
consumeMessagesFutures1.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);

Review Comment:
   ```java
           int totalResult1 = 
consumeMessagesFutures1.stream().mapToInt(CompletableFuture::join).sum();
           int totalResult2 = 
consumeMessagesFutures2.stream().mapToInt(CompletableFuture::join).sum();
           int totalResult3 = 
consumeMessagesFutures3.stream().mapToInt(CompletableFuture::join).sum();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to