TakaHiR07 commented on code in PR #23551:
URL: https://github.com/apache/pulsar/pull/23551#discussion_r1830316831


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -237,26 +240,25 @@ public void onResponse(OpRequestSend op) {
     }
 
     private void checkPendingRequests() {
-        while (true) {
-            int permits = REQUEST_CREDITS_UPDATER.get(this);
-            if (permits > 0 && pendingRequests.peek() != null) {
-                if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
-                    OpRequestSend polled = pendingRequests.poll();
-                    if (polled != null) {
-                        CompletableFuture<ClientCnx> clientCnx = getClientCnx(polled.topic);
-                        if (polled.cnx != clientCnx) {
-                            OpRequestSend invalid = polled;
-                            polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
-                                    clientCnx);
-                            invalid.recycle();
-                        }
-                        endTxn(polled);
-                    } else {
-                        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+        int permits = REQUEST_CREDITS_UPDATER.get(this);
+        if (permits > 0 && pendingRequests.peek() != null) {
+            if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
+                OpRequestSend polled = pendingRequests.poll();
+                if (polled != null) {
+                    CompletableFuture<ClientCnx> clientCnx = getClientCnx(polled.topic);
+                    if (polled.cnx != clientCnx) {
+                        OpRequestSend invalid = polled;
+                        polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
+                                clientCnx);
+                        invalid.recycle();
                     }
+                    endTxn(polled);
+                } else {
+                    REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                    checkPendingRequests();
                 }
             } else {
-                break;
+                checkPendingRequests();

Review Comment:
   I removed the while loop because it is somewhat confusing: it looks as if
one response can trigger taking multiple requests from pendingRequests. In
fact a request is only taken when permits > 0, and permits is incremented by 1
each time one response returns.
   
   Consider a scenario where 100 responses come back at the same time. All of
them would enter the while loop and compete to acquire permits. At that point
permits is 100 and pendingRequests also holds 100 entries, so every response
would compete to take all of the requests from the queue. That is not
necessary.
   
   So I changed it so that 1 response triggers taking 1 request from
pendingRequests. There are only two cases in which we should recurse to retry:
   - If the polled request is null, we give the permit back and retry
checkPendingRequests() to take the next one.
   - If REQUEST_CREDITS_UPDATER.compareAndSet fails, there is a concurrent
checkRequestCredits() or checkPendingRequests(), so we retry
checkPendingRequests() to make sure this response still triggers taking 1
request from pendingRequests.
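   
   To make the intended behaviour concrete, here is a minimal standalone
sketch of the "1 response takes 1 request" rule. It is not the code in this
PR: `CreditGate`, `enqueue`, `sent` and the use of `AtomicInteger` and `String`
requests are hypothetical stand-ins for `REQUEST_CREDITS_UPDATER`,
`pendingRequests` and `OpRequestSend`, chosen only so the example runs on its
own.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the handler; names are illustrative only.
class CreditGate {
    private final AtomicInteger credits = new AtomicInteger(0);
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final List<String> sent = new CopyOnWriteArrayList<>();

    // Queue a request that could not be sent because no credit was free.
    void enqueue(String request) {
        pending.add(request);
    }

    // One response returns one credit and triggers at most one send.
    void onResponse() {
        credits.incrementAndGet();
        checkPending();
    }

    private void checkPending() {
        int permits = credits.get();
        if (permits > 0 && pending.peek() != null) {
            if (credits.compareAndSet(permits, permits - 1)) {
                String polled = pending.poll();
                if (polled != null) {
                    sent.add(polled); // stands in for endTxn(polled)
                } else {
                    // Another thread drained the queue: return the credit and retry.
                    credits.incrementAndGet();
                    checkPending();
                }
            } else {
                // Lost the CAS race against a concurrent caller: retry.
                checkPending();
            }
        }
    }

    int credits() { return credits.get(); }
    int pendingSize() { return pending.size(); }
    List<String> sent() { return sent; }
}
```

   With three queued requests, each call to `onResponse()` moves exactly one
request to `sent`; a further response with an empty queue simply leaves one
credit available.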
   
   
   
   



