Vanlightly commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r693802906



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -865,32 +863,44 @@ private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
             if (getState() == State.Closing || getState() == State.Closed) {
                 return;
             }
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
-            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
+
             timeToWaitMs = batchReceivePolicy.getTimeoutMs();
+            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek();
 
-            while (firstOpBatchReceive != null) {
+            while (opBatchReceive != null) {
                 // If there is at least one batch receive, calculate the diff between the batch receive timeout
                 // and the elapsed time since the operation was created.
                 long diff = batchReceivePolicy.getTimeoutMs()
-                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt);
+                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt);
+
                 if (diff <= 0) {
-                    // The diff is less than or equal to zero, meaning that the batch receive has been timed out.
-                    // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives.
-                    OpBatchReceive<T> op = pollNextBatchReceive();
-                    if (op != null) {
-                        completeOpBatchReceive(op);
+                    completeOpBatchReceive(opBatchReceive);
+
+                    // remove the peeked item from the queue
+                    OpBatchReceive<T> removed = pendingBatchReceives.poll();
+
+                    if (removed != opBatchReceive) {
+                        // regression check, if this were to happen due to incorrect code changes in the future,
+                        // (allowing multi-threaded calls to poll()), then ensure that the polled item is completed
+                        // to avoid blocking user code
+
+                        log.error("Race condition in consumer {} (should not cause data loss). "
+                                + " Concurrent operations on pendingBatchReceives is not safe", this.consumerName);
+                        if (!removed.future.isDone()) {

Review comment:
       Yes absolutely, good catch, fixing now.
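
For readers following the hunk above, here is a minimal, self-contained sketch of the peek / complete / poll pattern with the identity check. It is not the code from the PR. `PendingReceiveSketch`, `Op`, `expire`, and the `"timed out"` result are hypothetical stand-ins. The early `break` when the head has not yet timed out is an assumption, as is completing the unexpectedly polled item; the hunk is cut off before showing either.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer's pending batch-receive bookkeeping.
class PendingReceiveSketch {

    // Hypothetical stand-in for OpBatchReceive: a future plus its creation time.
    static final class Op {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long createdAtNanos;

        Op(long createdAtNanos) {
            this.createdAtNanos = createdAtNanos;
        }
    }

    final Queue<Op> pending = new ConcurrentLinkedQueue<>();

    // Completes operations at the head of the queue whose timeout has elapsed.
    // Returns the number of loop iterations that completed a timed-out head.
    int expire(long timeoutMs, long nowNanos) {
        int completed = 0;
        Op head = pending.peek();
        while (head != null) {
            long diff = timeoutMs
                    - TimeUnit.NANOSECONDS.toMillis(nowNanos - head.createdAtNanos);
            if (diff > 0) {
                // Assumption: entries are queued in creation order, so once the
                // head has not timed out, nothing behind it has either.
                break;
            }
            head.future.complete("timed out");

            // Remove the peeked item from the queue.
            Op removed = pending.poll();
            if (removed != head && removed != null && !removed.future.isDone()) {
                // Regression guard: another thread polled in between, so make
                // sure the item we actually removed is completed as well.
                removed.future.complete("timed out");
            }
            completed++;
            head = pending.peek();
        }
        return completed;
    }
}
```

Passing `nowNanos` in as a parameter, rather than calling `System.nanoTime()` inside the loop, is a choice made here only to keep the sketch deterministic.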




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.