TakaHiR07 commented on code in PR #23551:
URL: https://github.com/apache/pulsar/pull/23551#discussion_r1830316831
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -237,26 +240,25 @@ public void onResponse(OpRequestSend op) {
}
private void checkPendingRequests() {
- while (true) {
- int permits = REQUEST_CREDITS_UPDATER.get(this);
- if (permits > 0 && pendingRequests.peek() != null) {
- if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits,
permits - 1)) {
- OpRequestSend polled = pendingRequests.poll();
- if (polled != null) {
- CompletableFuture<ClientCnx> clientCnx =
getClientCnx(polled.topic);
- if (polled.cnx != clientCnx) {
- OpRequestSend invalid = polled;
- polled = OpRequestSend.create(invalid.requestId,
invalid.topic, invalid.cmd, invalid.cb,
- clientCnx);
- invalid.recycle();
- }
- endTxn(polled);
- } else {
- REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+ int permits = REQUEST_CREDITS_UPDATER.get(this);
+ if (permits > 0 && pendingRequests.peek() != null) {
+ if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits -
1)) {
+ OpRequestSend polled = pendingRequests.poll();
+ if (polled != null) {
+ CompletableFuture<ClientCnx> clientCnx =
getClientCnx(polled.topic);
+ if (polled.cnx != clientCnx) {
+ OpRequestSend invalid = polled;
+ polled = OpRequestSend.create(invalid.requestId,
invalid.topic, invalid.cmd, invalid.cb,
+ clientCnx);
+ invalid.recycle();
}
+ endTxn(polled);
+ } else {
+ REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+ checkPendingRequests();
}
} else {
- break;
+ checkPendingRequests();
Review Comment:
Because the while loop looks somewhat confusing, it look like 1 response can
trigger taking multiple requests from pendingRequest. But actually it is
executed when its permit > 0 . And permit would +1 when 1 response return.
And let's assume a scene, 100 responses come back at the same time, both of
them would go into while loop and compete for acquire the permits. Now the
permits is 100 and pendingRequestQueue is also 100, then all of the responses
would compete for taking all requests from queue. That is not neccessary.
So I replace to 1 response trigger take 1 request from pendingRequestQueue.
And only two case we should do recursion to retry:
- If the request taking is null, we should retry checkPendingRequest() to
take the next one.
- If permit.compareAndSet is not successful, it means there is concurrent
checkRequestCredits() or checkPendingRequest(), we should retry
checkPendingRequest() again to make sure response can trigger taking 1 request
from pendingRequestQueue
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]