Vanlightly commented on a change in pull request #11691: URL: https://github.com/apache/pulsar/pull/11691#discussion_r693802906
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ########## @@ -865,32 +863,44 @@ private void pendingBatchReceiveTask(Timeout timeout) throws Exception { if (getState() == State.Closing || getState() == State.Closed) { return; } - if (pendingBatchReceives == null) { - pendingBatchReceives = Queues.newConcurrentLinkedQueue(); - } - OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive(); + timeToWaitMs = batchReceivePolicy.getTimeoutMs(); + OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek(); - while (firstOpBatchReceive != null) { + while (opBatchReceive != null) { // If there is at least one batch receive, calculate the diff between the batch receive timeout // and the elapsed time since the operation was created. long diff = batchReceivePolicy.getTimeoutMs() - - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt); + - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt); + if (diff <= 0) { - // The diff is less than or equal to zero, meaning that the batch receive has been timed out. - // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives. - OpBatchReceive<T> op = pollNextBatchReceive(); - if (op != null) { - completeOpBatchReceive(op); + completeOpBatchReceive(opBatchReceive); + + // remove the peeked item from the queue + OpBatchReceive<T> removed = pendingBatchReceives.poll(); + + if (removed != opBatchReceive) { + // regression check, if this were to happen due to incorrect code changes in the future, + // (allowing multi-threaded calls to poll()), then ensure that the polled item is completed + // to avoid blocking user code + + log.error("Race condition in consumer {} (should not cause data loss). " + + " Concurrent operations on pendingBatchReceives is not safe", this.consumerName); + if (!removed.future.isDone()) { Review comment: Yes absolutely, good catch, fixing now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org