This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2d162ba3333 [fix][broker]A failed consumer/producer future in 
ServerCnx can never be removed (#23123)
2d162ba3333 is described below

commit 2d162ba333385917457e47110ad8ae8aa825d459
Author: fengyubiao <[email protected]>
AuthorDate: Tue Aug 6 10:16:33 2024 +0800

    [fix][broker]A failed consumer/producer future in ServerCnx can never be 
removed (#23123)
    
    (cherry picked from commit 114880b1428ac1f6bbd97c43a26d4fa313a87b96)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 27 ++++++++--
 .../broker/service/ServerCnxNonInjectionTest.java  | 62 ++++++++++++++++++++++
 .../client/impl/BrokerClientIntegrationTest.java   | 58 ++++++++++++--------
 3 files changed, 121 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a5c37f27845..1352be7a513 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3531,18 +3531,39 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     public CompletableFuture<Boolean> checkConnectionLiveness() {
+        if (!isActive()) {
+            return CompletableFuture.completedFuture(false);
+        }
         if (connectionLivenessCheckTimeoutMillis > 0) {
             return 
NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
+                if (!isActive()) {
+                    return CompletableFuture.completedFuture(false);
+                }
                 if (connectionCheckInProgress != null) {
                     return connectionCheckInProgress;
                 } else {
-                    final CompletableFuture<Boolean> 
finalConnectionCheckInProgress = new CompletableFuture<>();
+                    final CompletableFuture<Boolean> 
finalConnectionCheckInProgress =
+                            new CompletableFuture<>();
                     connectionCheckInProgress = finalConnectionCheckInProgress;
                     ctx.executor().schedule(() -> {
-                        if (finalConnectionCheckInProgress == 
connectionCheckInProgress
-                                && !finalConnectionCheckInProgress.isDone()) {
+                        if (!isActive()) {
+                            finalConnectionCheckInProgress.complete(false);
+                            return;
+                        }
+                        if (finalConnectionCheckInProgress.isDone()) {
+                            return;
+                        }
+                        if (finalConnectionCheckInProgress == 
connectionCheckInProgress) {
+                            /**
+                             * {@link #connectionCheckInProgress} will be 
completed when
+                             * {@link #channelInactive(ChannelHandlerContext)} 
event occurs, so skip setting it here.
+                             */
                             log.warn("[{}] Connection check timed out. Closing 
connection.", this.toString());
                             ctx.close();
+                        } else {
+                            log.error("[{}] Reached unexpected code block. 
Completing connection check.",
+                                    this.toString());
+                            finalConnectionCheckInProgress.complete(true);
                         }
                     }, connectionLivenessCheckTimeoutMillis, 
TimeUnit.MILLISECONDS);
                     sendPing();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
new file mode 100644
index 00000000000..3acc941a2c8
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ServerCnxNonInjectionTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 60 * 1000)
+    public void testCheckConnectionLivenessAfterClosed() throws Exception {
+        // Create a ServerCnx
+        final String tp = BrokerTestUtil.newUniqueName("public/default/tp");
+        Producer<String> p = 
pulsarClient.newProducer(Schema.STRING).topic(tp).create();
+        ServerCnx serverCnx = (ServerCnx) 
pulsar.getBrokerService().getTopic(tp, false).join().get()
+                        .getProducers().values().iterator().next().getCnx();
+        // Call "CheckConnectionLiveness" after serverCnx is closed. The 
resulting future should complete eventually.
+        p.close();
+        serverCnx.close();
+        Thread.sleep(1000);
+        serverCnx.checkConnectionLiveness().join();
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 716dd1019f4..db3050216f6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -67,11 +67,11 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.resources.BaseResources;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
-import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -1018,49 +1018,61 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
 
         int numMessages = 100;
         final CountDownLatch latch = new CountDownLatch(numMessages);
-        String topic = "persistent://my-property/my-ns/closed-cnx-topic";
+        String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/closed-cnx-topic");
+        admin.topics().createNonPartitionedTopic(topic);
         String sub = "my-subscriber-name";
-
+        @Cleanup
         PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
-        
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
 msg) -> {
-            Assert.assertNotNull(msg, "Message cannot be null");
-            String receivedMessage = new String(msg.getData());
-            log.debug("Received message [{}] in the listener", 
receivedMessage);
-            c1.acknowledgeAsync(msg);
-            latch.countDown();
-        }).subscribe();
-
+        ConsumerImpl c =
+                (ConsumerImpl) 
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
 msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", 
receivedMessage);
+                    c1.acknowledgeAsync(msg);
+                    latch.countDown();
+                }).subscribe();
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
-
         AbstractDispatcherSingleActiveConsumer dispatcher = 
(AbstractDispatcherSingleActiveConsumer) topicRef
                 .getSubscription(sub).getDispatcher();
-        ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx();
-        Field field = ServerCnx.class.getDeclaredField("isActive");
-        field.setAccessible(true);
-        field.set(cnx, false);
-
         assertNotNull(dispatcher.getActiveConsumer());
 
-        pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        // Inject a blocker so that the "ping & pong" does not work.
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        ConnectionHandler connectionHandler = c.getConnectionHandler();
+        ClientCnx clientCnx = connectionHandler.cnx();
+        clientCnx.ctx().executor().submit(() -> {
+            try {
+                countDownLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        @Cleanup
+        PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
         Consumer<byte[]> consumer = null;
         for (int i = 0; i < 2; i++) {
             try {
-                consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
 msg) -> {
+                consumer = 
pulsarClient2.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
 msg) -> {
                     Assert.assertNotNull(msg, "Message cannot be null");
                     String receivedMessage = new String(msg.getData());
                     log.debug("Received message [{}] in the listener", 
receivedMessage);
                     c1.acknowledgeAsync(msg);
                     latch.countDown();
                 }).subscribe();
-                if (i == 0) {
-                    fail("Should failed with ConsumerBusyException!");
-                }
             } catch (PulsarClientException.ConsumerBusyException ignore) {
-               // It's ok.
+                // It's ok.
             }
         }
         assertNotNull(consumer);
         log.info("-- Exiting {} test --", methodName);
+
+        // cleanup.
+        countDownLatch.countDown();
+        consumer.close();
+        pulsarClient.close();
+        pulsarClient2.close();
+        admin.topics().delete(topic, false);
     }
 
     @Test

Reply via email to