This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 059149c  Fix race condition in testRedeliveryOnBlockedDistpatcher 
(#1144)
059149c is described below

commit 059149c494592a3913a4fb24ed6939e1b068fc58
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Jan 29 20:12:09 2018 -0800

    Fix race condition in testRedeliveryOnBlockedDistpatcher (#1144)
---
 .../client/api/DispatcherBlockConsumerTest.java    | 52 ++++++++++------------
 1 file changed, 23 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 1a32776..3167006 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -40,23 +41,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +61,8 @@ import org.testng.collections.Lists;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 
 public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);
@@ -157,10 +151,10 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
                     fail("ack failed", e);
                 }
             });
-            
+
             // try to consume remaining messages: broker may take time to 
deliver so, retry multiple time to consume
             // all messages
-            List<MessageId> result = Lists.newArrayList();
+            Queue<MessageId> result = Queues.newConcurrentLinkedQueue();
             // expecting messages which are not received
             int expectedRemainingMessages = totalProducedMsgs - 
messages.size();
             CountDownLatch latch = new 
CountDownLatch(expectedRemainingMessages);
@@ -264,7 +258,7 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
 
             // (4) try to consume remaining messages: broker may take time to 
deliver so, retry multiple time to consume
             // all messages
-            List<MessageId> result = Lists.newArrayList();
+            Queue<MessageId> result = Queues.newConcurrentLinkedQueue();
             CountDownLatch latch = new CountDownLatch(totalProducedMsgs);
             for (int i = 0; i < consumers.length; i++) {
                 final int consumerCount = i;
@@ -495,7 +489,7 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
             // try to consume remaining messages: broker may take time to 
deliver so, retry multiple time to consume
             // all messages
             CountDownLatch latch = new CountDownLatch(remainingMessages);
-            List<MessageId> consumedMessages = Lists.newArrayList();
+            Queue<MessageId> consumedMessages = 
Queues.newConcurrentLinkedQueue();
             for (int i = 0; i < consumers.length; i++) {
                 final int counsumerIndex = i;
                 for (int j = 0; j < remainingMessages; j++) {
@@ -666,26 +660,26 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
         receivedMsgs.removeAll(unackMsgs);
         assertTrue(receivedMsgs.isEmpty());
     }
-    
+
     /**
      * </pre>
      * verifies perBroker dispatching blocking. A. maxUnAckPerBroker = 200, 
maxUnAckPerDispatcher = 20 Now, it tests
      * with 3 subscriptions.
-     * 
-     * 1. Subscription-1: try to consume without acking 
+     *
+     * 1. Subscription-1: try to consume without acking
      *  a. consumer will be blocked after 200 (maxUnAckPerBroker) msgs
-     *  b. even second consumer will not receive any new messages 
-     *  c. broker will have 1 blocked dispatcher 
-     * 2. Subscription-2: try to consume without acking 
-     *  a. as broker is already blocked it will block subscription after 20 
msgs (maxUnAckPerDispatcher) 
-     *  b. broker will have 2 blocked dispatchers 
-     * 3. Subscription-3: try to consume with acking 
-     *  a. as consumer is acking not reached maxUnAckPerDispatcher=20 unack 
msg => consumes all produced msgs 
-     * 4.Subscription-1 : acks all pending msgs and consume by acking 
-     *  a. broker unblocks all dispatcher and sub-1 consumes all messages 
+     *  b. even second consumer will not receive any new messages
+     *  c. broker will have 1 blocked dispatcher
+     * 2. Subscription-2: try to consume without acking
+     *  a. as broker is already blocked it will block subscription after 20 
msgs (maxUnAckPerDispatcher)
+     *  b. broker will have 2 blocked dispatchers
+     * 3. Subscription-3: try to consume with acking
+     *  a. as consumer is acking not reached maxUnAckPerDispatcher=20 unack 
msg => consumes all produced msgs
+     * 4.Subscription-1 : acks all pending msgs and consume by acking
+     *  a. broker unblocks all dispatcher and sub-1 consumes all messages
      * 5. Subscription-2 : it triggers redelivery and acks all messages so, it 
consumes all produced messages
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test(timeOut = 10000)
@@ -868,14 +862,14 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
     /**
      * Verifies if broker is already blocked multiple subscriptions if one of 
them acked back perBrokerDispatcherLimit
      * messages then that dispatcher gets unblocked and starts consuming 
messages
-     * 
+     *
      * <pre>
      * 1. subscription-1 consume messages and doesn't ack so it reaches 
maxUnAckPerBroker(200) and blocks sub-1
      * 2. subscription-2 can consume only 
dispatcherLimitWhenBrokerIsBlocked(20) and then sub-2 gets blocked
      * 3. subscription-2 acks back 10 messages 
(dispatcherLimitWhenBrokerIsBlocked/2) to gets unblock
      * 4. sub-2 starts acking once it gets unblocked and it consumes all 
published messages
      * </pre>
-     * 
+     *
      */
     @Test
     public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to