This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 02b25a3b246 [fix] [test] fix flaky test
PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#19574)
02b25a3b246 is described below
commit 02b25a3b2461b975b46afdca4130cdba876721c0
Author: fengyubiao <[email protected]>
AuthorDate: Tue Feb 21 17:10:35 2023 +0800
[fix] [test] fix flaky test
PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#19574)
---
.../PersistentDispatcherFailoverConsumerTest.java | 80 ++++++++++++----
.../broker/service/PersistentFailoverE2ETest.java | 106 ++++++++++++++-------
2 files changed, 131 insertions(+), 55 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 29a3227c92b..0dafe4bca3e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -37,6 +37,8 @@ import static org.testng.AssertJUnit.assertSame;
import static org.testng.AssertJUnit.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -47,6 +49,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -74,6 +77,8 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -399,9 +404,54 @@ public class PersistentDispatcherFailoverConsumerTest {
assertTrue(pdfc.canUnsubscribe(consumer1));
}
- @Test
+ private String[] sortConsumerNameByHashSelector(String...consumerNames)
throws Exception {
+ String[] result = new String[consumerNames.length];
+ PersistentTopic topic =
+ new PersistentTopic(successTopicName, ledgerMock,
pulsarTestContext.getBrokerService());
+ PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
+ int partitionIndex = -1;
+ PersistentDispatcherSingleActiveConsumer dispatcher = new
PersistentDispatcherSingleActiveConsumer(cursorMock,
+ SubType.Failover, partitionIndex, topic, sub);
+ for (String consumerName : consumerNames){
+ Consumer consumer = spy(new Consumer(sub, SubType.Failover,
topic.getName(), 999 /* consumer id */, 1,
+ consumerName/* consumer name */, true, serverCnx,
"myrole-1", Collections.emptyMap(),
+ false /* read compacted */, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH));
+ dispatcher.addConsumer(consumer);
+ }
+ for (int i = 0; i < consumerNames.length; i++) {
+ result[i] = dispatcher.getActiveConsumer().consumerName();
+ dispatcher.removeConsumer(dispatcher.getActiveConsumer());
+ }
+ consumerChanges.clear();
+ return result;
+ }
+
+ private CommandActiveConsumerChange waitActiveChangeEvent(int consumerId)
+ throws Exception {
+ AtomicReference<CommandActiveConsumerChange> res = new
AtomicReference<>();
+ Awaitility.await().until(() -> {
+ while (!consumerChanges.isEmpty()){
+ CommandActiveConsumerChange change = consumerChanges.take();
+ if (change.getConsumerId() == consumerId){
+ res.set(change);
+ return true;
+ }
+ }
+ return false;
+ });
+ consumerChanges.clear();
+ return res.get();
+ }
+
+ @Test(invocationCount = 100)
public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
log.info("--- Starting
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
+ String[] sortedConsumerNameByHashSelector =
sortConsumerNameByHashSelector("Cons1", "Cons2");
+ BrokerService spyBrokerService =
spy(pulsarTestContext.getBrokerService());
+ final EventLoopGroup singleEventLoopGroup =
EventLoopUtil.newEventLoopGroup(1,
+
pulsarTestContext.getBrokerService().getPulsar().getConfig().isEnableBusyWait(),
+ new DefaultThreadFactory("pulsar-io"));
+ doAnswer(invocation ->
singleEventLoopGroup).when(spyBrokerService).executor();
PersistentTopic topic =
new PersistentTopic(successTopicName, ledgerMock,
pulsarTestContext.getBrokerService());
@@ -417,23 +467,26 @@ public class PersistentDispatcherFailoverConsumerTest {
// 2. Add a consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover,
topic.getName(), 1 /* consumer id */, 1,
- "Cons1"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
+ sortedConsumerNameByHashSelector[0]/* consumer name */,
+ true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertEquals(1, consumers.size());
assertSame(pdfc.getActiveConsumer().consumerName(),
consumer1.consumerName());
+ waitActiveChangeEvent(1);
// 3. Add a consumer with same priority level and consumer name is
smaller in lexicographic order.
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover,
topic.getName(), 2 /* consumer id */, 1,
- "Cons2"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
+ sortedConsumerNameByHashSelector[1]/* consumer name */,
+ true, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH));
pdfc.addConsumer(consumer2);
// 4. Verify active consumer doesn't change
consumers = pdfc.getConsumers();
assertEquals(2, consumers.size());
- CommandActiveConsumerChange change = consumerChanges.take();
+ CommandActiveConsumerChange change = waitActiveChangeEvent(2);
verifyActiveConsumerChange(change, 2, false);
assertSame(pdfc.getActiveConsumer().consumerName(),
consumer1.consumerName());
verify(consumer2,
times(1)).notifyActiveConsumerChange(same(consumer1));
@@ -444,21 +497,10 @@ public class PersistentDispatcherFailoverConsumerTest {
pdfc.addConsumer(consumer3);
consumers = pdfc.getConsumers();
assertEquals(3, consumers.size());
- change = consumerChanges.take();
- verifyActiveConsumerChange(change, 3, false);
- assertSame(pdfc.getActiveConsumer().consumerName(),
consumer1.consumerName());
- verify(consumer3,
times(1)).notifyActiveConsumerChange(same(consumer1));
-
- // 7. Remove first consumer and active consumer should change to
consumer2 since it's added before consumer3
- // though consumer 3 has higher priority level
- pdfc.removeConsumer(consumer1);
- consumers = pdfc.getConsumers();
- assertEquals(2, consumers.size());
- change = consumerChanges.take();
- verifyActiveConsumerChange(change, 2, true);
- assertSame(pdfc.getActiveConsumer().consumerName(),
consumer2.consumerName());
- verify(consumer2,
times(1)).notifyActiveConsumerChange(same(consumer2));
- verify(consumer3,
times(1)).notifyActiveConsumerChange(same(consumer2));
+ change = waitActiveChangeEvent(3);
+ verifyActiveConsumerChange(change, 3, true);
+ assertSame(pdfc.getActiveConsumer().consumerName(),
consumer3.consumerName());
+ verify(consumer3,
times(1)).notifyActiveConsumerChange(same(consumer3));
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index ffc1444676b..b263d4448d8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
import org.apache.pulsar.broker.BrokerTestUtil;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -107,14 +108,17 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
Integer pid = listener.activeQueue.take();
assertNotNull(pid);
assertEquals(partitionId, pid.intValue());
- assertNull(listener.inActiveQueue.poll());
}
private void verifyConsumerInactive(TestConsumerStateEventListener
listener, int partitionId) throws Exception {
Integer pid = listener.inActiveQueue.take();
assertNotNull(pid);
assertEquals(partitionId, pid.intValue());
- assertNull(listener.activeQueue.poll());
+ }
+
+ private void clearEventQueue(TestConsumerStateEventListener listener) {
+ listener.inActiveQueue.clear();
+ listener.activeQueue.clear();
}
private static class ActiveInactiveListenerEvent implements
ConsumerEventListener {
@@ -135,27 +139,57 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
}
}
+ @AllArgsConstructor
+ static class FailoverConsumer {
+ private String consumerName;
+ private Consumer<byte[]> consumer;
+ private TestConsumerStateEventListener listener;
+ private PersistentDispatcherSingleActiveConsumer dispatcher;
+ private boolean isActiveConsumer(){
+ return
dispatcher.getActiveConsumer().consumerName().equals(consumerName);
+ }
+ }
+
+ FailoverConsumer createConsumer(String topicName, String subName, String
listenerName, String consumerName)
+ throws Exception {
+ TestConsumerStateEventListener listener = new
TestConsumerStateEventListener(listenerName);
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Failover)
+ .consumerName(consumerName)
+ .consumerEventListener(listener)
+ .subscribe();
+ PersistentDispatcherSingleActiveConsumer dispatcher =
+ (PersistentDispatcherSingleActiveConsumer)
pulsar.getBrokerService()
+ .getTopic(topicName, false).get().get()
+ .getSubscription(subName)
+ .getDispatcher();
+ return new FailoverConsumer(consumerName, consumer, listener,
dispatcher);
+ }
+
@Test
public void testSimpleConsumerEventsWithoutPartition() throws Exception {
final String topicName =
"persistent://prop/use/ns-abc/failover-topic1-" + System.currentTimeMillis();
final String subName = "sub1";
final int numMsgs = 100;
- TestConsumerStateEventListener listener1 = new
TestConsumerStateEventListener("listener-1");
- TestConsumerStateEventListener listener2 = new
TestConsumerStateEventListener("listener-2");
- ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
- .acknowledgmentGroupTime(0,
TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover);
-
+ // 1. Registry two consumers.
+ FailoverConsumer failoverConsumer1 = createConsumer(topicName,
subName, "l1", "c1");
+ FailoverConsumer failoverConsumer2 = createConsumer(topicName,
subName, "l2", "c2");
+ FailoverConsumer firstConsumer;
+ FailoverConsumer secondConsumer;
+ if (failoverConsumer1.isActiveConsumer()){
+ firstConsumer = failoverConsumer1;
+ secondConsumer = failoverConsumer2;
+ } else {
+ firstConsumer = failoverConsumer2;
+ secondConsumer = failoverConsumer1;
+ }
- // 1. two consumers on the same subscription
- ConsumerBuilder<byte[]> consumerBulder1 =
consumerBuilder.clone().consumerName("1")
- .consumerEventListener(listener1);
- Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
- Consumer<byte[]> consumer2 =
consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
- .subscribe();
- verifyConsumerActive(listener1, -1);
- verifyConsumerInactive(listener2, -1);
- listener2.inActiveQueue.clear();
+ verifyConsumerActive(firstConsumer.listener, -1);
+ verifyConsumerInactive(secondConsumer.listener, -1);
+ clearEventQueue(firstConsumer.listener);
+ clearEventQueue(secondConsumer.listener);
PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subRef = topicRef.getSubscription(subName);
@@ -185,14 +219,14 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
});
- // 3. consumer1 should have all the messages while consumer2 should
have no messages
+ // 3. firstConsumer should have all the messages while secondConsumer
should have no messages
Message<byte[]> msg = null;
- Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
+ Assert.assertNull(secondConsumer.consumer.receive(100,
TimeUnit.MILLISECONDS));
for (int i = 0; i < numMsgs; i++) {
- msg = consumer1.receive(1, TimeUnit.SECONDS);
+ msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
- consumer1.acknowledge(msg);
+ firstConsumer.consumer.acknowledge(msg);
}
rolloverPerIntervalStats();
@@ -211,51 +245,52 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
// 5. master consumer failure should resend unacked messages and new
messages to another consumer
for (int i = 0; i < 5; i++) {
- msg = consumer1.receive(1, TimeUnit.SECONDS);
+ msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
- consumer1.acknowledge(msg);
+ firstConsumer.consumer.acknowledge(msg);
}
for (int i = 5; i < 10; i++) {
- msg = consumer1.receive(1, TimeUnit.SECONDS);
+ msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
// do not ack
}
- consumer1.close();
+ firstConsumer.consumer.close();
Awaitility.await().untilAsserted(() -> {
- verifyConsumerActive(listener2, -1);
- verifyConsumerNotReceiveAnyStateChanges(listener1);
+ verifyConsumerActive(secondConsumer.listener, -1);
+ verifyConsumerNotReceiveAnyStateChanges(firstConsumer.listener);
+ clearEventQueue(firstConsumer.listener);
+ clearEventQueue(secondConsumer.listener);
});
for (int i = 5; i < numMsgs; i++) {
- msg = consumer2.receive(1, TimeUnit.SECONDS);
+ msg = secondConsumer.consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
- consumer2.acknowledge(msg);
+ secondConsumer.consumer.acknowledge(msg);
}
- Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
+ Assert.assertNull(secondConsumer.consumer.receive(100,
TimeUnit.MILLISECONDS));
rolloverPerIntervalStats();
Awaitility.await().untilAsserted(() -> {
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
-
});
// 8. unsubscribe not allowed if multiple consumers connected
try {
- consumer1.unsubscribe();
+ firstConsumer.consumer.unsubscribe();
fail("should fail");
} catch (PulsarClientException e) {
// ok
}
- // 9. unsubscribe allowed if there is a lone consumer
- consumer1.close();
+ // 9. unsubscribe allowed if there is alone consumer
+ firstConsumer.consumer.close();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
try {
- consumer2.unsubscribe();
+ secondConsumer.consumer.unsubscribe();
} catch (PulsarClientException e) {
fail("Should not fail", e);
}
@@ -265,8 +300,7 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
});
producer.close();
- consumer2.close();
-
+ secondConsumer.consumer.close();
admin.topics().delete(topicName);
}