heesung-sn commented on issue #23908:
URL: https://github.com/apache/pulsar/issues/23908#issuecomment-2741291281

   I am still unable to reproduce this issue.
   I tried the following steps.
   
   1. Switch Pulsar to the 4.0.3 release (`v4.0.3` tag):
   ```
   commit e5aa85f0afb65ca002881efd6f6f52f63b17db15 (HEAD, tag: v4.0.3-candidate-2, tag: v4.0.3)
   Author: guan46 <[email protected]>
   Date:   Tue Feb 25 19:08:33 2025 +0800
   
       [improve][ml] Use lock-free queue in InflightReadsLimiter since there's no concurrent access  (#23962)
   
       (cherry picked from commit 38a41e0d29192d0e29cc172cccf6c187cf7cb542)
   
   ```
   
   
   2. Update the producer perf code to apply back pressure when there are too many pending messages:
   
   
   ```
   hsohn@HeesungSohns-MacBook-Pro pulsar % git diff pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
   diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
   index ba5be3a3c4..68b4c2b92b 100644
   --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
   +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
   @@ -549,6 +549,7 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
                    }
                }
                // Send messages on all topics/producers
   +            AtomicLong totalPending = new AtomicLong(0);
                AtomicLong totalSent = new AtomicLong(0);
                AtomicLong numMessageSend = new AtomicLong(0);
                Semaphore numMsgPerTxnLimit = new Semaphore(this.numMessagesPerTransaction);
   @@ -576,6 +577,13 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
                                break;
                            }
                        }
   +                    long totalPendingCount = totalPending.get();
   +                    if (totalPendingCount > msgRate * 2L) {
   +                        log.info("{} totalPendingCount:{} > msgRate:{}, sleeping 1 sec!",
   +                                producer.getProducerName(), totalPendingCount, msgRate);
   +                        Thread.sleep(1000);
   +                        continue;
   +                    }
                        rateLimiter.acquire();
                        //if transaction is disable, transaction will be null.
                        Transaction transaction = transactionAtomicReference.get();
   @@ -626,7 +634,9 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
                            messageBuilder.key(String.valueOf(totalSent.get()));
                        }
                        PulsarClient pulsarClient = client;
   +                    totalPending.incrementAndGet();
                        messageBuilder.sendAsync().thenRun(() -> {
   +                        totalPending.decrementAndGet();
                            bytesSent.add(payloadData.length);
                            messagesSent.increment();
                            totalSent.incrementAndGet();
   @@ -647,6 +657,7 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
                            }
                            log.warn("Write message error with exception", ex);
                            messagesFailed.increment();
   +                        totalPending.decrementAndGet();
                            if (this.exitOnFailure) {
                                PerfClientUtils.exit(1);
                            }
   ```
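The back-pressure change above comes down to a pending-message counter: increment it before `sendAsync()`, decrement it in both the success and the failure callback, and pause the send loop while it is above a threshold. Below is a minimal, self-contained Java sketch of that idea, not the `PerformanceProducer` code itself. It assumes a hypothetical `fakeSendAsync` helper that completes a `CompletableFuture` after a short delay in place of a real Pulsar producer; the class and method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PendingBackpressureSketch {

    // Hypothetical stand-in for messageBuilder.sendAsync(): the returned future
    // completes after a 2 ms delay on a separate scheduler thread.
    static CompletableFuture<Void> fakeSendAsync(ScheduledExecutorService broker) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        broker.schedule(() -> ack.complete(null), 2, TimeUnit.MILLISECONDS);
        return ack;
    }

    // Sleeps without forcing callers to handle InterruptedException.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }

    /**
     * Sends {@code count} messages, pausing while more than {@code maxPending}
     * are unacknowledged. Returns the peak number of in-flight messages seen.
     */
    static long run(int count, long maxPending, long pauseMillis) {
        ScheduledExecutorService broker = Executors.newSingleThreadScheduledExecutor();
        AtomicLong totalPending = new AtomicLong();
        AtomicLong peakPending = new AtomicLong();
        try {
            int sent = 0;
            while (sent < count) {
                if (totalPending.get() > maxPending) {
                    pause(pauseMillis); // the diff sleeps 1000 ms here
                    continue;
                }
                long inFlight = totalPending.incrementAndGet();
                peakPending.accumulateAndGet(inFlight, Math::max);
                // whenComplete fires on success and on failure, covering both
                // the thenRun and the exceptionally branches from the diff.
                fakeSendAsync(broker).whenComplete((ignored, ex) -> totalPending.decrementAndGet());
                sent++;
            }
            // Wait for the remaining acknowledgements before shutting down.
            while (totalPending.get() > 0) {
                pause(1);
            }
        } finally {
            broker.shutdownNow();
        }
        return peakPending.get();
    }

    public static void main(String[] args) {
        long peak = run(2_000, 100, 1);
        System.out.println("peak in-flight messages: " + peak);
    }
}
```

Because only the sending thread increments the counter, and it checks the counter before incrementing, the number of in-flight messages never exceeds `maxPending + 1`. In the diff the threshold is `msgRate * 2L`; assuming `msgRate` holds the `-r` value, the `-r 5000` run in step 7 would pause once more than 10,000 messages are pending.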
   
   3. Build the code and the Docker test image:
   ```
   mvn clean install -DskipTests
   # setting DOCKER_CLI_EXPERIMENTAL=enabled is required in some environments with older docker versions
   export DOCKER_CLI_EXPERIMENTAL=enabled
   mvn package -Pdocker,-main -am -pl docker/pulsar-all -DskipTests
   ```
   
   
   4. Initialize the cluster data directories from the Pulsar source directory:
   rm -rf data
   sudo mkdir -p ./data/zookeeper ./data/bookkeeper-1 ./data/bookkeeper-2 ./data/bookkeeper-3
   sudo chown -R 10000 data
   sudo chmod -R 777 data
   
   5. Start the cluster with the attached compose file ([compose.yaml.txt](https://github.com/user-attachments/files/19373025/compose.yaml.txt)):
   docker compose up -d
   
   
   6. Run the consumer in the `pulsar-client-1` container (`docker exec -it pulsar-client-1 /bin/bash`):
   
   bin/pulsar-admin --admin-url http://broker-1:8080 tenants create my-tenant
   bin/pulsar-admin --admin-url http://broker-1:8080 namespaces create my-tenant/my-namespace2 --bundles 5
   bin/pulsar-admin --admin-url http://broker-1:8080 topics create-partitioned-topic persistent://my-tenant/my-namespace2/my-topic -p 100
   bin/pulsar-perf consume -st Key_Shared persistent://my-tenant/my-namespace2/my-topic -n 4 --service-url pulsar://broker-1:6650
   
   7. Run the producer in the `pulsar-client-2` container (`docker exec -it pulsar-client-2 /bin/bash`):
   bin/pulsar-perf produce persistent://my-tenant/my-namespace2/my-topic -r 5000 -bm 1 -mk random -n 1 --service-url pulsar://broker-1:6650
   
   
   8. Restart bookie 1 and bookie 2.
   
   9. Confirm that there is no lag (run in `pulsar-client-1`):
   
   pulsar-admin --admin-url http://broker-1:8080 topics partitioned-stats-internal persistent://my-tenant/my-namespace2/my-topic | awk -F'[:,]' '/markDeletePosition/ {md1=$2; md2=$3} /readPosition/ {rp1=$2; rp2=$3; print (md1!=rp1 || rp2-md2>200) ? "Lag too high! markDeletePosition: " md1 ":" md2 " readPosition: " rp1 ":" rp2 : "OK markDeletePosition: " md1 ":" md2 " readPosition: " rp1 ":" rp2}'
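The awk filter in step 9 treats a cursor as lagging when its `markDeletePosition` and `readPosition` are on different ledgers, or when they are more than 200 entries apart on the same ledger. The same rule is sketched below in Java, assuming positions arrive as `ledgerId:entryId` strings; `CursorLagCheck` and its methods are hypothetical names for illustration, and the sketch does not call the admin API.

```java
public class CursorLagCheck {

    // A managed-ledger position printed as "ledgerId:entryId".
    record Position(long ledgerId, long entryId) {
        static Position parse(String text) {
            String[] parts = text.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("expected ledgerId:entryId, got " + text);
            }
            return new Position(Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
        }
    }

    // Same threshold as the awk script (rp2-md2>200).
    static final long MAX_ENTRY_GAP = 200;

    // Lag if the positions are on different ledgers or too many entries apart.
    static boolean lagTooHigh(Position markDelete, Position read) {
        return markDelete.ledgerId() != read.ledgerId()
                || read.entryId() - markDelete.entryId() > MAX_ENTRY_GAP;
    }

    // Builds the same kind of one-line report the awk script prints.
    static String check(String markDeletePosition, String readPosition) {
        Position md = Position.parse(markDeletePosition);
        Position rp = Position.parse(readPosition);
        String prefix = lagTooHigh(md, rp) ? "Lag too high! " : "OK ";
        return prefix + "markDeletePosition: " + markDeletePosition
                + " readPosition: " + readPosition;
    }

    public static void main(String[] args) {
        System.out.println(check("12:99", "12:100"));
        System.out.println(check("12:99", "13:0"));
    }
}
```

For instance, `check("12:99", "12:100")` returns `OK markDeletePosition: 12:99 readPosition: 12:100`, while `check("12:99", "13:0")` returns a `Lag too high!` line. As in the awk version, any ledger difference is reported as lag, regardless of how many entries actually separate the two positions.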
   
   10. Shut down the cluster:
   docker compose down

