This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b7b923933a5 [fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)
b7b923933a5 is described below

commit b7b923933a5c7594a52c8bb761f80e25ca0065cb
Author: fengyubiao <[email protected]>
AuthorDate: Fri Oct 28 17:56:16 2022 +0800

    [fix][flaky-test]DispatcherBlockConsumerTest.testBrokerSubscriptionRecovery (#18228)
---
 .../org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index d6a7f3b134d..ec24c066f36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -618,12 +618,13 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
         }
         latch.await();
         // (2) consume all messages except: unackMessages-set
-        Set<Integer> unackMessages = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
+        Set<Integer> unackMsgIndex = Sets.newHashSet(5, 10, 20, 21, 22, 23, 25, 26, 30, 32, 40, 80, 160, 320);
+        Set<String> unackMsgs = unackMsgIndex.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
         int receivedMsgCount = 0;
         for (int i = 0; i < totalProducedMsgs; i++) {
             Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
             assertNotNull(msg);
-            if (!unackMessages.contains(i)) {
+            if (!unackMsgs.contains(new String(msg.getData()))) {
                 consumer.acknowledge(msg);
             }
             receivedMsgCount++;
@@ -646,7 +647,6 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
                 .subscriptionType(SubscriptionType.Shared).subscribe();
 
         // consumer should only receive unakced messages
-        Set<String> unackMsgs = unackMessages.stream().map(i -> "my-message-" + i).collect(Collectors.toSet());
         Set<String> receivedMsgs = new HashSet<>();
         for (int i = 0; i < totalProducedMsgs; i++) {
             Message<?> msg = consumer.receive(500, TimeUnit.MILLISECONDS);

Reply via email to