TakaHiR07 commented on code in PR #23551:
URL: https://github.com/apache/pulsar/pull/23551#discussion_r1830316831
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:
##########
@@ -237,26 +240,25 @@ public void onResponse(OpRequestSend op) {
}
private void checkPendingRequests() {
- while (true) {
- int permits = REQUEST_CREDITS_UPDATER.get(this);
- if (permits > 0 && pendingRequests.peek() != null) {
- if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits,
permits - 1)) {
- OpRequestSend polled = pendingRequests.poll();
- if (polled != null) {
- CompletableFuture<ClientCnx> clientCnx =
getClientCnx(polled.topic);
- if (polled.cnx != clientCnx) {
- OpRequestSend invalid = polled;
- polled = OpRequestSend.create(invalid.requestId,
invalid.topic, invalid.cmd, invalid.cb,
- clientCnx);
- invalid.recycle();
- }
- endTxn(polled);
- } else {
- REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+ int permits = REQUEST_CREDITS_UPDATER.get(this);
+ if (permits > 0 && pendingRequests.peek() != null) {
+ if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits -
1)) {
+ OpRequestSend polled = pendingRequests.poll();
+ if (polled != null) {
+ CompletableFuture<ClientCnx> clientCnx =
getClientCnx(polled.topic);
+ if (polled.cnx != clientCnx) {
+ OpRequestSend invalid = polled;
+ polled = OpRequestSend.create(invalid.requestId,
invalid.topic, invalid.cmd, invalid.cb,
+ clientCnx);
+ invalid.recycle();
}
+ endTxn(polled);
+ } else {
+ REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+ checkPendingRequests();
}
} else {
- break;
+ checkPendingRequests();
Review Comment:
Because the while loop is somewhat confusing, it look like 1 response can
trigger taking multiple requests from pendingRequest. But actually it is
executed when its permit > 0 . And permit would +1 when 1 response return.
So if different responses come back at the same time, both of them would go
into while loop and compete for acquire the permits, until the
pendingRequestQueue become null. That is not neccessary.
So I replace to 1 response trigger take 1 request from pendingRequestQueue.
And only two case we should do recursion to retry:
- If the request taking is null, we should retry checkPendingRequest() to
take the next one.
- If permit.compareAndSet is not ok, it means there is concurrent
checkRequestCredits() or checkPendingRequest(), we should retry
checkPendingRequest() again to make sure response can trigger taking 1 request
from pendingRequestQueue
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]