codelipenghui commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r693297930



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -992,9 +1002,9 @@ protected void clearIncomingMessages() {
     private ExecutorService getExecutor(Message<T> msg) {
         ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
                 : null;
-        ExecutorService executor = receivedConsumer != null && receivedConsumer.pinnedExecutor != null
-                ? receivedConsumer.pinnedExecutor
-                : pinnedExecutor;
+        ExecutorService executor = receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null
+                ? receivedConsumer.externalPinnedExecutor
+                : externalPinnedExecutor;

Review comment:
       Looks like we need two methods here: one that returns the internal
executor and one that returns the external executor. `getExecutor()` is called
from two places: `completePendingReceive()`, which should use the internal
executor, and `triggerListener()`, which should use the external executor.
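
A minimal sketch of the split, to make the suggestion concrete. The class below is a simplified stand-in, not the real `ConsumerBase`/`ConsumerImpl`, and the method names `getInternalExecutor` and `getExternalExecutor` are hypothetical; only the two field names come from this PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified stand-in for the consumer classes (assumption, not Pulsar code).
final class ExecutorSplitSketch {
    final ExecutorService internalPinnedExecutor;
    final ExecutorService externalPinnedExecutor;

    ExecutorSplitSketch(ExecutorService internal, ExecutorService external) {
        this.internalPinnedExecutor = internal;
        this.externalPinnedExecutor = external;
    }

    // Hypothetical name: executor for client-internal work,
    // e.g. what completePendingReceive() would use.
    ExecutorService getInternalExecutor(ExecutorSplitSketch receivedConsumer) {
        return receivedConsumer != null && receivedConsumer.internalPinnedExecutor != null
                ? receivedConsumer.internalPinnedExecutor
                : internalPinnedExecutor;
    }

    // Hypothetical name: executor for user callbacks,
    // e.g. what triggerListener() would use.
    ExecutorService getExternalExecutor(ExecutorSplitSketch receivedConsumer) {
        return receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null
                ? receivedConsumer.externalPinnedExecutor
                : externalPinnedExecutor;
    }

    public static void main(String[] args) {
        ExecutorService internal = Executors.newSingleThreadExecutor();
        ExecutorService external = Executors.newSingleThreadExecutor();
        ExecutorSplitSketch consumer = new ExecutorSplitSketch(internal, external);
        // With no per-partition consumer, each getter falls back to its own field.
        System.out.println(consumer.getInternalExecutor(null) == internal);
        System.out.println(consumer.getExternalExecutor(null) == external);
        internal.shutdown();
        external.shutdown();
    }
}
```

Both getters keep the existing fallback logic of `getExecutor()`; the only difference is which field they read, so each call site states explicitly which kind of work it submits.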

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1220,19 +1208,19 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception
         }
 
         // fetch receivedCallback from queue
-        final CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
+        final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
             return;
         }
 
         if (exception != null) {
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
+            externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));

Review comment:
       Should this be internalPinnedExecutor?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -2098,7 +2086,7 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
                 return;
             }
 
-            pinnedExecutor.schedule(() -> {
+            externalPinnedExecutor.schedule(() -> {

Review comment:
       This should be internalPinnedExecutor.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1132,7 +1120,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            pinnedExecutor.scheduleAtFixedRate(() -> {
+            externalPinnedExecutor.scheduleAtFixedRate(() -> {

Review comment:
       This looks like an existing issue, unrelated to what this PR is fixing,
but I think this should be internalPinnedExecutor: the task never calls user
code, it only does internal processing for the consumer.
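
To illustrate the distinction, here is a minimal sketch, assuming two single-threaded executors named after the fields in this PR. The helpers `named` and `threadNameOf` are hypothetical and exist only for the demonstration; they are not Pulsar APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class InternalVsExternalSketch {
    // Hypothetical helper: single-threaded executor whose worker thread has a fixed name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true); // do not keep the JVM alive for the demo
            return t;
        });
    }

    // Hypothetical helper: runs a task on the executor and reports which thread ran it.
    static String threadNameOf(ExecutorService executor) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return executor.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService internalPinnedExecutor = named("internal");
        ExecutorService externalPinnedExecutor = named("external");

        // Housekeeping such as expiring incomplete chunked messages never
        // touches user code, so it belongs on the internal executor.
        System.out.println("housekeeping ran on: " + threadNameOf(internalPinnedExecutor));

        // Listener callbacks and user-visible future completions run user
        // code, so they belong on the external executor.
        System.out.println("user callback ran on: " + threadNameOf(externalPinnedExecutor));

        internalPinnedExecutor.shutdown();
        externalPinnedExecutor.shutdown();
    }
}
```

With the work separated this way, a slow or blocking user callback can only stall the external thread; the consumer's own bookkeeping keeps running on the internal one.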

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -865,32 +863,44 @@ private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
             if (getState() == State.Closing || getState() == State.Closed) {
                 return;
             }
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
-            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
+
             timeToWaitMs = batchReceivePolicy.getTimeoutMs();
+            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek();
 
-            while (firstOpBatchReceive != null) {
+            while (opBatchReceive != null) {
                 // If there is at least one batch receive, calculate the diff between the batch receive timeout
                 // and the elapsed time since the operation was created.
                 long diff = batchReceivePolicy.getTimeoutMs()
-                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt);
+                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt);
+
                 if (diff <= 0) {
-                    // The diff is less than or equal to zero, meaning that the batch receive has been timed out.
-                    // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives.
-                    OpBatchReceive<T> op = pollNextBatchReceive();
-                    if (op != null) {
-                        completeOpBatchReceive(op);
+                    completeOpBatchReceive(opBatchReceive);
+
+                    // remove the peeked item from the queue
+                    OpBatchReceive<T> removed = pendingBatchReceives.poll();
+
+                    if (removed != opBatchReceive) {
+                        // regression check, if this were to happen due to incorrect code changes in the future,
+                        // (allowing multi-threaded calls to poll()), then ensure that the polled item is completed
+                        // to avoid blocking user code
+
+                        log.error("Race condition in consumer {} (should not cause data loss). "
+                                + " Concurrent operations on pendingBatchReceives is not safe", this.consumerName);
+                        if (!removed.future.isDone()) {

Review comment:
       Can `removed` be null here?
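
`poll()` returns null when the queue is empty, so if the peeked item was removed concurrently and nothing else is queued, `removed.future` would throw a NullPointerException. A minimal sketch of a null-safe version follows, assuming a simplified `Op` type in place of `OpBatchReceive<T>` and a hypothetical helper `completeAndRemove`; the real code would call `completeOpBatchReceive` instead of completing the future directly.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

final class NullSafePollSketch {
    // Simplified stand-in for OpBatchReceive<T>: only the future matters here.
    static final class Op {
        final CompletableFuture<String> future = new CompletableFuture<>();
    }

    // Completes the peeked op, then removes the queue head.
    // Returns true if the removed head was the peeked op.
    static boolean completeAndRemove(Queue<Op> pending, Op peeked) {
        peeked.future.complete("timed out");

        Op removed = pending.poll(); // null if the queue is already empty
        if (removed == peeked) {
            return true;
        }
        // Mismatch: still complete whatever was polled so the caller is not
        // left blocked, but guard against null first.
        if (removed != null && !removed.future.isDone()) {
            removed.future.complete("timed out");
        }
        return false;
    }

    public static void main(String[] args) {
        Queue<Op> pending = new ConcurrentLinkedQueue<>();
        Op op = new Op();
        pending.add(op);
        System.out.println(completeAndRemove(pending, pending.peek()));
    }
}
```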

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -244,29 +230,52 @@ protected void completePendingReceive(CompletableFuture<Message<T>> receivedFutu
         });
     }
 
-    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+    protected CompletableFuture<Void> failPendingReceive() {
+        if (internalPinnedExecutor.isShutdown()) {
+            // we need to fail any pending receives no matter what,
+            // to avoid blocking user code
+            failPendingReceives();
+            failPendingBatchReceives();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                try {
+                    failPendingReceives();
+                    failPendingBatchReceives();
+                } finally {
+                    future.complete(null);
+                }
+            });
+            return future;
+        }
+    }
+
+    protected void failPendingReceives() {
         while (!pendingReceives.isEmpty()) {
             CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
             if (receiveFuture == null) {
                 break;
             }
             if (!receiveFuture.isDone()) {
                 receiveFuture.completeExceptionally(
-                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                        new PulsarClientException.AlreadyClosedException(
+                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                 "was already closed when cleaning and closing the consumers", topic, subscription)));
             }
         }
     }
 
-    protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
-        while (!pendingBatchReceives.isEmpty()) {
-            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+    protected void failPendingBatchReceives() {

Review comment:
       ```suggestion
       private void failPendingBatchReceives() {
   ```

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1220,19 +1208,19 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception
         }
 
         // fetch receivedCallback from queue
-        final CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
+        final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
             return;
         }
 
         if (exception != null) {
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
+            externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
             return;
         }
 
         if (message == null) {
             IllegalStateException e = new IllegalStateException("received message can't be null");
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));
+            externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));

Review comment:
       Should this be internalPinnedExecutor?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -244,29 +230,52 @@ protected void completePendingReceive(CompletableFuture<Message<T>> receivedFutu
         });
     }
 
-    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+    protected CompletableFuture<Void> failPendingReceive() {
+        if (internalPinnedExecutor.isShutdown()) {
+            // we need to fail any pending receives no matter what,
+            // to avoid blocking user code
+            failPendingReceives();
+            failPendingBatchReceives();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                try {
+                    failPendingReceives();
+                    failPendingBatchReceives();
+                } finally {
+                    future.complete(null);
+                }
+            });
+            return future;
+        }
+    }
+
+    protected void failPendingReceives() {

Review comment:
       ```suggestion
       private void failPendingReceives() {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

