This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c7ac08b  [pulsar-broker] Stop to dispatch when skip message temporally since Key_Shared consumer stuck on delivery (#7553)
c7ac08b is described below

commit c7ac08b13e95cd7bbbcf551d2112e9131153bd83
Author: Yuri Mizushima <yumiz...@yahoo-corp.jp>
AuthorDate: Wed Sep 2 23:36:29 2020 +0900

    [pulsar-broker] Stop to dispatch when skip message temporally since Key_Shared consumer stuck on delivery (#7553)
    
    ### Motivation
    In some cases with a Key_Shared consumer, message ordering was broken.
    Here is one way to reproduce it (I think this is only one of the cases that trigger the issue).
    
    1. Connect Consumer1 to the Key_Shared subscription `sub`, but do not start receiving
       - receiverQueueSize: 500
    2. Connect a Producer and publish 500 messages with key `(i % 10)`
    3. Connect Consumer2 to the same subscription and start receiving
       - receiverQueueSize: 1
       - since https://github.com/apache/pulsar/pull/7106 , Consumer2 can't receive (expected)
    4. The Producer publishes 500 more messages with the same key generation algorithm
    5. After that, Consumer1 starts receiving
    6. Check Consumer2's message ordering
       - sometimes message ordering was broken within the same key
    
    Consumer1:
    ```
    Connected: Tue Jul 14 09:36:39 JST 2020
    [pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
    [pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/key-shared-test] [sub0] [820f0] Prefetched messages: 499 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
    Received: my-message-0 PublishTime: 1594687006203 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-1 PublishTime: 1594687006243 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-2 PublishTime: 1594687006247 Date: Tue Jul 14 09:37:46 JST 2020
    ...
    Received: my-message-498 PublishTime: 1594687008727 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-499 PublishTime: 1594687008731 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-500 PublishTime: 1594687038742 Date: Tue Jul 14 09:37:46 JST 2020
    ...
    Received: my-message-990 PublishTime: 1594687040094 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-994 PublishTime: 1594687040103 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-995 PublishTime: 1594687040105 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-997 PublishTime: 1594687040113 Date: Tue Jul 14 09:37:46 JST 2020
    ```
    
    Consumer2:
    ```
    Connected: Tue Jul 14 09:37:03 JST 2020
    [pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
    Received: my-message-501 MessageId: 4:1501:-1 PublishTime: 1594687038753 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-502 MessageId: 4:1502:-1 PublishTime: 1594687038755 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-503 MessageId: 4:1503:-1 PublishTime: 1594687038759 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-506 MessageId: 4:1506:-1 PublishTime: 1594687038785 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-508 MessageId: 4:1508:-1 PublishTime: 1594687038812 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-901 MessageId: 4:1901:-1 PublishTime: 1594687039871 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-509 MessageId: 4:1509:-1 PublishTime: 1594687038815 Date: Tue Jul 14 09:37:46 JST 2020
    ordering was broken, key: 1 oldNum: 901 newNum: 511
    Received: my-message-511 MessageId: 4:1511:-1 PublishTime: 1594687038826 Date: Tue Jul 14 09:37:46 JST 2020
    Received: my-message-512 MessageId: 4:1512:-1 PublishTime: 1594687038830 Date: Tue Jul 14 09:37:46 JST 2020
    ...
    ```
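
The `ordering was broken` line in the Consumer2 log appears to come from a per-key check on the receiving side. That checker is not part of this commit. A minimal sketch of such a check, assuming the key is `messageNumber % 10` as in the reproduction steps and using a hypothetical `KeyOrderChecker` class, could look like this:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not from the commit): remembers the last message number seen per key. */
class KeyOrderChecker {
    private final Map<String, Integer> lastSeenByKey = new HashMap<>();

    /**
     * Records one received message and reports whether it arrived in order for its key.
     * The stored value is always replaced by the newest message number.
     */
    boolean record(String key, int messageNumber) {
        Integer previous = lastSeenByKey.put(key, messageNumber);
        boolean inOrder = previous == null || previous < messageNumber;
        if (!inOrder) {
            System.out.println("ordering was broken, key: " + key
                    + " oldNum: " + previous + " newNum: " + messageNumber);
        }
        return inOrder;
    }

    public static void main(String[] args) {
        KeyOrderChecker checker = new KeyOrderChecker();
        // Key-1 messages in the order Consumer2 received them in the log above.
        int[] received = {501, 901, 511};
        for (int number : received) {
            // Assumption: the key is derived as (i % 10), as in the reproduction steps.
            checker.record(String.valueOf(number % 10), number);
        }
    }
}
```

Feeding it 501, 901 and 511 flags 511 as out of order, because 901 was already seen for key 1.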
    
    I think this issue is caused by https://github.com/apache/pulsar/pull/7105.
    Here is an example.
    1. dispatch messages
    2. Consumer2 was stuck and `totalMessagesSent=0`
       - Consumer2 availablePermits was 0
    3. skip redelivery messages temporarily
       - Consumer2 availablePermits was back to 1
    4. dispatch new messages
       - a new message was dispatched to Consumer2
    5. go back to the redelivery messages
    6. dispatch messages
       - ordering was broken
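
The steps above can be replayed with a toy model. This is only an illustration, not broker code: `ToyConsumer`, its permit counter and the message numbers 511 and 901 (borrowed from the Consumer2 log, both key 1) are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy replay of the interleaving described above; not the broker implementation. */
class ReplaySkipInterleaving {

    /** Hypothetical consumer that accepts a message only while it has permits left. */
    static final class ToyConsumer {
        int availablePermits;
        final List<Integer> received = new ArrayList<>();

        boolean offer(int messageNumber) {
            if (availablePermits == 0) {
                return false; // stuck: nothing can be sent
            }
            availablePermits--;
            received.add(messageNumber);
            return true;
        }
    }

    static List<Integer> run() {
        ToyConsumer consumer2 = new ToyConsumer();
        int replayMessage = 511; // older message for key 1, waiting for redelivery
        int newMessage = 901;    // newer message for the same key

        // Steps 1-2: the redelivery round finds no permits, so nothing is sent.
        boolean sent = consumer2.offer(replayMessage);

        // Step 3: redelivery is skipped temporarily; meanwhile a permit comes back.
        if (!sent) {
            consumer2.availablePermits = 1;
        }

        // Step 4: the read of new messages reaches Consumer2 first.
        consumer2.offer(newMessage);

        // Steps 5-6: the consumer frees a permit and the redelivery round resumes.
        consumer2.availablePermits = 1;
        consumer2.offer(replayMessage);

        return consumer2.received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [901, 511]
    }
}
```

The newer message for key 1 reaches the consumer before the older one, which is the ordering violation seen in the log.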
    
    ### Modifications
    When messages are skipped temporarily because a Key_Shared consumer is stuck on delivery, stop dispatching to that consumer.
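
A simplified sketch of the bookkeeping this change adds, with consumers reduced to names and the `recentlyJoinedConsumers` check left out. `StuckConsumerTracker`, `beginRound`, `restrictedMax` and `endRound` are hypothetical names; only the two sets mirror `stuckConsumers` and `nextStuckConsumers` from the diff below.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of the two-set tracking; not the dispatcher class itself. */
class StuckConsumerTracker {
    private final Set<String> stuckConsumers = new HashSet<>();
    private final Set<String> nextStuckConsumers = new HashSet<>();

    /** Called at the start of each dispatch round. */
    void beginRound() {
        nextStuckConsumers.clear();
    }

    /** Returns how many messages may be sent to the consumer in this round. */
    int restrictedMax(String consumer, int maxMessages) {
        if (maxMessages == 0) {
            // The consumer was stuck in this round; remember it for the next one.
            nextStuckConsumers.add(consumer);
            return 0;
        }
        if (stuckConsumers.contains(consumer)) {
            // Stuck in the previous round: hold back newer messages.
            return 0;
        }
        return maxMessages;
    }

    /** Called at the end of a round; returns true if replays should be skipped for the next read. */
    boolean endRound(int totalMessagesSent) {
        stuckConsumers.clear();
        if (totalMessagesSent == 0 && !nextStuckConsumers.isEmpty()) {
            stuckConsumers.addAll(nextStuckConsumers);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StuckConsumerTracker tracker = new StuckConsumerTracker();

        tracker.beginRound();                                      // redelivery round
        System.out.println(tracker.restrictedMax("consumer2", 0)); // 0, no permits
        System.out.println(tracker.endRound(0));                   // true

        tracker.beginRound();                                      // read of new messages
        System.out.println(tracker.restrictedMax("consumer2", 1)); // 0, held back
        System.out.println(tracker.restrictedMax("consumer1", 1)); // 1
        System.out.println(tracker.endRound(1));                   // false

        tracker.beginRound();                                      // later round
        System.out.println(tracker.restrictedMax("consumer2", 1)); // 1
    }
}
```

A consumer that had no permits in one round is held back for exactly the following round, even if its permits have returned in the meantime, so newer messages cannot overtake the ones still waiting for redelivery.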
---
 ...istentStickyKeyDispatcherMultipleConsumers.java |  26 ++++-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 105 ++++++++++++++++++++-
 2 files changed, 127 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a56f566..095f9ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -64,12 +65,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
      */
     private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;
 
+    private final Set<Consumer> stuckConsumers;
+    private final Set<Consumer> nextStuckConsumers;
+
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
             Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
         super(topic, cursor, subscription);
 
         this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
         this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>();
+        this.stuckConsumers = new HashSet<>();
+        this.nextStuckConsumers = new HashSet<>();
 
         switch (ksm.getKeySharedMode()) {
         case AUTO_SPLIT:
@@ -143,6 +149,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             return;
         }
 
+        nextStuckConsumers.clear();
+
         final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
         groupedEntries.clear();
 
@@ -217,11 +225,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             }
         }
 
+        stuckConsumers.clear();
+
         if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) {
             // This means, that all the messages we've just read cannot be dispatched right now.
             // This condition can only happen when:
             //  1. We have consumers ready to accept messages (otherwise the would not haven been triggered)
             //  2. All keys in the current set of messages are routing to consumers that are currently busy
+            //     and stuck is not caused by stuckConsumers
             //
             // The solution here is to move on and read next batch of messages which might hopefully contain
             // also keys meant for other consumers.
@@ -230,18 +241,31 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             // ahead in the stream while the new consumers are not ready to accept the new messages,
             // therefore would be most likely only increase the distance between read-position and mark-delete
             // position.
-            isDispatcherStuckOnReplays = true;
+            if (!nextStuckConsumers.isEmpty()) {
+                isDispatcherStuckOnReplays = true;
+                stuckConsumers.addAll(nextStuckConsumers);
+            }
+            // readMoreEntries should run regardless whether or not stuck is caused by stuckConsumers for avoid stopping dispatch.
             readMoreEntries();
         }
     }
 
     private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages) {
         if (maxMessages == 0) {
+            // the consumer was stuck
+            nextStuckConsumers.add(consumer);
             return 0;
         }
 
         PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
         if (maxReadPosition == null) {
+            // stop to dispatch by stuckConsumers
+            if (stuckConsumers.contains(consumer)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer);
+                }
+                return 0;
+            }
             // The consumer has not recently joined, so we can send all messages
             return maxMessages;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 6325f10..c281400 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -41,6 +41,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -49,6 +50,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
 @PrepareForTest({ DispatchRateLimiter.class })
@@ -79,6 +81,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         configMock = mock(ServiceConfiguration.class);
         doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled();
         doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
+        doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
+        doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
 
         pulsarMock = mock(PulsarService.class);
         doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -96,6 +100,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
 
         consumerMock = mock(Consumer.class);
         channelMock = mock(ChannelPromise.class);
+        doReturn("consumer1").when(consumerMock).consumerName();
         doReturn(1000).when(consumerMock).getAvailablePermits();
         doReturn(true).when(consumerMock).isWritable();
         doReturn(channelMock).when(consumerMock).sendMessages(
@@ -120,12 +125,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
 
         persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
                 topicMock, cursorMock, subscriptionMock, configMock, KeySharedMeta.getDefaultInstance());
-        persistentDispatcher.addConsumer(consumerMock);
-        persistentDispatcher.consumerFlow(consumerMock, 1000);
     }
 
     @Test
     public void testSendMarkerMessage() {
+        try {
+            persistentDispatcher.addConsumer(consumerMock);
+            persistentDispatcher.consumerFlow(consumerMock, 1000);
+        } catch (Exception e) {
+            fail("Failed to add mock consumer", e);
+        }
+
         List<Entry> entries = new ArrayList<>();
         ByteBuf markerMessage = Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster");
         entries.add(EntryImpl.create(1, 1, markerMessage));
@@ -156,11 +166,100 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
     }
 
+    @Test
+    public void testSkipRedeliverTemporally() {
+        final Consumer slowConsumerMock = mock(Consumer.class);
+        final ChannelPromise slowChannelMock = mock(ChannelPromise.class);
+        // add entries to redeliver and read target
+        final List<Entry> redeliverEntries = new ArrayList<>();
+        redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1")));
+        final List<Entry> readEntries = new ArrayList<>();
+        readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
+        readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2")));
+
+        try {
+            Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
+            totalAvailablePermitsField.setAccessible(true);
+            totalAvailablePermitsField.set(persistentDispatcher, 1000);
+
+            doAnswer(invocationOnMock -> {
+                ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
+                        .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+                return null;
+            }).when(cursorMock).asyncReadEntriesOrWait(
+                    anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class),
+                    eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal));
+        } catch (Exception e) {
+            fail("Failed to set to field", e);
+        }
+
+        // Create 2Consumers
+        try {
+            doReturn("consumer2").when(slowConsumerMock).consumerName();
+            // Change slowConsumer availablePermits to 0 and back to normal
+            when(slowConsumerMock.getAvailablePermits())
+                    .thenReturn(0)
+                    .thenReturn(1);
+            doReturn(true).when(slowConsumerMock).isWritable();
+            doReturn(slowChannelMock).when(slowConsumerMock).sendMessages(
+                    anyList(),
+                    any(EntryBatchSizes.class),
+                    any(EntryBatchIndexesAcks.class),
+                    anyInt(),
+                    anyLong(),
+                    anyLong(),
+                    any(RedeliveryTracker.class)
+            );
+
+            persistentDispatcher.addConsumer(consumerMock);
+            persistentDispatcher.addConsumer(slowConsumerMock);
+        } catch (Exception e) {
+            fail("Failed to add mock consumer", e);
+        }
+
+        // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers
+        // run readMoreEntries internally (and skip internally)
+        // Change slowConsumer availablePermits to 1
+        // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers internally
+        // and then stop to dispatch to slowConsumer
+        persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries);
+
+        verify(consumerMock, times(1)).sendMessages(
+                argThat(arg -> {
+                    assertEquals(arg.size(), 1);
+                    Entry entry = arg.get(0);
+                    assertEquals(entry.getLedgerId(), 1);
+                    assertEquals(entry.getEntryId(), 3);
+                    return true;
+                }),
+                any(EntryBatchSizes.class),
+                any(EntryBatchIndexesAcks.class),
+                anyInt(),
+                anyLong(),
+                anyLong(),
+                any(RedeliveryTracker.class)
+        );
+        verify(slowConsumerMock, times(0)).sendMessages(
+                anyList(),
+                any(EntryBatchSizes.class),
+                any(EntryBatchIndexesAcks.class),
+                anyInt(),
+                anyLong(),
+                anyLong(),
+                any(RedeliveryTracker.class)
+        );
+    }
+
     private ByteBuf createMessage(String message, int sequenceId) {
+        return createMessage(message, sequenceId, "testKey");
+    }
+
+    private ByteBuf createMessage(String message, int sequenceId, String key) {
         PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
         messageMetadata.setSequenceId(sequenceId);
         messageMetadata.setProducerName("testProducer");
-        messageMetadata.setPartitionKey("testKey");
+        messageMetadata.setPartitionKey(key);
+        messageMetadata.setPartitionKeyB64Encoded(false);
         messageMetadata.setPublishTime(System.currentTimeMillis());
         return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8)));
     }
