merlimat closed pull request #1144: Fix race condition in 
testRedeliveryOnBlockedDistpatcher
URL: https://github.com/apache/incubator-pulsar/pull/1144
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 1a32776cf..3167006ee 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -31,6 +31,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -40,23 +41,14 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +61,8 @@
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 
 public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);
@@ -157,10 +151,10 @@ public void 
testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti
                     fail("ack failed", e);
                 }
             });
-            
+
             // try to consume remaining messages: broker may take time to 
deliver so, retry multiple time to consume
             // all messages
-            List<MessageId> result = Lists.newArrayList();
+            Queue<MessageId> result = Queues.newConcurrentLinkedQueue();
             // expecting messages which are not received
             int expectedRemainingMessages = totalProducedMsgs - 
messages.size();
             CountDownLatch latch = new 
CountDownLatch(expectedRemainingMessages);
@@ -264,7 +258,7 @@ public void 
testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Except
 
             // (4) try to consume remaining messages: broker may take time to 
deliver so, retry multiple time to consume
             // all messages
-            List<MessageId> result = Lists.newArrayList();
+            Queue<MessageId> result = Queues.newConcurrentLinkedQueue();
             CountDownLatch latch = new CountDownLatch(totalProducedMsgs);
             for (int i = 0; i < consumers.length; i++) {
                 final int consumerCount = i;
@@ -495,7 +489,7 @@ public void testRedeliveryOnBlockedDistpatcher() throws 
Exception {
             // try to consume remaining messages: broker may take time to 
deliver so, retry multiple time to consume
             // all messages
             CountDownLatch latch = new CountDownLatch(remainingMessages);
-            List<MessageId> consumedMessages = Lists.newArrayList();
+            Queue<MessageId> consumedMessages = 
Queues.newConcurrentLinkedQueue();
             for (int i = 0; i < consumers.length; i++) {
                 final int counsumerIndex = i;
                 for (int j = 0; j < remainingMessages; j++) {
@@ -666,26 +660,26 @@ public void testBrokerSubscriptionRecovery(boolean 
unloadBundleGracefully) throw
         receivedMsgs.removeAll(unackMsgs);
         assertTrue(receivedMsgs.isEmpty());
     }
-    
+
     /**
      * </pre>
      * verifies perBroker dispatching blocking. A. maxUnAckPerBroker = 200, 
maxUnAckPerDispatcher = 20 Now, it tests
      * with 3 subscriptions.
-     * 
-     * 1. Subscription-1: try to consume without acking 
+     *
+     * 1. Subscription-1: try to consume without acking
      *  a. consumer will be blocked after 200 (maxUnAckPerBroker) msgs
-     *  b. even second consumer will not receive any new messages 
-     *  c. broker will have 1 blocked dispatcher 
-     * 2. Subscription-2: try to consume without acking 
-     *  a. as broker is already blocked it will block subscription after 20 
msgs (maxUnAckPerDispatcher) 
-     *  b. broker will have 2 blocked dispatchers 
-     * 3. Subscription-3: try to consume with acking 
-     *  a. as consumer is acking not reached maxUnAckPerDispatcher=20 unack 
msg => consumes all produced msgs 
-     * 4.Subscription-1 : acks all pending msgs and consume by acking 
-     *  a. broker unblocks all dispatcher and sub-1 consumes all messages 
+     *  b. even second consumer will not receive any new messages
+     *  c. broker will have 1 blocked dispatcher
+     * 2. Subscription-2: try to consume without acking
+     *  a. as broker is already blocked it will block subscription after 20 
msgs (maxUnAckPerDispatcher)
+     *  b. broker will have 2 blocked dispatchers
+     * 3. Subscription-3: try to consume with acking
+     *  a. as consumer is acking not reached maxUnAckPerDispatcher=20 unack 
msg => consumes all produced msgs
+     * 4.Subscription-1 : acks all pending msgs and consume by acking
+     *  a. broker unblocks all dispatcher and sub-1 consumes all messages
      * 5. Subscription-2 : it triggers redelivery and acks all messages so, it 
consumes all produced messages
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test(timeOut = 10000)
@@ -868,14 +862,14 @@ public void testBlockBrokerDispatching() throws Exception 
{
     /**
      * Verifies if broker is already blocked multiple subscriptions if one of 
them acked back perBrokerDispatcherLimit
      * messages then that dispatcher gets unblocked and starts consuming 
messages
-     * 
+     *
      * <pre>
      * 1. subscription-1 consume messages and doesn't ack so it reaches 
maxUnAckPerBroker(200) and blocks sub-1
      * 2. subscription-2 can consume only 
dispatcherLimitWhenBrokerIsBlocked(20) and then sub-2 gets blocked
      * 3. subscription-2 acks back 10 messages 
(dispatcherLimitWhenBrokerIsBlocked/2) to gets unblock
      * 4. sub-2 starts acking once it gets unblocked and it consumes all 
published messages
      * </pre>
-     * 
+     *
      */
     @Test
     public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to