gaoran10 commented on code in PR #14371:
URL: https://github.com/apache/pulsar/pull/14371#discussion_r879424489


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java:
##########
@@ -1061,4 +1060,72 @@ public void testTxnTimeOutInClient() throws Exception{
                     .InvalidTxnStatusException);
         }
     }
+
+    @Test
+    public void testCumulativeAckRedeliverMessages() throws Exception {
+        String topic = NAMESPACE1 + "/testCumulativeAckRedeliverMessages";
+
+        int count = 5;
+        int transactionCumulativeAck = 3;
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        // send 5 messages
+        for (int i = 0; i < count; i++) {
+            producer.send((i + "").getBytes(UTF_8));
+        }
+
+        Transaction transaction = getTxn();
+        Transaction invalidTransaction = getTxn();
+
+        Message<byte[]> message = null;
+        for (int i = 0; i < transactionCumulativeAck; i++) {
+            message = consumer.receive();
+        }
+
+        // receive transaction in order
+        assertEquals(message.getValue(), (transactionCumulativeAck - 1 + 
"").getBytes(UTF_8));
+
+        // ack the last message
+        consumer.acknowledgeCumulativeAsync(message.getMessageId(), 
transaction).get();
+
+        // another ack will throw TransactionConflictException
+        try {
+            consumer.acknowledgeCumulativeAsync(message.getMessageId(), 
invalidTransaction).get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof 
PulsarClientException.TransactionConflictException);
+            // abort transaction then redeliver messages
+            transaction.abort().get();
+            // consumer redeliver messages
+            consumer.redeliverUnacknowledgedMessages();
+        }
+
+        // receive the rest of the message
+        for (int i = 0; i < count; i++) {
+            message = consumer.receive();

Review Comment:
   Do we need to add a check to verify the consumer receives all 5 messages?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to