congbobo184 commented on code in PR #17777:
URL: https://github.com/apache/pulsar/pull/17777#discussion_r980116725


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -318,6 +319,84 @@ public Consumer<byte[]> getConsumer(String topicName, 
String subName) throws Pul
                 .subscribe();
     }
 
+    @Test
+    public void testAsyncSendOrAckForSingleFuture() throws Exception {
+        String topic = NAMESPACE1 + "/testSingleFuture";
+        int totalMessage = 10;
+        int threadSize = 30;
+        String topicName = "subscription";
+        
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        ExecutorService executorService = 
Executors.newFixedThreadPool(threadSize);
+
+        //build producer/consumer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+        //store the send/ack result  futures
+        ArrayList<CompletableFuture<MessageId>> sendFutures = new 
ArrayList<>();
+        ArrayList<CompletableFuture<Void>> ackFutures = new ArrayList<>();
+
+        //send and ack messages with transaction
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        for (int i = 0; i < totalMessage * threadSize; i++) {
+            producer.newMessage().send();
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+        new Thread(() -> {
+            for (int i = 0; i < threadSize; i++) {
+                executorService.submit(() -> {
+                    try {
+                        for (int j = 0; j < totalMessage; j++) {
+                            CompletableFuture<MessageId> sendFuture = 
producer.newMessage(transaction).sendAsync();
+                            sendFutures.add(sendFuture);
+                            Message<byte[]> message = consumer.receive();
+                            CompletableFuture<Void> ackFuture = 
consumer.acknowledgeAsync(message.getMessageId(),
+                                    transaction);
+                            ackFutures.add(ackFuture);
+                        }
+                        countDownLatch.countDown();
+                    } catch (Exception e) {
+                        log.error("Failed to send/ack messages with 
transaction.", e);
+                    }
+                });
+            }
+        }).start();
+        //wait the all send/ack op is excuted and store its futures in the 
arraylist.
+        countDownLatch.await(10, TimeUnit.SECONDS);
+        transaction.commit().get();
+
+        //verify the final status is right.
+        Field ackCountField = 
TransactionImpl.class.getDeclaredField("ackCount");

Review Comment:
   Consider using `WhiteboxImpl` to read this field instead of looking it up manually with `getDeclaredField`.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java:
##########
@@ -61,8 +64,12 @@ public class TransactionImpl implements Transaction , 
TimerTask {
     private final Map<Pair<String, String>, CompletableFuture<Void>> 
registerSubscriptionMap;
     private final TransactionCoordinatorClientImpl tcClient;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    private CompletableFuture<MessageId> sendFuture;
+    private CompletableFuture<Void> ackFuture;
+
+    private AtomicLong ackCount = new AtomicLong(0);

Review Comment:
   `ackCount` can be declared `final`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to