merlimat closed pull request #1144: Fix race condition in testRedeliveryOnBlockedDistpatcher
URL: https://github.com/apache/incubator-pulsar/pull/1144
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index 1a32776cf..3167006ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -31,6 +31,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -40,23 +41,14 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import com.google.common.collect.Sets; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.PersistentTopicStats; +import 
org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +61,8 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.google.common.collect.Queues; +import com.google.common.collect.Sets; public class DispatcherBlockConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class); @@ -157,10 +151,10 @@ public void testConsumerBlockingWithUnAckedMessagesAtDispatcher() throws Excepti fail("ack failed", e); } }); - + // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume // all messages - List<MessageId> result = Lists.newArrayList(); + Queue<MessageId> result = Queues.newConcurrentLinkedQueue(); // expecting messages which are not received int expectedRemainingMessages = totalProducedMsgs - messages.size(); CountDownLatch latch = new CountDownLatch(expectedRemainingMessages); @@ -264,7 +258,7 @@ public void testConsumerBlockingWithUnAckedMessagesAndRedelivery() throws Except // (4) try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume // all messages - List<MessageId> result = Lists.newArrayList(); + Queue<MessageId> result = Queues.newConcurrentLinkedQueue(); CountDownLatch latch = new CountDownLatch(totalProducedMsgs); for (int i = 0; i < consumers.length; i++) { final int consumerCount = i; @@ -495,7 +489,7 @@ public void testRedeliveryOnBlockedDistpatcher() throws Exception { // try to consume remaining messages: broker may take time to deliver so, retry multiple time to consume // all messages CountDownLatch latch = new CountDownLatch(remainingMessages); - List<MessageId> consumedMessages = Lists.newArrayList(); + Queue<MessageId> consumedMessages = 
Queues.newConcurrentLinkedQueue(); for (int i = 0; i < consumers.length; i++) { final int counsumerIndex = i; for (int j = 0; j < remainingMessages; j++) { @@ -666,26 +660,26 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw receivedMsgs.removeAll(unackMsgs); assertTrue(receivedMsgs.isEmpty()); } - + /** * </pre> * verifies perBroker dispatching blocking. A. maxUnAckPerBroker = 200, maxUnAckPerDispatcher = 20 Now, it tests * with 3 subscriptions. - * - * 1. Subscription-1: try to consume without acking + * + * 1. Subscription-1: try to consume without acking * a. consumer will be blocked after 200 (maxUnAckPerBroker) msgs - * b. even second consumer will not receive any new messages - * c. broker will have 1 blocked dispatcher - * 2. Subscription-2: try to consume without acking - * a. as broker is already blocked it will block subscription after 20 msgs (maxUnAckPerDispatcher) - * b. broker will have 2 blocked dispatchers - * 3. Subscription-3: try to consume with acking - * a. as consumer is acking not reached maxUnAckPerDispatcher=20 unack msg => consumes all produced msgs - * 4.Subscription-1 : acks all pending msgs and consume by acking - * a. broker unblocks all dispatcher and sub-1 consumes all messages + * b. even second consumer will not receive any new messages + * c. broker will have 1 blocked dispatcher + * 2. Subscription-2: try to consume without acking + * a. as broker is already blocked it will block subscription after 20 msgs (maxUnAckPerDispatcher) + * b. broker will have 2 blocked dispatchers + * 3. Subscription-3: try to consume with acking + * a. as consumer is acking not reached maxUnAckPerDispatcher=20 unack msg => consumes all produced msgs + * 4.Subscription-1 : acks all pending msgs and consume by acking + * a. broker unblocks all dispatcher and sub-1 consumes all messages * 5. 
Subscription-2 : it triggers redelivery and acks all messages so, it consumes all produced messages * </pre> - * + * * @throws Exception */ @Test(timeOut = 10000) @@ -868,14 +862,14 @@ public void testBlockBrokerDispatching() throws Exception { /** * Verifies if broker is already blocked multiple subscriptions if one of them acked back perBrokerDispatcherLimit * messages then that dispatcher gets unblocked and starts consuming messages - * + * * <pre> * 1. subscription-1 consume messages and doesn't ack so it reaches maxUnAckPerBroker(200) and blocks sub-1 * 2. subscription-2 can consume only dispatcherLimitWhenBrokerIsBlocked(20) and then sub-2 gets blocked * 3. subscription-2 acks back 10 messages (dispatcherLimitWhenBrokerIsBlocked/2) to gets unblock * 4. sub-2 starts acking once it gets unblocked and it consumes all published messages * </pre> - * + * */ @Test public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services