chia7712 commented on code in PR #18105:
URL: https://github.com/apache/kafka/pull/18105#discussion_r1881120287
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1225,58 +1162,37 @@ public void
testMultipleConsumersInGroupFailureConcurrentConsumption(String pers
alterShareAutoOffsetReset(groupId, "earliest");
- ExecutorService consumerExecutorService =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
-
+ List<CompletableFuture<Void>> produceMessageFutures = new
ArrayList<>();
for (int i = 0; i < producerCount; i++) {
- Runnable task = () -> produceMessages(messagesPerProducer);
- producerExecutorService.submit(task);
+ produceMessageFutures.add(CompletableFuture.runAsync(() ->
produceMessages(messagesPerProducer)));
}
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futuresSuccess = new
ConcurrentLinkedQueue<>();
-
- CountDownLatch startSignal = new CountDownLatch(1);
-
int maxBytes = 1000000;
- consumerExecutorService.submit(() -> {
- // The "failing" consumer polls but immediately closes, which
releases the records for the other consumers
- CompletableFuture<Integer> future = new CompletableFuture<>();
- AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
- consumeMessages(failedMessagesConsumed, producerCount *
messagesPerProducer, groupId, 0, 1, false, future);
- startSignal.countDown();
- });
+ // The "failing" consumer polls but immediately closes, which releases
the records for the other consumers
+ AtomicInteger failedMessagesConsumed = new AtomicInteger(0);
+ CompletableFuture<Integer> failedMessagesConsumedFuture =
CompletableFuture.supplyAsync(
+ () -> consumeMessages(failedMessagesConsumed, producerCount *
messagesPerProducer, groupId,
+ 0, 1, false));
// Wait for the failed consumer to run
- try {
- boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
- assertTrue(signalled);
- } catch (InterruptedException e) {
- fail("Exception awaiting start signal");
- }
+ assertDoesNotThrow(() -> failedMessagesConsumedFuture.get(15,
TimeUnit.SECONDS),
+ "Exception awaiting consumeMessages");
+ List<CompletableFuture<Integer>> consumeMessagesFutures = new
ArrayList<>();
for (int i = 0; i < consumerCount; i++) {
final int consumerNumber = i + 1;
- consumerExecutorService.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futuresSuccess.add(future);
- consumeMessages(totalMessagesConsumed, producerCount *
messagesPerProducer, groupId, consumerNumber, 40, true, future, maxBytes);
- });
- }
- producerExecutorService.shutdown();
- consumerExecutorService.shutdown();
- try {
- producerExecutorService.awaitTermination(60, TimeUnit.SECONDS); //
Wait for all producer threads to complete
- consumerExecutorService.awaitTermination(60, TimeUnit.SECONDS); //
Wait for all consumer threads to complete
- int totalSuccessResult = 0;
- for (CompletableFuture<Integer> future : futuresSuccess) {
- totalSuccessResult += future.get();
- }
- assertEquals(producerCount * messagesPerProducer,
totalMessagesConsumed.get());
- assertEquals(producerCount * messagesPerProducer,
totalSuccessResult);
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
+ consumeMessagesFutures.add(CompletableFuture.supplyAsync(
+ () -> consumeMessages(totalMessagesConsumed, producerCount
* messagesPerProducer,
+ groupId, consumerNumber, 40, true, maxBytes)));
}
+
+
CompletableFuture.allOf(produceMessageFutures.toArray(CompletableFuture[]::new)).get(60,
TimeUnit.SECONDS);
+
CompletableFuture.allOf(consumeMessagesFutures.toArray(CompletableFuture[]::new)).get(60,
TimeUnit.SECONDS);
+
+ int totalSuccessResult =
consumeMessagesFutures.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
Review Comment:
```
int totalSuccessResult =
consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum();
```
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1819,17 +1706,12 @@ private CompletableFuture<Integer> produceMessages(int
messageCount) {
recordFutures[i] = producer.send(record);
}
for (int i = 0; i < messageCount; i++) {
- try {
- recordFutures[i].get();
- messagesSent++;
- } catch (Exception e) {
- fail("Failed to send record: " + e);
- }
+ final int index = i;
+ assertDoesNotThrow(() -> recordFutures[index].get(), "Failed
to send record");
Review Comment:
it seems we expect all messages should be succeed, and hence we can rewrite
this function:
```java
private int produceMessages(int messageCount) {
try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer())) {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
IntStream.range(0, messageCount).forEach(__ ->
producer.send(record));
producer.flush();
}
return messageCount;
}
```
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1069,100 +1055,50 @@ public void
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe
alterShareAutoOffsetReset(groupId2, "earliest");
alterShareAutoOffsetReset(groupId3, "earliest");
- ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
- ExecutorService shareGroupExecutorService1 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService2 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService3 =
Executors.newFixedThreadPool(consumerCount);
-
- CountDownLatch startSignal = new CountDownLatch(producerCount);
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures =
new ConcurrentLinkedQueue<>();
-
+ List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
for (int i = 0; i < producerCount; i++) {
- producerExecutorService.submit(() -> {
- CompletableFuture<Integer> future =
produceMessages(messagesPerProducer);
- producerFutures.add(future);
- startSignal.countDown();
- });
+ producerFutures.add(CompletableFuture.supplyAsync(() ->
produceMessages(messagesPerProducer)));
}
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new
ConcurrentLinkedQueue<>();
-
// Wait for the producers to run
- try {
- boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
- assertTrue(signalled);
- } catch (InterruptedException e) {
- fail("Exception awaiting start signal");
- }
+ assertDoesNotThrow(() ->
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+ .get(15, TimeUnit.SECONDS), "Exception awaiting
produceMessages");
+ int actualMessageSent =
producerFutures.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
- int maxBytes = 100000;
+ List<CompletableFuture<Integer>> consumeMessagesFutures1 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures2 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures3 = new
ArrayList<>();
+ int maxBytes = 100000;
for (int i = 0; i < consumerCount; i++) {
final int consumerNumber = i + 1;
- shareGroupExecutorService1.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures1.add(future);
- consumeMessages(totalMessagesConsumedGroup1,
totalMessagesSent, "group1", consumerNumber, 100, true, future, maxBytes);
- });
- shareGroupExecutorService2.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures2.add(future);
- consumeMessages(totalMessagesConsumedGroup2,
totalMessagesSent, "group2", consumerNumber, 100, true, future, maxBytes);
- });
- shareGroupExecutorService3.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures3.add(future);
- consumeMessages(totalMessagesConsumedGroup3,
totalMessagesSent, "group3", consumerNumber, 100, true, future, maxBytes);
- });
- }
- producerExecutorService.shutdown();
- shareGroupExecutorService1.shutdown();
- shareGroupExecutorService2.shutdown();
- shareGroupExecutorService3.shutdown();
- try {
- shareGroupExecutorService1.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 1 to complete
- shareGroupExecutorService2.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 2 to complete
- shareGroupExecutorService3.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 3 to complete
-
- int totalResult1 = 0;
- for (CompletableFuture<Integer> future : futures1) {
- totalResult1 += future.get();
- }
+ consumeMessagesFutures1.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup1,
totalMessagesSent,
+ "group1", consumerNumber, 100, true, maxBytes)));
- int totalResult2 = 0;
- for (CompletableFuture<Integer> future : futures2) {
- totalResult2 += future.get();
- }
+ consumeMessagesFutures2.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup2,
totalMessagesSent,
+ "group2", consumerNumber, 100, true, maxBytes)));
- int totalResult3 = 0;
- for (CompletableFuture<Integer> future : futures3) {
- totalResult3 += future.get();
- }
+ consumeMessagesFutures3.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup3,
totalMessagesSent,
+ "group3", consumerNumber, 100, true, maxBytes)));
+ }
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup2.get());
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup3.get());
- assertEquals(totalMessagesSent, totalResult1);
- assertEquals(totalMessagesSent, totalResult2);
- assertEquals(totalMessagesSent, totalResult3);
+ CompletableFuture.allOf(Stream.of(consumeMessagesFutures1.stream(),
consumeMessagesFutures2.stream(),
+
consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new))
+ .get(120, TimeUnit.SECONDS);
- int actualMessagesSent = 0;
- try {
- producerExecutorService.awaitTermination(60,
TimeUnit.SECONDS); // Wait for all producer threads to complete
+ int totalResult1 =
consumeMessagesFutures1.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
+ int totalResult2 =
consumeMessagesFutures2.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
+ int totalResult3 =
consumeMessagesFutures3.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
- for (CompletableFuture<Integer> future : producerFutures) {
- actualMessagesSent += future.get();
- }
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
- }
- assertEquals(totalMessagesSent, actualMessagesSent);
- } catch (Exception e) {
- fail("Exception occurred : " + e.getMessage());
- }
+ assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());
Review Comment:
Do we actually need `totalMessagesConsumedGroup1`, or can we simply count
the returned values from all `consumeMessages` instead?
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1069,100 +1055,50 @@ public void
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe
alterShareAutoOffsetReset(groupId2, "earliest");
alterShareAutoOffsetReset(groupId3, "earliest");
- ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
- ExecutorService shareGroupExecutorService1 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService2 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService3 =
Executors.newFixedThreadPool(consumerCount);
-
- CountDownLatch startSignal = new CountDownLatch(producerCount);
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures =
new ConcurrentLinkedQueue<>();
-
+ List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
for (int i = 0; i < producerCount; i++) {
- producerExecutorService.submit(() -> {
- CompletableFuture<Integer> future =
produceMessages(messagesPerProducer);
- producerFutures.add(future);
- startSignal.countDown();
- });
+ producerFutures.add(CompletableFuture.supplyAsync(() ->
produceMessages(messagesPerProducer)));
}
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new
ConcurrentLinkedQueue<>();
-
// Wait for the producers to run
- try {
- boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
- assertTrue(signalled);
- } catch (InterruptedException e) {
- fail("Exception awaiting start signal");
- }
+ assertDoesNotThrow(() ->
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+ .get(15, TimeUnit.SECONDS), "Exception awaiting
produceMessages");
+ int actualMessageSent =
producerFutures.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
- int maxBytes = 100000;
+ List<CompletableFuture<Integer>> consumeMessagesFutures1 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures2 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures3 = new
ArrayList<>();
+ int maxBytes = 100000;
for (int i = 0; i < consumerCount; i++) {
final int consumerNumber = i + 1;
- shareGroupExecutorService1.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures1.add(future);
- consumeMessages(totalMessagesConsumedGroup1,
totalMessagesSent, "group1", consumerNumber, 100, true, future, maxBytes);
- });
- shareGroupExecutorService2.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures2.add(future);
- consumeMessages(totalMessagesConsumedGroup2,
totalMessagesSent, "group2", consumerNumber, 100, true, future, maxBytes);
- });
- shareGroupExecutorService3.submit(() -> {
- CompletableFuture<Integer> future = new CompletableFuture<>();
- futures3.add(future);
- consumeMessages(totalMessagesConsumedGroup3,
totalMessagesSent, "group3", consumerNumber, 100, true, future, maxBytes);
- });
- }
- producerExecutorService.shutdown();
- shareGroupExecutorService1.shutdown();
- shareGroupExecutorService2.shutdown();
- shareGroupExecutorService3.shutdown();
- try {
- shareGroupExecutorService1.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 1 to complete
- shareGroupExecutorService2.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 2 to complete
- shareGroupExecutorService3.awaitTermination(120,
TimeUnit.SECONDS); // Wait for all consumer threads for group 3 to complete
-
- int totalResult1 = 0;
- for (CompletableFuture<Integer> future : futures1) {
- totalResult1 += future.get();
- }
+ consumeMessagesFutures1.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup1,
totalMessagesSent,
+ "group1", consumerNumber, 100, true, maxBytes)));
- int totalResult2 = 0;
- for (CompletableFuture<Integer> future : futures2) {
- totalResult2 += future.get();
- }
+ consumeMessagesFutures2.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup2,
totalMessagesSent,
+ "group2", consumerNumber, 100, true, maxBytes)));
- int totalResult3 = 0;
- for (CompletableFuture<Integer> future : futures3) {
- totalResult3 += future.get();
- }
+ consumeMessagesFutures3.add(CompletableFuture.supplyAsync(() ->
+ consumeMessages(totalMessagesConsumedGroup3,
totalMessagesSent,
+ "group3", consumerNumber, 100, true, maxBytes)));
+ }
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup2.get());
- assertEquals(totalMessagesSent, totalMessagesConsumedGroup3.get());
- assertEquals(totalMessagesSent, totalResult1);
- assertEquals(totalMessagesSent, totalResult2);
- assertEquals(totalMessagesSent, totalResult3);
+ CompletableFuture.allOf(Stream.of(consumeMessagesFutures1.stream(),
consumeMessagesFutures2.stream(),
+
consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new))
+ .get(120, TimeUnit.SECONDS);
- int actualMessagesSent = 0;
- try {
- producerExecutorService.awaitTermination(60,
TimeUnit.SECONDS); // Wait for all producer threads to complete
+ int totalResult1 =
consumeMessagesFutures1.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
Review Comment:
```java
int totalResult1 =
consumeMessagesFutures1.stream().mapToInt(CompletableFuture::join).sum();
int totalResult2 =
consumeMessagesFutures2.stream().mapToInt(CompletableFuture::join).sum();
int totalResult3 =
consumeMessagesFutures3.stream().mapToInt(CompletableFuture::join).sum();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]