This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 29db8f84e5f [fix] [broker] Make the new exclusive consumer instead the 
inactive one faster (#21183)
29db8f84e5f is described below

commit 29db8f84e5f0f99b110d62090ab670c59bf4638a
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Mon Oct 30 18:57:49 2023 +0800

    [fix] [broker] Make the new exclusive consumer instead the inactive one 
faster (#21183)
    
    ### Motivation
    
    There is an issue similar to the one fixed by https://github.com/apache/pulsar/pull/21155.
    
    The client considered the connection inactive, but the broker still considered it healthy.
    The client then tried to reconnect an exclusive consumer over a new connection and received
    the error `Exclusive consumer is already connected`.
    
    ### Modifications
    
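    - When a new exclusive consumer subscribes while another consumer is registered, check
      whether the active consumer's connection is still alive instead of rejecting the new
      consumer immediately. If the old connection is not alive, retry adding the new consumer;
      otherwise (or if the liveness result is unknown) fail with `ConsumerBusyException`.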
---
 .../AbstractDispatcherSingleActiveConsumer.java    |  17 ++-
 .../pulsar/broker/service/PersistentTopicTest.java |  23 ++--
 .../pulsar/broker/service/ServerCnxTest.java       | 150 +++++++++++++++++++--
 .../broker/service/utils/ClientChannelHelper.java  |  12 ++
 4 files changed, 180 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 5098890242b..310354dcd3b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -166,7 +166,22 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         }
 
         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
-            return FutureUtil.failedFuture(new 
ConsumerBusyException("Exclusive consumer is already connected"));
+            Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            if (actConsumer != null) {
+                return 
actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive 
-> {
+                    if (actConsumerStillAlive == null || 
actConsumerStillAlive) {
+                        return FutureUtil.failedFuture(new 
ConsumerBusyException("Exclusive consumer is already"
+                                + " connected"));
+                    } else {
+                        return addConsumer(consumer);
+                    }
+                });
+            } else {
+                // It should never happen.
+
+                return FutureUtil.failedFuture(new 
ConsumerBusyException("Active consumer is in a strange state."
+                        + " Active consumer is null, but there are " + 
consumers.size() + " registered."));
+            }
         }
 
         if (subscriptionType == SubType.Failover && 
isConsumersExceededOnSubscription()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 51d3ef4577f..a2b42b5cca0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -209,6 +209,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(eventLoopGroup.next()).when(channel).eventLoop();
         doReturn(channel).when(ctx).channel();
         doReturn(ctx).when(serverCnx).ctx();
+        
doReturn(CompletableFuture.completedFuture(true)).when(serverCnx).checkConnectionLiveness();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         NamespaceBundle bundle = mock(NamespaceBundle.class);
@@ -693,7 +694,15 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         f1.get();
 
         // 2. duplicate subscribe
-        Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd));
+        CommandSubscribe cmd2 = new CommandSubscribe()
+                .setConsumerId(2)
+                .setTopic(successTopicName)
+                .setSubscription(successSubName)
+                .setConsumerName("consumer-name")
+                .setReadCompacted(false)
+                .setRequestId(2)
+                .setSubType(SubType.Exclusive);
+        Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd2));
         try {
             f2.get();
             fail("should fail with exception");
@@ -758,19 +767,11 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         sub.addConsumer(consumer);
         assertTrue(sub.getDispatcher().isConsumerConnected());
 
-        // 2. duplicate add consumer
-        try {
-            sub.addConsumer(consumer).get();
-            fail("Should fail with ConsumerBusyException");
-        } catch (Exception e) {
-            assertTrue(e.getCause() instanceof 
BrokerServiceException.ConsumerBusyException);
-        }
-
-        // 3. simple remove consumer
+        // 2. simple remove consumer
         sub.removeConsumer(consumer);
         assertFalse(sub.getDispatcher().isConsumerConnected());
 
-        // 4. duplicate remove consumer
+        // 3. duplicate remove consumer
         try {
             sub.removeConsumer(consumer);
             fail("Should fail with ServerMetadataException");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5fd48819813..79178ec491f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -45,6 +45,7 @@ import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelId;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.vertx.core.impl.ConcurrentHashSet;
@@ -52,6 +53,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -62,10 +64,15 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -99,6 +106,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
 import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -997,7 +1005,8 @@ public class ServerCnxTest {
         channelsStoppedAnswerHealthCheck.add(channel);
         ClientChannel channel2 = new ClientChannel();
         setChannelConnected(channel2.serverCnx);
-        Awaitility.await().untilAsserted(() -> {
+        Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
+            channel.runPendingTasks();
             ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
                     pName, false, metadata, null, epoch.incrementAndGet(), 
false,
                     ProducerAccessMode.Shared, Optional.empty(), false);
@@ -1011,10 +1020,132 @@ public class ServerCnxTest {
         channel2.close();
     }
 
+    @Test
+    public void testHandleConsumerAfterClientChannelInactive() throws 
Exception {
+        final String tName = successTopicName;
+        final long consumerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final String sName = successSubName;
+        final String cName1 = ConsumerName.generateRandomName();
+        final String cName2 = ConsumerName.generateRandomName();
+        resetChannel();
+        setChannelConnected();
+
+        // The consumer registers using the first connection.
+        ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, 
consumerId, requestId.incrementAndGet(),
+                SubType.Exclusive, 0, cName1, 0);
+        channel.writeInbound(cmdSubscribe1);
+        assertTrue(getResponse() instanceof CommandSuccess);
+        PersistentTopic topicRef = (PersistentTopic) 
brokerService.getTopicReference(tName).get();
+        assertNotNull(topicRef);
+        assertNotNull(topicRef.getSubscription(sName).getConsumers());
+        assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+        
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
 cName1);
+
+        // Verify a consumer on a new connection replaces the one on the stopped channel.
+        channelsStoppedAnswerHealthCheck.add(channel);
+        ClientChannel channel2 = new ClientChannel();
+        setChannelConnected(channel2.serverCnx);
+        ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, 
consumerId, requestId.incrementAndGet(),
+                CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
+        channel2.channel.writeInbound(cmdSubscribe2);
+        BackGroundExecutor backGroundExecutor = 
startBackgroundExecutorForEmbeddedChannel(channel);
+
+        assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) 
instanceof CommandSuccess);
+        assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+        
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
 cName2);
+        backGroundExecutor.close();
+
+        // cleanup.
+        channel.finish();
+        channel2.close();
+    }
+
+    @Test
+    public void test2ndSubFailedIfDisabledConCheck()
+            throws Exception {
+        final String tName = successTopicName;
+        final long consumerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final String sName = successSubName;
+        final String cName1 = ConsumerName.generateRandomName();
+        final String cName2 = ConsumerName.generateRandomName();
+        // Disabled connection check.
+        pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(-1);
+        resetChannel();
+        setChannelConnected();
+
+        // The consumer register using the first connection.
+        ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, 
consumerId, requestId.incrementAndGet(),
+                SubType.Exclusive, 0, cName1, 0);
+        channel.writeInbound(cmdSubscribe1);
+        assertTrue(getResponse() instanceof CommandSuccess);
+        PersistentTopic topicRef = (PersistentTopic) 
brokerService.getTopicReference(tName).orElse(null);
+        assertNotNull(topicRef);
+        assertNotNull(topicRef.getSubscription(sName).getConsumers());
+        
assertEquals(topicRef.getSubscription(sName).getConsumers().stream().map(Consumer::consumerName)
+                .collect(Collectors.toList()), 
Collections.singletonList(cName1));
+
+        // Verify a consumer on a new connection does NOT replace the one on the stopped channel.
+        channelsStoppedAnswerHealthCheck.add(channel);
+        ClientChannel channel2 = new ClientChannel();
+        setChannelConnected(channel2.serverCnx);
+        ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, 
consumerId, requestId.incrementAndGet(),
+                CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
+        channel2.channel.writeInbound(cmdSubscribe2);
+        BackGroundExecutor backGroundExecutor = 
startBackgroundExecutorForEmbeddedChannel(channel);
+
+        // Since the "ConnectionLiveness" feature is disabled, the fix from
+        // https://github.com/apache/pulsar/pull/21183 does not take effect, so the client still gets an error.
+        Object responseOfConnection2 = getResponse(channel2.channel, 
channel2.clientChannelHelper);
+        assertTrue(responseOfConnection2 instanceof CommandError);
+        assertTrue(((CommandError) responseOfConnection2).getMessage()
+                .contains("Exclusive consumer is already connected"));
+        assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+        
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
 cName1);
+        backGroundExecutor.close();
+
+        // cleanup.
+        channel.finish();
+        channel2.close();
+        // Reset configuration.
+        pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(5000);
+    }
+
+    /**
+     * An "EmbeddedChannel" has no background thread, so tasks submitted via channel.execute(runnable) are not run
+     * on their own.
+     * This starts a background thread that calls channel.runPendingTasks() with a 100 ms fixed delay.
+     */
+    private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final 
EmbeddedChannel channel) {
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() 
-> {
+            channel.runPendingTasks();
+        }, 100, 100, TimeUnit.MILLISECONDS);
+        return new BackGroundExecutor(executor, scheduledFuture);
+    }
+
+    @AllArgsConstructor
+    private static class BackGroundExecutor implements Closeable {
+
+        private ScheduledExecutorService executor;
+
+        private ScheduledFuture scheduledFuture;
+
+        @Override
+        public void close() throws IOException {
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(true);
+            }
+            executor.shutdown();
+        }
+    }
+
     private class ClientChannel implements Closeable {
         private ClientChannelHelper clientChannelHelper = new 
ClientChannelHelper();
         private ServerCnx serverCnx = new ServerCnx(pulsar);
-        private EmbeddedChannel channel = new EmbeddedChannel(new 
LengthFieldBasedFrameDecoder(
+        private EmbeddedChannel channel = new 
EmbeddedChannel(DefaultChannelId.newInstance(),
+                new LengthFieldBasedFrameDecoder(
                 5 * 1024 * 1024,
                 0,
                 4,
@@ -1810,9 +1941,11 @@ public class ServerCnxTest {
                 "test" /* consumer name */, 0 /* avoid reseting cursor */);
         channel.writeInbound(clientCommand);
 
+        BackGroundExecutor backGroundExecutor = 
startBackgroundExecutorForEmbeddedChannel(channel);
+
         // Create producer second time
         clientCommand = Commands.newSubscribe(successTopicName, //
-                successSubName, 2 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0,
+                successSubName, 2 /* consumer id */, 2 /* request id */, 
SubType.Exclusive, 0,
                 "test" /* consumer name */, 0 /* avoid reseting cursor */);
         channel.writeInbound(clientCommand);
 
@@ -1822,6 +1955,9 @@ public class ServerCnxTest {
             CommandError error = (CommandError) response;
             assertEquals(error.getError(), ServerError.ConsumerBusy);
         });
+
+        // cleanup.
+        backGroundExecutor.close();
         channel.finish();
     }
 
@@ -2676,13 +2812,7 @@ public class ServerCnxTest {
                     if (channelsStoppedAnswerHealthCheck.contains(channel)) {
                         continue;
                     }
-                    
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
-                        if (!future.isSuccess()) {
-                            log.warn("[{}] Forcing connection to close since 
cannot send a pong message.",
-                                    channel, future.cause());
-                            channel.close();
-                        }
-                    });
+                    channel.writeInbound(Commands.newPong());
                     continue;
                 }
                 return cmd;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index bf0dd3aa9c1..c8fce32efc5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -27,6 +27,8 @@ import 
org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
+import org.apache.pulsar.common.api.proto.CommandPong;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -207,6 +209,16 @@ public class ClientChannelHelper {
                 CommandEndTxnOnSubscriptionResponse 
commandEndTxnOnSubscriptionResponse) {
             queue.offer(new 
CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
         }
+
+        @Override
+        protected void handlePing(CommandPing ping) {
+            queue.offer(new CommandPing().copyFrom(ping));
+        }
+
+        @Override
+        protected void handlePong(CommandPong pong) {
+            return;
+        }
     };
 
 }

Reply via email to