This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 02b25a3b246 [fix] [test] fix flaky test PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#19574)
02b25a3b246 is described below

commit 02b25a3b2461b975b46afdca4130cdba876721c0
Author: fengyubiao <[email protected]>
AuthorDate: Tue Feb 21 17:10:35 2023 +0800

    [fix] [test] fix flaky test PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#19574)
---
 .../PersistentDispatcherFailoverConsumerTest.java  |  80 ++++++++++++----
 .../broker/service/PersistentFailoverE2ETest.java  | 106 ++++++++++++++-------
 2 files changed, 131 insertions(+), 55 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 29a3227c92b..0dafe4bca3e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -37,6 +37,8 @@ import static org.testng.AssertJUnit.assertSame;
 import static org.testng.AssertJUnit.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -47,6 +49,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -74,6 +77,8 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -399,9 +404,54 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertTrue(pdfc.canUnsubscribe(consumer1));
     }
 
-    @Test
+    private String[] sortConsumerNameByHashSelector(String...consumerNames) 
throws Exception {
+        String[] result = new String[consumerNames.length];
+        PersistentTopic topic =
+                new PersistentTopic(successTopicName, ledgerMock, 
pulsarTestContext.getBrokerService());
+        PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
+        int partitionIndex = -1;
+        PersistentDispatcherSingleActiveConsumer dispatcher = new 
PersistentDispatcherSingleActiveConsumer(cursorMock,
+                SubType.Failover, partitionIndex, topic, sub);
+        for (String consumerName : consumerNames){
+            Consumer consumer = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 999 /* consumer id */, 1,
+                    consumerName/* consumer name */, true, serverCnx, 
"myrole-1", Collections.emptyMap(),
+                    false /* read compacted */, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH));
+            dispatcher.addConsumer(consumer);
+        }
+        for (int i = 0; i < consumerNames.length; i++) {
+            result[i] = dispatcher.getActiveConsumer().consumerName();
+            dispatcher.removeConsumer(dispatcher.getActiveConsumer());
+        }
+        consumerChanges.clear();
+        return result;
+    }
+
+    private CommandActiveConsumerChange waitActiveChangeEvent(int consumerId)
+            throws Exception {
+        AtomicReference<CommandActiveConsumerChange> res = new 
AtomicReference<>();
+        Awaitility.await().until(() -> {
+            while (!consumerChanges.isEmpty()){
+                CommandActiveConsumerChange change = consumerChanges.take();
+                if (change.getConsumerId() == consumerId){
+                    res.set(change);
+                    return true;
+                }
+            }
+            return false;
+        });
+        consumerChanges.clear();
+        return res.get();
+    }
+
+    @Test(invocationCount = 100)
     public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
         log.info("--- Starting 
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
+        String[] sortedConsumerNameByHashSelector = 
sortConsumerNameByHashSelector("Cons1", "Cons2");
+        BrokerService spyBrokerService = 
spy(pulsarTestContext.getBrokerService());
+        final EventLoopGroup singleEventLoopGroup = 
EventLoopUtil.newEventLoopGroup(1,
+                
pulsarTestContext.getBrokerService().getPulsar().getConfig().isEnableBusyWait(),
+                new DefaultThreadFactory("pulsar-io"));
+        doAnswer(invocation -> 
singleEventLoopGroup).when(spyBrokerService).executor();
 
         PersistentTopic topic =
                 new PersistentTopic(successTopicName, ledgerMock, 
pulsarTestContext.getBrokerService());
@@ -417,23 +467,26 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 2. Add a consumer
         Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 1 /* consumer id */, 1,
-                "Cons1"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                sortedConsumerNameByHashSelector[0]/* consumer name */,
+                true, serverCnx, "myrole-1", Collections.emptyMap(),
                 false /* read compacted */, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH));
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertEquals(1, consumers.size());
         assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer1.consumerName());
+        waitActiveChangeEvent(1);
 
         // 3. Add a consumer with same priority level and consumer name is 
smaller in lexicographic order.
         Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 2 /* consumer id */, 1,
-                "Cons2"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                sortedConsumerNameByHashSelector[1]/* consumer name */,
+                true, serverCnx, "myrole-1", Collections.emptyMap(),
                 false /* read compacted */, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH));
         pdfc.addConsumer(consumer2);
 
         // 4. Verify active consumer doesn't change
         consumers = pdfc.getConsumers();
         assertEquals(2, consumers.size());
-        CommandActiveConsumerChange change = consumerChanges.take();
+        CommandActiveConsumerChange change = waitActiveChangeEvent(2);
         verifyActiveConsumerChange(change, 2, false);
         assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer1.consumerName());
         verify(consumer2, 
times(1)).notifyActiveConsumerChange(same(consumer1));
@@ -444,21 +497,10 @@ public class PersistentDispatcherFailoverConsumerTest {
         pdfc.addConsumer(consumer3);
         consumers = pdfc.getConsumers();
         assertEquals(3, consumers.size());
-        change = consumerChanges.take();
-        verifyActiveConsumerChange(change, 3, false);
-        assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer1.consumerName());
-        verify(consumer3, 
times(1)).notifyActiveConsumerChange(same(consumer1));
-
-        // 7. Remove first consumer and active consumer should change to 
consumer2 since it's added before consumer3
-        // though consumer 3 has higher priority level
-        pdfc.removeConsumer(consumer1);
-        consumers = pdfc.getConsumers();
-        assertEquals(2, consumers.size());
-        change = consumerChanges.take();
-        verifyActiveConsumerChange(change, 2, true);
-        assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer2.consumerName());
-        verify(consumer2, 
times(1)).notifyActiveConsumerChange(same(consumer2));
-        verify(consumer3, 
times(1)).notifyActiveConsumerChange(same(consumer2));
+        change = waitActiveChangeEvent(3);
+        verifyActiveConsumerChange(change, 3, true);
+        assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer3.consumerName());
+        verify(consumer3, 
times(1)).notifyActiveConsumerChange(same(consumer3));
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index ffc1444676b..b263d4448d8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -107,14 +108,17 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         Integer pid = listener.activeQueue.take();
         assertNotNull(pid);
         assertEquals(partitionId, pid.intValue());
-        assertNull(listener.inActiveQueue.poll());
     }
 
     private void verifyConsumerInactive(TestConsumerStateEventListener 
listener, int partitionId) throws Exception {
         Integer pid = listener.inActiveQueue.take();
         assertNotNull(pid);
         assertEquals(partitionId, pid.intValue());
-        assertNull(listener.activeQueue.poll());
+    }
+
+    private void clearEventQueue(TestConsumerStateEventListener listener) {
+        listener.inActiveQueue.clear();
+        listener.activeQueue.clear();
     }
 
     private static class ActiveInactiveListenerEvent implements 
ConsumerEventListener {
@@ -135,27 +139,57 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         }
     }
 
+    @AllArgsConstructor
+    static class FailoverConsumer {
+        private String consumerName;
+        private Consumer<byte[]> consumer;
+        private TestConsumerStateEventListener listener;
+        private PersistentDispatcherSingleActiveConsumer dispatcher;
+        private boolean isActiveConsumer(){
+            return 
dispatcher.getActiveConsumer().consumerName().equals(consumerName);
+        }
+    }
+
+    FailoverConsumer createConsumer(String topicName, String subName, String 
listenerName, String consumerName)
+            throws Exception {
+        TestConsumerStateEventListener listener = new 
TestConsumerStateEventListener(listenerName);
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerName(consumerName)
+                .consumerEventListener(listener)
+                .subscribe();
+        PersistentDispatcherSingleActiveConsumer dispatcher =
+                (PersistentDispatcherSingleActiveConsumer) 
pulsar.getBrokerService()
+                .getTopic(topicName, false).get().get()
+                .getSubscription(subName)
+                .getDispatcher();
+        return new FailoverConsumer(consumerName, consumer, listener, 
dispatcher);
+    }
+
     @Test
     public void testSimpleConsumerEventsWithoutPartition() throws Exception {
         final String topicName = 
"persistent://prop/use/ns-abc/failover-topic1-" + System.currentTimeMillis();
         final String subName = "sub1";
         final int numMsgs = 100;
 
-        TestConsumerStateEventListener listener1 = new 
TestConsumerStateEventListener("listener-1");
-        TestConsumerStateEventListener listener2 = new 
TestConsumerStateEventListener("listener-2");
-        ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
-                .acknowledgmentGroupTime(0, 
TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover);
-
+        // 1. Register two consumers.
+        FailoverConsumer failoverConsumer1 = createConsumer(topicName, 
subName, "l1", "c1");
+        FailoverConsumer failoverConsumer2 = createConsumer(topicName, 
subName, "l2", "c2");
+        FailoverConsumer firstConsumer;
+        FailoverConsumer secondConsumer;
+        if (failoverConsumer1.isActiveConsumer()){
+            firstConsumer = failoverConsumer1;
+            secondConsumer = failoverConsumer2;
+        } else {
+            firstConsumer = failoverConsumer2;
+            secondConsumer = failoverConsumer1;
+        }
 
-        // 1. two consumers on the same subscription
-        ConsumerBuilder<byte[]> consumerBulder1 = 
consumerBuilder.clone().consumerName("1")
-                .consumerEventListener(listener1);
-        Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
-        Consumer<byte[]> consumer2 = 
consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
-                .subscribe();
-        verifyConsumerActive(listener1, -1);
-        verifyConsumerInactive(listener2, -1);
-        listener2.inActiveQueue.clear();
+        verifyConsumerActive(firstConsumer.listener, -1);
+        verifyConsumerInactive(secondConsumer.listener, -1);
+        clearEventQueue(firstConsumer.listener);
+        clearEventQueue(secondConsumer.listener);
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
         PersistentSubscription subRef = topicRef.getSubscription(subName);
@@ -185,14 +219,14 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
             assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
         });
 
-        // 3. consumer1 should have all the messages while consumer2 should 
have no messages
+        // 3. firstConsumer should have all the messages while secondConsumer 
should have no messages
         Message<byte[]> msg = null;
-        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
+        Assert.assertNull(secondConsumer.consumer.receive(100, 
TimeUnit.MILLISECONDS));
         for (int i = 0; i < numMsgs; i++) {
-            msg = consumer1.receive(1, TimeUnit.SECONDS);
+            msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
             Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
-            consumer1.acknowledge(msg);
+            firstConsumer.consumer.acknowledge(msg);
         }
 
         rolloverPerIntervalStats();
@@ -211,51 +245,52 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
 
         // 5. master consumer failure should resend unacked messages and new 
messages to another consumer
         for (int i = 0; i < 5; i++) {
-            msg = consumer1.receive(1, TimeUnit.SECONDS);
+            msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
             Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
-            consumer1.acknowledge(msg);
+            firstConsumer.consumer.acknowledge(msg);
         }
         for (int i = 5; i < 10; i++) {
-            msg = consumer1.receive(1, TimeUnit.SECONDS);
+            msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
             Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
             // do not ack
         }
-        consumer1.close();
+        firstConsumer.consumer.close();
 
         Awaitility.await().untilAsserted(() -> {
-            verifyConsumerActive(listener2, -1);
-            verifyConsumerNotReceiveAnyStateChanges(listener1);
+            verifyConsumerActive(secondConsumer.listener, -1);
+            verifyConsumerNotReceiveAnyStateChanges(firstConsumer.listener);
+            clearEventQueue(firstConsumer.listener);
+            clearEventQueue(secondConsumer.listener);
         });
 
         for (int i = 5; i < numMsgs; i++) {
-            msg = consumer2.receive(1, TimeUnit.SECONDS);
+            msg = secondConsumer.consumer.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
             Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
-            consumer2.acknowledge(msg);
+            secondConsumer.consumer.acknowledge(msg);
         }
-        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
+        Assert.assertNull(secondConsumer.consumer.receive(100, 
TimeUnit.MILLISECONDS));
 
         rolloverPerIntervalStats();
         Awaitility.await().untilAsserted(() -> {
             assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
-
         });
 
         // 8. unsubscribe not allowed if multiple consumers connected
         try {
-            consumer1.unsubscribe();
+            firstConsumer.consumer.unsubscribe();
             fail("should fail");
         } catch (PulsarClientException e) {
             // ok
         }
 
-        // 9. unsubscribe allowed if there is a lone consumer
-        consumer1.close();
+        // 9. unsubscribe allowed if there is a lone consumer
+        firstConsumer.consumer.close();
         Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
         try {
-            consumer2.unsubscribe();
+            secondConsumer.consumer.unsubscribe();
         } catch (PulsarClientException e) {
             fail("Should not fail", e);
         }
@@ -265,8 +300,7 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         });
 
         producer.close();
-        consumer2.close();
-
+        secondConsumer.consumer.close();
         admin.topics().delete(topicName);
     }
 

Reply via email to