liangyepianzhou commented on code in PR #17836:
URL: https://github.com/apache/pulsar/pull/17836#discussion_r1027571438
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -1615,4 +1615,59 @@ public void testGetTxnState() throws Exception {
Transaction abortingTxn = transaction;
Awaitility.await().until(() -> abortingTxn.getState() ==
Transaction.State.ABORTING);
}
+
+ @Test
+ public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
+ String topic = NAMESPACE1 + "/sequenceId";
+ int totalMessage = 10;
+ int threadSize = 30;
+ String topicName = "subscription";
+ ExecutorService executorService =
Executors.newFixedThreadPool(threadSize);
+
+ //build producer/consumer
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .producerName("producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName(topicName)
+ .subscribe();
+
+ //send and ack messages with transaction
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+
+ for (int i = 0; i < totalMessage * threadSize; i++) {
+ producer.newMessage().send();
+ }
+
+ CountDownLatch countDownLatch = new CountDownLatch(threadSize);
+ for (int i = 0; i < threadSize; i++) {
+ executorService.submit(() -> {
+ try {
+ for (int j = 0; j < totalMessage; j++) {
+ //The message will be sent with out-of-order sequence
ID.
+ producer.newMessage(transaction).sendAsync();
+ Message<byte[]> message = consumer.receive();
+ consumer.acknowledgeAsync(message.getMessageId(),
+ transaction);
+ }
+ } catch (Exception e) {
+ log.error("Failed to send/ack messages with transaction.",
e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+ }
+ //wait the all send/ack op is executed and store its futures in the
arraylist.
+ countDownLatch.await(5, TimeUnit.SECONDS);
+ //The transaction will be failed due to timeout.
+ transaction.commit().get();
+ }
Review Comment:
The unexpected behavior is the transaction will not be committed
successfully. This is clarified in the note.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]