lordcheng10 commented on code in PR #18107:
URL: https://github.com/apache/pulsar/pull/18107#discussion_r1000083598


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java:
##########
@@ -2408,6 +2409,59 @@ public void testMaxProducersPerTopicUnlimited() throws 
Exception {
         }
     }
 
+
+    @Test
+    public void testAcknowledgmentGroupSize() throws Exception {
+        final String namespace = "prop-xyz/ns2";
+        final String topicName = "persistent://" + namespace + "/topic1";
+        admin.namespaces().createNamespace(namespace, Set.of("test"));
+        int acknowledgmentGroupSize = 6;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-sub")
+                .acknowledgmentGroupTime(10000, TimeUnit.SECONDS)
+                .maxAcknowledgmentGroupSize(acknowledgmentGroupSize)
+                .subscribe();
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
managedLedger.getCursors().iterator().next();
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        MessageIdImpl ackMessageId = new MessageIdImpl(-1, -1, -1);
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            if (msg != null) {
+                MessageId messageId = msg.getMessageId();
+                consumer.acknowledge(msg);
+                // When the acknowledgmentGroupSize message is confirmed, send 
ack will be triggered
+                if (i == (acknowledgmentGroupSize - 1)) {
+                    ackMessageId = (MessageIdImpl) messageId;
+                }
+            }
+        }
+
+        Thread.sleep(5000);
+        Position markDeletedPosition = cursor.getMarkDeletedPosition();
+        long ledgerId = markDeletedPosition.getLedgerId();
+        long entryId = markDeletedPosition.getEntryId();
+
+        assertEquals(ledgerId, ackMessageId.getLedgerId());
+        assertEquals(entryId, ackMessageId.getEntryId());

Review Comment:
   OK, I will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to