codelipenghui commented on a change in pull request #11691: URL: https://github.com/apache/pulsar/pull/11691#discussion_r693297930
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ########## @@ -992,9 +1002,9 @@ protected void clearIncomingMessages() { private ExecutorService getExecutor(Message<T> msg) { ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer : null; - ExecutorService executor = receivedConsumer != null && receivedConsumer.pinnedExecutor != null - ? receivedConsumer.pinnedExecutor - : pinnedExecutor; + ExecutorService executor = receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null + ? receivedConsumer.externalPinnedExecutor + : externalPinnedExecutor; Review comment: Looks like we need two methods: one for getting the internal executor and one for getting the external executor. This method is used in two places: `completePendingReceive()`, which should use the internal executor, and `triggerListener()`, which should use the external executor. ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ########## @@ -1220,19 +1208,19 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception } // fetch receivedCallback from queue - final CompletableFuture<Message<T>> receivedFuture = pollPendingReceive(); + final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive(); if (receivedFuture == null) { return; } if (exception != null) { - pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception)); + externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception)); Review comment: Should this be internalPinnedExecutor?
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ########## @@ -2098,7 +2086,7 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, return; } - pinnedExecutor.schedule(() -> { + externalPinnedExecutor.schedule(() -> { Review comment: It should be the internalPinnedExecutor. ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ########## @@ -1132,7 +1120,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m // Lazy task scheduling to expire incomplete chunk message if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) { - pinnedExecutor.scheduleAtFixedRate(() -> { + externalPinnedExecutor.scheduleAtFixedRate(() -> { Review comment: This looks like an existing issue unrelated to what this PR is meant to fix. I think this should be internalPinnedExecutor, because this task does not call the user's code; it only does the consumer's internal processing. ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ########## @@ -865,32 +863,44 @@ private void pendingBatchReceiveTask(Timeout timeout) throws Exception { if (getState() == State.Closing || getState() == State.Closed) { return; } - if (pendingBatchReceives == null) { - pendingBatchReceives = Queues.newConcurrentLinkedQueue(); - } - OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive(); + timeToWaitMs = batchReceivePolicy.getTimeoutMs(); + OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek(); - while (firstOpBatchReceive != null) { + while (opBatchReceive != null) { // If there is at least one batch receive, calculate the diff between the batch receive timeout // and the elapsed time since the operation was created.
long diff = batchReceivePolicy.getTimeoutMs() - - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt); + - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt); + if (diff <= 0) { - // The diff is less than or equal to zero, meaning that the batch receive has been timed out. - // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives. - OpBatchReceive<T> op = pollNextBatchReceive(); - if (op != null) { - completeOpBatchReceive(op); + completeOpBatchReceive(opBatchReceive); + + // remove the peeked item from the queue + OpBatchReceive<T> removed = pendingBatchReceives.poll(); + + if (removed != opBatchReceive) { + // regression check, if this were to happen due to incorrect code changes in the future, + // (allowing multi-threaded calls to poll()), then ensure that the polled item is completed + // to avoid blocking user code + + log.error("Race condition in consumer {} (should not cause data loss). " + + " Concurrent operations on pendingBatchReceives is not safe", this.consumerName); + if (!removed.future.isDone()) { Review comment: Can `removed` be null here? `poll()` returns null when the queue is empty, so `removed.future` would throw a NullPointerException.
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ########## @@ -244,29 +230,52 @@ protected void completePendingReceive(CompletableFuture<Message<T>> receivedFutu }); } - protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) { + protected CompletableFuture<Void> failPendingReceive() { + if (internalPinnedExecutor.isShutdown()) { + // we need to fail any pending receives no matter what, + // to avoid blocking user code + failPendingReceives(); + failPendingBatchReceives(); + return CompletableFuture.completedFuture(null); + } else { + CompletableFuture<Void> future = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + try { + failPendingReceives(); + failPendingBatchReceives(); + } finally { + future.complete(null); + } + }); + return future; + } + } + + protected void failPendingReceives() { while (!pendingReceives.isEmpty()) { CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll(); if (receiveFuture == null) { break; } if (!receiveFuture.isDone()) { receiveFuture.completeExceptionally( - new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " + + new PulsarClientException.AlreadyClosedException( + String.format("The consumer which subscribes the topic %s with subscription name %s " + "was already closed when cleaning and closing the consumers", topic, subscription))); } } } - protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) { - while (!pendingBatchReceives.isEmpty()) { - OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll(); + protected void failPendingBatchReceives() { Review comment: ```suggestion private void failPendingBatchReceives() { ``` ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ########## @@ -1220,19 +1208,19 @@ void 
notifyPendingReceivedCallback(final Message<T> message, Exception exception } // fetch receivedCallback from queue - final CompletableFuture<Message<T>> receivedFuture = pollPendingReceive(); + final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive(); if (receivedFuture == null) { return; } if (exception != null) { - pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception)); + externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception)); return; } if (message == null) { IllegalStateException e = new IllegalStateException("received message can't be null"); - pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e)); + externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e)); Review comment: It should be internalPinnedExecutor? ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ########## @@ -244,29 +230,52 @@ protected void completePendingReceive(CompletableFuture<Message<T>> receivedFutu }); } - protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) { + protected CompletableFuture<Void> failPendingReceive() { + if (internalPinnedExecutor.isShutdown()) { + // we need to fail any pending receives no matter what, + // to avoid blocking user code + failPendingReceives(); + failPendingBatchReceives(); + return CompletableFuture.completedFuture(null); + } else { + CompletableFuture<Void> future = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + try { + failPendingReceives(); + failPendingBatchReceives(); + } finally { + future.complete(null); + } + }); + return future; + } + } + + protected void failPendingReceives() { Review comment: ```suggestion private void failPendingReceives() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org