eolivelli opened a new issue, #16421: URL: https://github.com/apache/pulsar/issues/16421
I am doing some testing on Shared subscription and Batch messages with the current Pulsar master. The behaviour that I am observing is that when you have Batch messages the Consumer is sending flow control messages for more messages that it can handle. This is how to reproduce the problem: - write 100.000 messages using batching - start a Consumer with a Shared subscription (from the beginning of the topic) - you will see that the PersistentDispatcherMultipleConsumers `consumerFlow` trigger the read of many messages This is happening because `consumerFlow` calls `readMoreEntries() ` and `readMoreEntries() ` sees that there are messages to be re-delivered, because the consumer still haven't acknowledged them. This is turn requests the ManagedCursor to read the data from storage. I have observed this behaviour while working on offloader performances, but it also happens with regular BK based ledgers. This simple test case reproduces the problem, I append it to this test https://github.com/apache/pulsar/blob/1ba180cbc7490eff6ac6d3a78d61ce7919236c95/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java#L66 ``` @Test public void testConsumerFlowOnSharedSubscription() throws Exception { String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID(); admin.topics().createNonPartitionedTopic(topic); String subName = "my-sub"; int numMessages = 20_000; final CountDownLatch count = new CountDownLatch(numMessages); try (Consumer<byte[]> consumer = pulsarClient.newConsumer() .subscriptionMode(SubscriptionMode.Durable) .subscriptionType(SubscriptionType.Shared) .topic(topic) .subscriptionName(subName) .messageListener(new MessageListener<byte[]>() { @Override public void received(Consumer<byte[]> consumer, Message<byte[]> msg) { //log.info("received {} - {}", msg, count.getCount()); consumer.acknowledgeAsync(msg); count.countDown(); } }) .subscribe(); Producer<byte[]> producer = pulsarClient .newProducer() .blockIfQueueFull(true) .enableBatching(true) .topic(topic) .create()) { consumer.pause(); byte[] message = "foo".getBytes(StandardCharsets.UTF_8); List<CompletableFuture<?>> futures = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { futures.add(producer.sendAsync(message).whenComplete( (id,e) -> { if (e != null) { log.error("error", e); } })); if (futures.size() == 1000) { FutureUtil.waitForAll(futures).get(); futures.clear(); } } producer.flush(); consumer.resume(); assertTrue(count.await(20, TimeUnit.SECONDS)); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
