Vanlightly commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r690942099



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -204,26 +205,11 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         }
     }
 
-    protected CompletableFuture<Message<T>> peekPendingReceive() {
-        CompletableFuture<Message<T>> receivedFuture = null;
-        while (receivedFuture == null) {
-            receivedFuture = pendingReceives.peek();
-            if (receivedFuture == null) {
-                break;
-            }
-            // skip done futures (cancelling a future could mark it done)
-            if (receivedFuture.isDone()) {
-                CompletableFuture<Message<T>> removed = pendingReceives.poll();
-                if (removed != receivedFuture) {
-                    log.error("Bug! Removed future wasn't the expected one. expected={} removed={}", receivedFuture, removed);
-                }
-                receivedFuture = null;
-            }
-        }
-        return receivedFuture;
+    protected boolean hasNextPendingReceive() {

Review comment:
       I left these checks without a lock because they only indicate that 
there may be pending receives to notify. What ultimately counts is the call to 
poll(), which is thread-safe, and its result is always null-checked.
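
       To illustrate the reasoning, here is a minimal standalone sketch, not 
the code in this PR. It assumes the pending receives live in a 
`ConcurrentLinkedQueue`; the class `PendingReceiveSketch` and the methods 
`addPendingReceive()` and `deliver()` are hypothetical names used only for 
this example. The unlocked check is treated purely as a hint, and the result 
of `poll()` decides what happens.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the consumer; not the real ConsumerBase.
class PendingReceiveSketch {
    // Assumption: a thread-safe queue of futures awaiting a message.
    private final Queue<CompletableFuture<String>> pendingReceives =
            new ConcurrentLinkedQueue<>();

    // Unlocked hint: the answer may already be stale when the caller acts on it.
    public boolean hasNextPendingReceive() {
        return !pendingReceives.isEmpty();
    }

    // Registers a new pending receive and returns its future.
    public CompletableFuture<String> addPendingReceive() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingReceives.add(future);
        return future;
    }

    // Hands the message to the first pending receive that is still open.
    // Returns false if no receive could take it.
    public boolean deliver(String message) {
        if (!hasNextPendingReceive()) {
            return false; // cheap early exit based on the hint
        }
        CompletableFuture<String> future;
        // poll() is atomic and its result is always null-checked, so a stale
        // hint only costs one wasted call.
        while ((future = pendingReceives.poll()) != null) {
            // complete() returns false for futures that are already done,
            // e.g. cancelled ones, so those are skipped.
            if (future.complete(message)) {
                return true;
            }
        }
        return false;
    }
}
```

       If another thread drains the queue between the hint and the `poll()`, 
`deliver()` simply returns false, so the caller keeps the message for a later 
receive.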

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -790,31 +778,20 @@ protected void notifyPendingBatchReceivedCallBack() {
         }
     }
 
-    private OpBatchReceive<T> peekNextBatchReceive() {
-        OpBatchReceive<T> opBatchReceive = null;
-        while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.peek();
-            // no entry available
-            if (opBatchReceive == null) {
-                return null;
-            }
-            // remove entries where future is null or has been completed (cancel / timeout)
-            if (opBatchReceive.future == null || opBatchReceive.future.isDone()) {
-                OpBatchReceive<T> removed = pendingBatchReceives.poll();
-                if (removed != opBatchReceive) {
-                    log.error("Bug: Removed entry wasn't the expected one. expected={}, removed={}", opBatchReceive, removed);
-                }
-                opBatchReceive = null;
-            }
-        }
-        return opBatchReceive;
+    private boolean hasNextBatchReceive() {
+        return !pendingBatchReceives.isEmpty();
     }
 
 
-    private OpBatchReceive<T> pollNextBatchReceive() {
+    private OpBatchReceive<T> nextBatchReceive() {
         OpBatchReceive<T> opBatchReceive = null;
         while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.poll();
+            try {
+                pendingBatchLock.lock();

Review comment:
       Yes, every call to `pendingBatchReceives.poll()` needs the lock, so that 
it cannot interleave with the peek-then-poll sequence that caused the bug.
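
       As a rough sketch of why the lock is needed, and not the code in this 
PR: the example below assumes a `ReentrantLock` guarding a 
`ConcurrentLinkedQueue`. The class `PendingBatchSketch` and the methods 
`add()` and `dropHeadIfDone()` are hypothetical and exist only for 
illustration. Each queue operation is atomic on its own, but a peek followed 
by a poll is not, so every poll takes the same lock.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the batch-receive bookkeeping; not the real ConsumerBase.
class PendingBatchSketch {
    private final Queue<CompletableFuture<String>> pendingBatchReceives =
            new ConcurrentLinkedQueue<>();
    private final ReentrantLock pendingBatchLock = new ReentrantLock();

    public void add(CompletableFuture<String> future) {
        pendingBatchReceives.add(future);
    }

    // Peek-then-poll: without the lock, another thread could poll between the
    // two calls, and this method would then remove a different entry.
    public boolean dropHeadIfDone() {
        pendingBatchLock.lock();
        try {
            CompletableFuture<String> head = pendingBatchReceives.peek();
            if (head != null && head.isDone()) {
                pendingBatchReceives.poll(); // guaranteed to be the peeked head
                return true;
            }
            return false;
        } finally {
            pendingBatchLock.unlock();
        }
    }

    // A plain poll also takes the lock, otherwise it could slip in between
    // the peek and the poll above.
    public CompletableFuture<String> nextBatchReceive() {
        pendingBatchLock.lock();
        try {
            CompletableFuture<String> next;
            while ((next = pendingBatchReceives.poll()) != null) {
                if (!next.isDone()) {
                    return next; // first entry that is still open
                }
            }
            return null; // queue exhausted
        } finally {
            pendingBatchLock.unlock();
        }
    }
}
```

       The sketch acquires the lock before entering `try`, so `unlock()` in 
`finally` only runs when the lock is actually held.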



