This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 059149c  Fix race condition in testRedeliveryOnBlockedDistpatcher (#1144)
059149c is described below

commit 059149c494592a3913a4fb24ed6939e1b068fc58
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Jan 29 20:12:09 2018 -0800

    Fix race condition in testRedeliveryOnBlockedDistpatcher (#1144)
---
 .../client/api/DispatcherBlockConsumerTest.java    | 52 ++++++++++------------
 1 file changed, 23 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 1a32776..3167006 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -40,23 +41,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +61,8 @@ import org.testng.collections.Lists;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 
 public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);
@@ -157,10 +151,10 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                 fail("ack failed", e);
             }
         });
-        
+
         // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
         // all messages
-        List<MessageId> result = Lists.newArrayList();
+        Queue<MessageId> result = Queues.newConcurrentLinkedQueue();
         // expecting messages which are not received
         int expectedRemainingMessages = totalProducedMsgs - messages.size();
         CountDownLatch latch = new CountDownLatch(expectedRemainingMessages);
@@ -264,7 +258,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
 
         // (4) try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
         // all messages
-        List<MessageId> result = Lists.newArrayList();
+        Queue<MessageId> result = Queues.newConcurrentLinkedQueue();
         CountDownLatch latch = new CountDownLatch(totalProducedMsgs);
         for (int i = 0; i < consumers.length; i++) {
             final int consumerCount = i;
@@ -495,7 +489,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
         // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume
         // all messages
         CountDownLatch latch = new CountDownLatch(remainingMessages);
-        List<MessageId> consumedMessages = Lists.newArrayList();
+        Queue<MessageId> consumedMessages = Queues.newConcurrentLinkedQueue();
         for (int i = 0; i < consumers.length; i++) {
             final int counsumerIndex = i;
             for (int j = 0; j < remainingMessages; j++) {
@@ -666,26 +660,26 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
         receivedMsgs.removeAll(unackMsgs);
         assertTrue(receivedMsgs.isEmpty());
     }
-    
+
     /**
      * </pre>
      * verifies perBroker dispatching blocking. A. maxUnAckPerBroker = 200, maxUnAckPerDispatcher = 20 Now, it tests
      * with 3 subscriptions.
-     * 
-     * 1. Subscription-1: try to consume without acking 
+     *
+     * 1. Subscription-1: try to consume without acking
      * a. consumer will be blocked after 200 (maxUnAckPerBroker) msgs
-     * b. even second consumer will not receive any new messages 
-     * c. broker will have 1 blocked dispatcher 
-     * 2. Subscription-2: try to consume without acking 
-     * a. as broker is already blocked it will block subscription after 20 msgs (maxUnAckPerDispatcher) 
-     * b. broker will have 2 blocked dispatchers 
-     * 3. Subscription-3: try to consume with acking 
-     * a. as consumer is acking not reached maxUnAckPerDispatcher=20 unack msg => consumes all produced msgs 
-     * 4.Subscription-1 : acks all pending msgs and consume by acking 
-     * a. broker unblocks all dispatcher and sub-1 consumes all messages 
+     * b. even second consumer will not receive any new messages
+     * c. broker will have 1 blocked dispatcher
+     * 2. Subscription-2: try to consume without acking
+     * a. as broker is already blocked it will block subscription after 20 msgs (maxUnAckPerDispatcher)
+     * b. broker will have 2 blocked dispatchers
+     * 3. Subscription-3: try to consume with acking
+     * a. as consumer is acking not reached maxUnAckPerDispatcher=20 unack msg => consumes all produced msgs
+     * 4.Subscription-1 : acks all pending msgs and consume by acking
+     * a. broker unblocks all dispatcher and sub-1 consumes all messages
      * 5. Subscription-2 : it triggers redelivery and acks all messages so, it consumes all produced messages
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test(timeOut = 10000)
@@ -868,14 +862,14 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
     /**
      * Verifies if broker is already blocked multiple subscriptions if one of them acked back perBrokerDispatcherLimit
      * messages then that dispatcher gets unblocked and starts consuming messages
-     * 
+     *
      * <pre>
      * 1. subscription-1 consume messages and doesn't ack so it reaches maxUnAckPerBroker(200) and blocks sub-1
      * 2. subscription-2 can consume only dispatcherLimitWhenBrokerIsBlocked(20) and then sub-2 gets blocked
      * 3. subscription-2 acks back 10 messages (dispatcherLimitWhenBrokerIsBlocked/2) to gets unblock
      * 4. sub-2 starts acking once it gets unblocked and it consumes all published messages
      * </pre>
-     * 
+     *
      */
     @Test
     public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.