heesung-sn commented on issue #23908: URL: https://github.com/apache/pulsar/issues/23908#issuecomment-2741291281
I am still unable to reproduce this issue. I tried the following steps:

1. Switch Pulsar to the 4.0.3 branch (tag `v4.0.3`):
```
commit e5aa85f0afb65ca002881efd6f6f52f63b17db15 (HEAD, tag: v4.0.3-candidate-2, tag: v4.0.3)
Author: guan46 <[email protected]>
Date:   Tue Feb 25 19:08:33 2025 +0800

    [improve][ml] Use lock-free queue in InflightReadsLimiter since there's no concurrent access (#23962)

    (cherry picked from commit 38a41e0d29192d0e29cc172cccf6c187cf7cb542)
```
2. Update the producer perf code to apply back pressure when there are too many pending messages:
```
hsohn@HeesungSohns-MacBook-Pro pulsar % git diff pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index ba5be3a3c4..68b4c2b92b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -549,6 +549,7 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
                 }
             }
             // Send messages on all topics/producers
+            AtomicLong totalPending = new AtomicLong(0);
             AtomicLong totalSent = new AtomicLong(0);
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new Semaphore(this.numMessagesPerTransaction);
@@ -576,6 +577,13 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
                             break;
                         }
                     }
+                    long totalPendingCount = totalPending.get();
+                    if (totalPendingCount > msgRate * 2L) {
+                        log.info("{} totalPendingCount:{} > msgRate:{}, sleeping 1 sec!",
+                                producer.getProducerName(), totalPendingCount, msgRate);
+                        Thread.sleep(1000);
+                        continue;
+                    }
                     rateLimiter.acquire();
                     //if transaction is disable, transaction will be null.
                     Transaction transaction = transactionAtomicReference.get();
@@ -626,7 +634,9 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
                         messageBuilder.key(String.valueOf(totalSent.get()));
                     }
                     PulsarClient pulsarClient = client;
+                    totalPending.incrementAndGet();
                     messageBuilder.sendAsync().thenRun(() -> {
+                        totalPending.decrementAndGet();
                         bytesSent.add(payloadData.length);
                         messagesSent.increment();
                         totalSent.incrementAndGet();
@@ -647,6 +657,7 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
                         }
                         log.warn("Write message error with exception", ex);
                         messagesFailed.increment();
+                        totalPending.decrementAndGet();
                         if (this.exitOnFailure) {
                             PerfClientUtils.exit(1);
                         }
```
3. Build the code and the test image:
```
mvn clean install -DskipTests
# setting DOCKER_CLI_EXPERIMENTAL=enabled is required in some environments with older docker versions
export DOCKER_CLI_EXPERIMENTAL=enabled
mvn package -Pdocker,-main -am -pl docker/pulsar-all -DskipTests
```
4. Initialize the cluster data directories from the Pulsar source directory:
```
rm -rf data
sudo mkdir -p ./data/zookeeper ./data/bookkeeper-1 ./data/bookkeeper-2 ./data/bookkeeper-3
sudo chown -R 10000 data
sudo chmod -R 777 data
```
5. Start the cluster with [compose.yaml.txt](https://github.com/user-attachments/files/19373025/compose.yaml.txt):
```
docker compose up -d
```
6. Run the consumer on `pulsar-client-1`:
```
docker exec -it pulsar-client-1 /bin/bash
bin/pulsar-admin --admin-url http://broker-1:8080 tenants create my-tenant
bin/pulsar-admin --admin-url http://broker-1:8080 namespaces create my-tenant/my-namespace2 --bundles 5
bin/pulsar-admin --admin-url http://broker-1:8080 topics create-partitioned-topic persistent://my-tenant/my-namespace2/my-topic -p 100
bin/pulsar-perf consume -st Key_Shared persistent://my-tenant/my-namespace2/my-topic -n 4 --service-url pulsar://broker-1:6650
```
7. Run the producer on `pulsar-client-2`:
```
docker exec -it pulsar-client-2 /bin/bash
bin/pulsar-perf produce persistent://my-tenant/my-namespace2/my-topic -r 5000 -bm 1 -mk random -n 1 --service-url pulsar://broker-1:6650
```
8. Restart bookie 1 and bookie 2.
9. On `pulsar-client-1`, confirm that there is no lag. The check prints "Lag too high!" for a cursor whose `markDeletePosition` and `readPosition` are on different ledgers, or whose read position is more than 200 entries ahead on the same ledger:
```
pulsar-admin --admin-url http://broker-1:8080 topics partitioned-stats-internal persistent://my-tenant/my-namespace2/my-topic | awk -F'[:,]' '/markDeletePosition/ {md1=$2; md2=$3} /readPosition/ {rp1=$2; rp2=$3; print (md1!=rp1 || rp2-md2>200) ? "Lag too high! markDeletePosition: " md1 ":" md2 " readPosition: " rp1 ":" rp2 : "OK markDeletePosition: " md1 ":" md2 " readPosition: " rp1 ":" rp2}'
```
10. Shut down the cluster:
```
docker compose down
```
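For anyone who wants to reason about the back-pressure change from step 2 without building Pulsar, here is a minimal, Pulsar-free Java sketch of the same idea. It is not the `PerformanceProducer` code. `PendingBackpressureSketch`, `fakeSendAsync`, `sendWithBackpressure` and `Result` are hypothetical names, and a scheduled executor stands in for broker acks. The idea is that a shared counter is incremented before each async send and decremented when the send completes, and the send loop backs off while the counter is above a threshold.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of pending-message back pressure; no Pulsar API is used.
// fakeSendAsync() stands in for messageBuilder.sendAsync().
final class PendingBackpressureSketch {

    /** Outcome of one run: acknowledged sends and the peak in-flight count. */
    static final class Result {
        final long acked;
        final long peakPending;

        Result(long acked, long peakPending) {
            this.acked = acked;
            this.peakPending = peakPending;
        }
    }

    // Simulated async send: the returned future completes after ackDelayMs.
    static CompletableFuture<Void> fakeSendAsync(ScheduledExecutorService acker, long ackDelayMs) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        acker.schedule(() -> { ack.complete(null); }, ackDelayMs, TimeUnit.MILLISECONDS);
        return ack;
    }

    private static void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while throttling", e);
        }
    }

    // Sends totalMessages, pausing while more than maxPending sends are unacknowledged.
    // maxPending plays the role of msgRate * 2L in the diff; pauseMs plays the 1000 ms sleep.
    static Result sendWithBackpressure(int totalMessages, long maxPending, long ackDelayMs, long pauseMs) {
        ScheduledExecutorService acker = Executors.newSingleThreadScheduledExecutor();
        AtomicLong totalPending = new AtomicLong(0);
        AtomicLong totalAcked = new AtomicLong(0);
        long peakPending = 0;
        int issued = 0;
        try {
            while (issued < totalMessages) {
                if (totalPending.get() > maxPending) {
                    sleepMs(pauseMs); // back off instead of piling up more in-flight sends
                    continue;
                }
                // Count the send as pending before issuing it.
                peakPending = Math.max(peakPending, totalPending.incrementAndGet());
                issued++;
                fakeSendAsync(acker, ackDelayMs).whenComplete((ignored, error) -> {
                    if (error == null) {
                        totalAcked.incrementAndGet();
                    }
                    // Decrement on success and on failure so the counter cannot leak.
                    totalPending.decrementAndGet();
                });
            }
            // Drain: wait until every outstanding send has completed.
            while (totalPending.get() > 0) {
                sleepMs(1);
            }
        } finally {
            acker.shutdownNow();
        }
        return new Result(totalAcked.get(), peakPending);
    }
}
```

Because only the sending thread increments the counter, the check-then-increment lets the in-flight count reach at most `maxPending + 1`. That matches the `> msgRate * 2L` comparison in the diff. Decrementing in both outcomes mirrors the diff, which decrements in the `thenRun` success path and again in the error handler.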
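The awk filter in step 9 boils down to a simple per-cursor rule. As a hedged illustration, here is the same rule in Java. `LagCheckSketch`, `parsePosition`, `lagTooHigh` and `report` are hypothetical names. The sketch assumes both positions have already been extracted from the `partitioned-stats-internal` output as plain `ledgerId:entryId` strings, whereas the awk version splits the raw JSON lines.

```java
// Hypothetical sketch of the step 9 lag rule; input positions are assumed to be
// clean "ledgerId:entryId" strings such as "123:45".
final class LagCheckSketch {

    // Parses "ledgerId:entryId" into {ledgerId, entryId}.
    static long[] parsePosition(String position) {
        String[] parts = position.trim().split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected ledgerId:entryId, got " + position);
        }
        return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
    }

    // Same condition as the awk one-liner: different ledgers, or read position
    // more than maxEntryGap entries ahead of the mark-delete position.
    static boolean lagTooHigh(String markDeletePosition, String readPosition, long maxEntryGap) {
        long[] md = parsePosition(markDeletePosition);
        long[] rp = parsePosition(readPosition);
        return md[0] != rp[0] || rp[1] - md[1] > maxEntryGap;
    }

    // Builds the same kind of line the awk script prints, using the 200-entry threshold.
    static String report(String markDeletePosition, String readPosition) {
        String prefix = lagTooHigh(markDeletePosition, readPosition, 200) ? "Lag too high! " : "OK ";
        return prefix + "markDeletePosition: " + markDeletePosition + " readPosition: " + readPosition;
    }
}
```

For example, `report("10:5", "10:6")` starts with `OK`, while `report("10:5", "10:500")` starts with `Lag too high!`.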
