This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a2e3a1  [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (#12456)
6a2e3a1 is described below

commit 6a2e3a1ad735465154dc3fa12988c3068eae7da5
Author: Michael Marshall <michael.marsh...@datastax.com>
AuthorDate: Mon Oct 25 16:31:59 2021 -0500

    [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (#12456)
    
    * [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order
    
    * Fix test
    
    * Return the checkState method call to keep original behavior
    
    * Reproduce out-of-order delivery issue in PR 12456
    
    * Remove unnecessary scheduling of receiveMessageFromConsumer
    
    Co-authored-by: Lari Hotari <lhot...@apache.org>
---
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 75 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 .../client/impl/MultiTopicsConsumerImpl.java       | 37 ++++++-----
 .../client/impl/MultiTopicsConsumerImplTest.java   |  3 +-
 4 files changed, 98 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index 715f3ad..d8c8bd6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.client.api;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -24,9 +25,16 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -34,6 +42,7 @@ import org.mockito.AdditionalAnswers;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -70,6 +79,7 @@ public class MultiTopicsConsumerTest extends 
ProducerConsumerBase {
                         // method calls on the interface.
                         
Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo(internalExecutorService)));
             }
+
             @Override
             public ExecutorService getInternalExecutorService() {
                 return internalExecutorServiceDelegate;
@@ -119,4 +129,69 @@ public class MultiTopicsConsumerTest extends 
ProducerConsumerBase {
         verify(internalExecutorServiceDelegate, times(0))
                 .schedule(any(Runnable.class), anyLong(), any());
     }
+
+    // test that reproduces the issue that PR 
https://github.com/apache/pulsar/pull/12456 fixes
+    // where MultiTopicsConsumerImpl has a data race that causes out-of-order 
delivery of messages
+    @Test
+    public void 
testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
+            throws PulsarAdminException, PulsarClientException, 
ExecutionException, InterruptedException,
+            TimeoutException {
+        String topicName = newTopicName();
+        int numPartitions = 2;
+        int numMessages = 100000;
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+        Producer<Long>[] producers = new Producer[numPartitions];
+
+        for (int i = 0; i < numPartitions; i++) {
+            producers[i] = pulsarClient.newProducer(Schema.INT64)
+                    // produce to each partition directly so that order can be 
maintained in sending
+                    .topic(topicName + "-partition-" + i)
+                    .enableBatching(true)
+                    .maxPendingMessages(30000)
+                    .maxPendingMessagesAcrossPartitions(60000)
+                    .batchingMaxMessages(10000)
+                    .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+                    .batchingMaxBytes(4 * 1024 * 1024)
+                    .blockIfQueueFull(true)
+                    .create();
+        }
+
+        @Cleanup
+        Consumer<Long> consumer = pulsarClient
+                .newConsumer(Schema.INT64)
+                // consume on the partitioned topic
+                .topic(topicName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .receiverQueueSize(numMessages)
+                .subscriptionName(methodName)
+                .subscribe();
+
+        // produce sequence numbers to each partition topic
+        long sequenceNumber = 1L;
+        for (int i = 0; i < numMessages; i++) {
+            for (Producer<Long> producer : producers) {
+                producer.newMessage()
+                        .value(sequenceNumber)
+                        .sendAsync();
+            }
+            sequenceNumber++;
+        }
+        for (Producer<Long> producer : producers) {
+            producer.close();
+        }
+
+        // receive and validate sequences in the partitioned topic
+        Map<String, AtomicLong> receivedSequences = new HashMap<>();
+        int receivedCount = 0;
+        while (receivedCount < numPartitions * numMessages) {
+            Message<Long> message = consumer.receiveAsync().get(5, 
TimeUnit.SECONDS);
+            consumer.acknowledge(message);
+            receivedCount++;
+            AtomicLong receivedSequenceCounter =
+                    receivedSequences.computeIfAbsent(message.getTopicName(), 
k -> new AtomicLong(1L));
+            Assert.assertEquals(message.getValue().longValue(), 
receivedSequenceCounter.getAndIncrement());
+        }
+        Assert.assertEquals(numPartitions * numMessages, receivedCount);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 62fff49..3159734 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -427,8 +427,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             if (message == null) {
                 pendingReceives.add(result);
                 cancellationHandler.setCancelAction(() -> 
pendingReceives.remove(result));
-            }
-            if (message != null) {
+            } else {
                 messageProcessed(message);
                 result.complete(beforeConsume(message));
             }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 520e7f3..21ae2d7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -245,7 +245,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
-        consumer.receiveAsync().thenAccept(message -> {
+        consumer.receiveAsync().thenAcceptAsync(message -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Receive message from sub consumer:{}",
                     topic, subscription, consumer.getTopic());
@@ -260,16 +260,16 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 // or if any consumer is already paused (to create fair chance 
for already paused consumers)
                 pausedConsumers.add(consumer);
 
-                // Since we din't get a mutex, the condition on the incoming 
queue might have changed after
+                // Since we didn't get a mutex, the condition on the incoming 
queue might have changed after
                 // we have paused the current consumer. We need to re-check in 
order to avoid this consumer
                 // from getting stalled.
                 resumeReceivingFromPausedConsumersIfNeeded();
             } else {
-                // Schedule next receiveAsync() if the incoming queue is not 
full. Use a different thread to avoid
-                // recursion and stack overflow
-                internalPinnedExecutor.execute(() -> 
receiveMessageFromConsumer(consumer));
+                // Call receiveAsync() if the incoming queue is not full. 
Because this block is run with
+                // thenAcceptAsync, there is no chance for recursion that 
would lead to stack overflow.
+                receiveMessageFromConsumer(consumer);
             }
-        }).exceptionally(ex -> {
+        }, internalPinnedExecutor).exceptionally(ex -> {
             if (ex instanceof PulsarClientException.AlreadyClosedException
                     || ex.getCause() instanceof 
PulsarClientException.AlreadyClosedException) {
                 // ignore the exception that happens when the consumer is 
closed
@@ -281,6 +281,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         });
     }
 
+    // Must be called from the internalPinnedExecutor thread
     private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) 
{
         checkArgument(message instanceof MessageImpl);
         TopicMessageImpl<T> topicMessage = new 
TopicMessageImpl<>(consumer.getTopic(),
@@ -409,17 +410,19 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
         CompletableFuture<Message<T>> result = 
cancellationHandler.createFuture();
-        Message<T> message = incomingMessages.poll();
-        if (message == null) {
-            pendingReceives.add(result);
-            cancellationHandler.setCancelAction(() -> 
pendingReceives.remove(result));
-        } else {
-            decreaseIncomingMessageSize(message);
-            checkState(message instanceof TopicMessageImpl);
-            unAckedMessageTracker.add(message.getMessageId());
-            resumeReceivingFromPausedConsumersIfNeeded();
-            result.complete(message);
-        }
+        internalPinnedExecutor.execute(() -> {
+            Message<T> message = incomingMessages.poll();
+            if (message == null) {
+                pendingReceives.add(result);
+                cancellationHandler.setCancelAction(() -> 
pendingReceives.remove(result));
+            } else {
+                decreaseIncomingMessageSize(message);
+                checkState(message instanceof TopicMessageImpl);
+                unAckedMessageTracker.add(message.getMessageId());
+                resumeReceivingFromPausedConsumersIfNeeded();
+                result.complete(message);
+            }
+        });
         return result;
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 6af8914..faa621c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -36,6 +36,7 @@ import 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.testng.annotations.AfterMethod;
@@ -165,7 +166,7 @@ public class MultiTopicsConsumerImplTest {
         // given
         MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
         CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
-        assertTrue(consumer.hasNextPendingReceive());
+        Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.hasNextPendingReceive()));
         // when
         future.cancel(true);
         // then

Reply via email to