This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6a2e3a1 [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (#12456) 6a2e3a1 is described below commit 6a2e3a1ad735465154dc3fa12988c3068eae7da5 Author: Michael Marshall <michael.marsh...@datastax.com> AuthorDate: Mon Oct 25 16:31:59 2021 -0500 [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (#12456) * [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order * Fix test * Return the checkState method call to keep original behavior * Reproduce out-of-order delivery issue in PR 12456 * Remove unnecessary scheduling of receiveMessageFromConsumer Co-authored-by: Lari Hotari <lhot...@apache.org> --- .../pulsar/client/api/MultiTopicsConsumerTest.java | 75 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 3 +- .../client/impl/MultiTopicsConsumerImpl.java | 37 ++++++----- .../client/impl/MultiTopicsConsumerImplTest.java | 3 +- 4 files changed, 98 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index 715f3ad..d8c8bd6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.pulsar.client.api; import static org.mockito.ArgumentMatchers.any; @@ -24,9 +25,16 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.common.collect.Lists; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -34,6 +42,7 @@ import org.mockito.AdditionalAnswers; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -70,6 +79,7 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase { // method calls on the interface. Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo(internalExecutorService))); } + @Override public ExecutorService getInternalExecutorService() { return internalExecutorServiceDelegate; @@ -119,4 +129,69 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase { verify(internalExecutorServiceDelegate, times(0)) .schedule(any(Runnable.class), anyLong(), any()); } + + // test that reproduces the issue that PR https://github.com/apache/pulsar/pull/12456 fixes + // where MultiTopicsConsumerImpl has a data race that causes out-of-order delivery of messages + @Test + public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer() + throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException, + TimeoutException { + String topicName = newTopicName(); + int numPartitions = 2; + int numMessages = 100000; + admin.topics().createPartitionedTopic(topicName, numPartitions); + + Producer<Long>[] producers = new Producer[numPartitions]; + + for (int i = 0; i < numPartitions; i++) { + producers[i] = pulsarClient.newProducer(Schema.INT64) + // produce to each partition directly so that order can be maintained in sending + .topic(topicName + "-partition-" + i) + .enableBatching(true) + .maxPendingMessages(30000) + .maxPendingMessagesAcrossPartitions(60000) + .batchingMaxMessages(10000) + .batchingMaxPublishDelay(5, TimeUnit.SECONDS) + .batchingMaxBytes(4 * 1024 * 1024) + .blockIfQueueFull(true) + .create(); + } + + @Cleanup + Consumer<Long> consumer = pulsarClient + .newConsumer(Schema.INT64) + // consume on the partitioned topic + .topic(topicName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .receiverQueueSize(numMessages) + .subscriptionName(methodName) + .subscribe(); + + // produce sequence numbers to each partition topic + long sequenceNumber = 1L; + for (int i = 0; i < numMessages; i++) { + for (Producer<Long> producer : producers) { + producer.newMessage() + .value(sequenceNumber) + .sendAsync(); + } + sequenceNumber++; + } + for (Producer<Long> producer : producers) { + producer.close(); + } + + // receive and validate sequences in the partitioned topic + Map<String, AtomicLong> receivedSequences = new HashMap<>(); + int receivedCount = 0; + while (receivedCount < numPartitions * numMessages) { + Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS); + consumer.acknowledge(message); + receivedCount++; + AtomicLong receivedSequenceCounter = + receivedSequences.computeIfAbsent(message.getTopicName(), k -> new AtomicLong(1L)); + Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement()); + } + Assert.assertEquals(numPartitions * numMessages, receivedCount); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 62fff49..3159734 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -427,8 +427,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (message == null) { pendingReceives.add(result); cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); - } - if (message != null) { + } else { messageProcessed(message); result.complete(beforeConsume(message)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 520e7f3..21ae2d7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -245,7 +245,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) { - consumer.receiveAsync().thenAccept(message -> { + consumer.receiveAsync().thenAcceptAsync(message -> { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Receive message from sub consumer:{}", topic, subscription, consumer.getTopic()); @@ -260,16 +260,16 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { // or if any consumer is already paused (to create fair chance for already paused consumers) pausedConsumers.add(consumer); - // Since we din't get a mutex, the condition on the incoming queue might have changed after + // Since we didn't get a mutex, the condition on the incoming queue might have changed after // we have paused the current consumer. We need to re-check in order to avoid this consumer // from getting stalled. resumeReceivingFromPausedConsumersIfNeeded(); } else { - // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid - // recursion and stack overflow - internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer)); + // Call receiveAsync() if the incoming queue is not full. Because this block is run with + // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow. + receiveMessageFromConsumer(consumer); } - }).exceptionally(ex -> { + }, internalPinnedExecutor).exceptionally(ex -> { if (ex instanceof PulsarClientException.AlreadyClosedException || ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { // ignore the exception that happens when the consumer is closed @@ -281,6 +281,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { }); } + // Must be called from the internalPinnedExecutor thread private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) { checkArgument(message instanceof MessageImpl); TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(), @@ -409,17 +410,19 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { protected CompletableFuture<Message<T>> internalReceiveAsync() { CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler(); CompletableFuture<Message<T>> result = cancellationHandler.createFuture(); - Message<T> message = incomingMessages.poll(); - if (message == null) { - pendingReceives.add(result); - cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); - } else { - decreaseIncomingMessageSize(message); - checkState(message instanceof TopicMessageImpl); - unAckedMessageTracker.add(message.getMessageId()); - resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); - } + internalPinnedExecutor.execute(() -> { + Message<T> message = incomingMessages.poll(); + if (message == null) { + pendingReceives.add(result); + cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); + } else { + decreaseIncomingMessageSize(message); + checkState(message instanceof TopicMessageImpl); + unAckedMessageTracker.add(message.getMessageId()); + resumeReceivingFromPausedConsumersIfNeeded(); + result.complete(message); + } + }); return result; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 6af8914..faa621c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.testng.annotations.AfterMethod; @@ -165,7 +166,7 @@ public class MultiTopicsConsumerImplTest { // given MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer(); CompletableFuture<Message<byte[]>> future = consumer.receiveAsync(); - assertTrue(consumer.hasNextPendingReceive()); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasNextPendingReceive())); // when future.cancel(true); // then