315157973 commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r690863726



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -82,6 +79,7 @@
     protected final ConsumerInterceptors<T> interceptors;
     protected final BatchReceivePolicy batchReceivePolicy;
     protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
+    protected Lock pendingBatchLock;

Review comment:
       final

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -865,32 +842,42 @@ private void pendingBatchReceiveTask(Timeout timeout) 
throws Exception {
             if (getState() == State.Closing || getState() == State.Closed) {
                 return;
             }
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
-            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
+
             timeToWaitMs = batchReceivePolicy.getTimeoutMs();
 
-            while (firstOpBatchReceive != null) {
+            while (hasNextBatchReceive()) {
                 // If there is at least one batch receive, calculate the diff 
between the batch receive timeout
                 // and the elapsed time since the operation was created.
-                long diff = batchReceivePolicy.getTimeoutMs()
-                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
firstOpBatchReceive.createdAt);
-                if (diff <= 0) {
-                    // The diff is less than or equal to zero, meaning that 
the batch receive has been timed out.
-                    // complete the OpBatchReceive and continue to check the 
next OpBatchReceive in pendingBatchReceives.
-                    OpBatchReceive<T> op = pollNextBatchReceive();
-                    if (op != null) {
-                        completeOpBatchReceive(op);
+
+                boolean timeoutTriggered = false;
+                OpBatchReceive<T> opBatchReceive = null;
+                try {
+                    pendingBatchLock.lock();

Review comment:
       Please put the lock outside the `try`

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -204,26 +205,11 @@ protected ConsumerBase(PulsarClientImpl client, String 
topic, ConsumerConfigurat
         }
     }
 
-    protected CompletableFuture<Message<T>> peekPendingReceive() {
-        CompletableFuture<Message<T>> receivedFuture = null;
-        while (receivedFuture == null) {
-            receivedFuture = pendingReceives.peek();
-            if (receivedFuture == null) {
-                break;
-            }
-            // skip done futures (cancelling a future could mark it done)
-            if (receivedFuture.isDone()) {
-                CompletableFuture<Message<T>> removed = pendingReceives.poll();
-                if (removed != receivedFuture) {
-                    log.error("Bug! Removed future wasn't the expected one. 
expected={} removed={}", receivedFuture, removed);
-                }
-                receivedFuture = null;
-            }
-        }
-        return receivedFuture;
+    protected boolean hasNextPendingReceive() {

Review comment:
       This is thread-unsafe. If multiple threads call this method at the same 
time, it will return true at the same time.
   Eventually cause multiple notifications

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -790,31 +778,20 @@ protected void notifyPendingBatchReceivedCallBack() {
         }
     }
 
-    private OpBatchReceive<T> peekNextBatchReceive() {
-        OpBatchReceive<T> opBatchReceive = null;
-        while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.peek();
-            // no entry available
-            if (opBatchReceive == null) {
-                return null;
-            }
-            // remove entries where future is null or has been completed 
(cancel / timeout)
-            if (opBatchReceive.future == null || 
opBatchReceive.future.isDone()) {
-                OpBatchReceive<T> removed = pendingBatchReceives.poll();
-                if (removed != opBatchReceive) {
-                    log.error("Bug: Removed entry wasn't the expected one. 
expected={}, removed={}", opBatchReceive, removed);
-                }
-                opBatchReceive = null;
-            }
-        }
-        return opBatchReceive;
+    private boolean hasNextBatchReceive() {
+        return !pendingBatchReceives.isEmpty();
     }
 
 
-    private OpBatchReceive<T> pollNextBatchReceive() {
+    private OpBatchReceive<T> nextBatchReceive() {
         OpBatchReceive<T> opBatchReceive = null;
         while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.poll();
+            try {
+                pendingBatchLock.lock();

Review comment:
       Please put the lock outside the `try`, otherwise the unlock will be 
executed if the lock is abnormal

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -204,26 +205,11 @@ protected ConsumerBase(PulsarClientImpl client, String 
topic, ConsumerConfigurat
         }
     }
 
-    protected CompletableFuture<Message<T>> peekPendingReceive() {
-        CompletableFuture<Message<T>> receivedFuture = null;
-        while (receivedFuture == null) {
-            receivedFuture = pendingReceives.peek();
-            if (receivedFuture == null) {
-                break;
-            }
-            // skip done futures (cancelling a future could mark it done)
-            if (receivedFuture.isDone()) {
-                CompletableFuture<Message<T>> removed = pendingReceives.poll();
-                if (removed != receivedFuture) {
-                    log.error("Bug! Removed future wasn't the expected one. 
expected={} removed={}", receivedFuture, removed);
-                }
-                receivedFuture = null;
-            }
-        }
-        return receivedFuture;
+    protected boolean hasNextPendingReceive() {

Review comment:
       This is not thread-safe. If multiple threads call this method at the 
same time, it will return true at the same time.
   Eventually cause multiple notifications

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -790,31 +778,20 @@ protected void notifyPendingBatchReceivedCallBack() {
         }
     }
 
-    private OpBatchReceive<T> peekNextBatchReceive() {
-        OpBatchReceive<T> opBatchReceive = null;
-        while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.peek();
-            // no entry available
-            if (opBatchReceive == null) {
-                return null;
-            }
-            // remove entries where future is null or has been completed 
(cancel / timeout)
-            if (opBatchReceive.future == null || 
opBatchReceive.future.isDone()) {
-                OpBatchReceive<T> removed = pendingBatchReceives.poll();
-                if (removed != opBatchReceive) {
-                    log.error("Bug: Removed entry wasn't the expected one. 
expected={}, removed={}", opBatchReceive, removed);
-                }
-                opBatchReceive = null;
-            }
-        }
-        return opBatchReceive;
+    private boolean hasNextBatchReceive() {
+        return !pendingBatchReceives.isEmpty();
     }
 
 
-    private OpBatchReceive<T> pollNextBatchReceive() {
+    private OpBatchReceive<T> nextBatchReceive() {
         OpBatchReceive<T> opBatchReceive = null;
         while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.poll();
+            try {
+                pendingBatchLock.lock();

Review comment:
       `pendingBatchReceives` is thread-safe, why do we need to lock it? If it 
is to make peek also thread-safe, should a read-write lock be used?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to