zebehringer opened a new issue, #21794: URL: https://github.com/apache/pulsar/issues/21794
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

M1 macOS Sonoma + Docker Desktop 4.22.1, image: apachepulsar/pulsar:3.1.1, Java pulsar-client:3.1.1. Also reproduced on Linux/Kubernetes.

### Minimal reproduce step

Using the Java code below and a local Docker container running the apachepulsar/pulsar:3.1.1 image:

- calling `runTest(50000, 5000, 1000000)` consistently times out a lot (returning partial batches), mixed with a few quickly received full batches
- calling `runTest(20000, 5000, 1000000)` never times out; everything is received quickly

```java
public static void runTest(final int batchSize, final long batchTimeoutMs, final int totalMessages) throws Exception {
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .ioThreads(8)
            .connectionsPerBroker(8)
            .build();

    final Producer<byte[]> producer = client.newProducer()
            .topic("persistent://public/default/test")
            .compressionType(CompressionType.LZ4)
            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
            .enableBatching(true)
            .maxPendingMessages(2000)
            .batchingMaxMessages(1000)
            .blockIfQueueFull(true)
            .create();

    final Consumer<byte[]> consumer = client.newConsumer()
            .topic("persistent://public/default/test")
            .subscriptionName("test2")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(batchTimeoutMs + 60000, TimeUnit.MILLISECONDS)
            .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
            .batchReceivePolicy(BatchReceivePolicy.builder()
                    .maxNumMessages(batchSize)
                    .timeout((int) batchTimeoutMs, TimeUnit.MILLISECONDS)
                    .build())
            .receiverQueueSize(50000)
            .subscribe();

    for (int i = 0; i < totalMessages; i++) {
        producer.sendAsync(("item " + i).getBytes());
    }

    int remainder = totalMessages;
    int iteration = 1;
    // Compare to batchSize instead of zero since the last batch may not be full and would
    // time out, throwing off the test metric being observed.
    while (remainder > batchSize) {
        final long start = System.currentTimeMillis();
        final Messages<byte[]> batch = consumer.batchReceive();
        System.out.println("batch " + iteration + " size = " + batch.size()
                + ", time = " + (System.currentTimeMillis() - start));
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
        }
        consumer.acknowledge(batch);
        remainder -= batch.size();
        iteration += 1;
    }

    // Clean out the remaining messages.
    while (remainder > 0) {
        final Messages<byte[]> batch = consumer.batchReceive();
        consumer.acknowledge(batch);
        remainder -= batch.size();
    }

    client.close();
}
```

### What did you expect to see?

I expect to see no timeouts while the topic has more messages in the backlog than _batchSize_.

I assume this has something to do with some combination of `blockedSubscriptionOnUnackedMsgs`, `unackedMessages` and `availablePermits`. I read somewhere that if the number of `unackedMessages` exceeds 50000, flow to the consumer is stopped; but the messages are acknowledged very soon after being received, so I would expect flow to resume and the receive queue to fill up again before the timeout. Instead, it appears that I have to wait for a full timeout, or sometimes two, before the receive queue fills up.
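To watch the suspected flow-control state while the test runs, something like the following sketch could be run in parallel; it assumes the `pulsar-client-admin` dependency is on the classpath and that the broker's HTTP admin port (8080) is reachable from the host running the test:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;

public class StatsProbe {
    public static void main(String[] args) throws Exception {
        // Assumes the admin endpoint of the local docker broker is exposed on localhost:8080.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            for (int i = 0; i < 60; i++) {
                final TopicStats stats = admin.topics().getStats("persistent://public/default/test");
                final SubscriptionStats sub = stats.getSubscriptions().get("test2");
                if (sub != null) {
                    System.out.println("blockedSubscriptionOnUnackedMsgs = "
                            + sub.isBlockedSubscriptionOnUnackedMsgs());
                    for (ConsumerStats c : sub.getConsumers()) {
                        System.out.println("  availablePermits = " + c.getAvailablePermits()
                                + ", unackedMessages = " + c.getUnackedMessages()
                                + ", blockedConsumerOnUnackedMsgs = " + c.isBlockedConsumerOnUnackedMsgs());
                    }
                }
                Thread.sleep(1000);
            }
        }
    }
}
```

If `blockedConsumerOnUnackedMsgs` flips to true during the stalls, that would point at the 50000 unacked-message limit described above.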
### What did you see instead?

Timeouts occur intermittently, and more frequently for larger _batchSize_ parameter values.

### Anything else?

I want to use a large _batchSize_ together with a long timeout threshold because this process writes message batches to file for long-term storage, and I do not want a bunch of small files.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
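For the file-writing use case above, one possible mitigation (a sketch only; `receiveLargeBatch` is a hypothetical helper, not part of the Pulsar API) would be to accumulate several smaller `batchReceive()` results client-side before writing each file, rather than relying on a single very large batch:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;

// Hypothetical helper: collect several smaller batchReceive() results until roughly
// targetMessages have been gathered, then the caller writes them out as one file.
public static List<Message<byte[]>> receiveLargeBatch(final Consumer<byte[]> consumer, final int targetMessages)
        throws PulsarClientException {
    final List<Message<byte[]>> collected = new ArrayList<>();
    while (collected.size() < targetMessages) {
        // The consumer would be configured with a smaller maxNumMessages / shorter timeout here.
        final Messages<byte[]> batch = consumer.batchReceive();
        if (batch.size() == 0) {
            break; // nothing arrived within the batch timeout
        }
        batch.forEach(collected::add);
        consumer.acknowledge(batch);
    }
    return collected;
}
```

This trades one long broker-side wait for several shorter ones, so a stalled flow only delays part of the accumulated batch.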
