This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2d162ba3333 [fix][broker]A failed consumer/producer future in
ServerCnx can never be removed (#23123)
2d162ba3333 is described below
commit 2d162ba333385917457e47110ad8ae8aa825d459
Author: fengyubiao <[email protected]>
AuthorDate: Tue Aug 6 10:16:33 2024 +0800
[fix][broker]A failed consumer/producer future in ServerCnx can never be
removed (#23123)
(cherry picked from commit 114880b1428ac1f6bbd97c43a26d4fa313a87b96)
---
.../apache/pulsar/broker/service/ServerCnx.java | 27 ++++++++--
.../broker/service/ServerCnxNonInjectionTest.java | 62 ++++++++++++++++++++++
.../client/impl/BrokerClientIntegrationTest.java | 58 ++++++++++++--------
3 files changed, 121 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a5c37f27845..1352be7a513 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3531,18 +3531,39 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
public CompletableFuture<Boolean> checkConnectionLiveness() {
+ if (!isActive()) {
+ return CompletableFuture.completedFuture(false);
+ }
if (connectionLivenessCheckTimeoutMillis > 0) {
return
NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
+ if (!isActive()) {
+ return CompletableFuture.completedFuture(false);
+ }
if (connectionCheckInProgress != null) {
return connectionCheckInProgress;
} else {
- final CompletableFuture<Boolean>
finalConnectionCheckInProgress = new CompletableFuture<>();
+ final CompletableFuture<Boolean>
finalConnectionCheckInProgress =
+ new CompletableFuture<>();
connectionCheckInProgress = finalConnectionCheckInProgress;
ctx.executor().schedule(() -> {
- if (finalConnectionCheckInProgress ==
connectionCheckInProgress
- && !finalConnectionCheckInProgress.isDone()) {
+ if (!isActive()) {
+ finalConnectionCheckInProgress.complete(false);
+ return;
+ }
+ if (finalConnectionCheckInProgress.isDone()) {
+ return;
+ }
+ if (finalConnectionCheckInProgress ==
connectionCheckInProgress) {
+ /**
+ * {@link #connectionCheckInProgress} will be
completed when
+ * {@link #channelInactive(ChannelHandlerContext)}
event occurs, so skip completing it here.
+ */
log.warn("[{}] Connection check timed out. Closing
connection.", this.toString());
ctx.close();
+ } else {
+ log.error("[{}] Reached unexpected code block.
Completing connection check.",
+ this.toString());
+ finalConnectionCheckInProgress.complete(true);
}
}, connectionLivenessCheckTimeoutMillis,
TimeUnit.MILLISECONDS);
sendPing();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
new file mode 100644
index 00000000000..3acc941a2c8
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ServerCnxNonInjectionTest extends ProducerConsumerBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 60 * 1000)
+ public void testCheckConnectionLivenessAfterClosed() throws Exception {
+ // Create a ServerCnx
+ final String tp = BrokerTestUtil.newUniqueName("public/default/tp");
+ Producer<String> p =
pulsarClient.newProducer(Schema.STRING).topic(tp).create();
+ ServerCnx serverCnx = (ServerCnx)
pulsar.getBrokerService().getTopic(tp, false).join().get()
+ .getProducers().values().iterator().next().getCnx();
+ // Call "checkConnectionLiveness" after serverCnx is closed. The
resulting future should complete eventually.
+ p.close();
+ serverCnx.close();
+ Thread.sleep(1000);
+ serverCnx.checkConnectionLiveness().join();
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 716dd1019f4..db3050216f6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -67,11 +67,11 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.resources.BaseResources;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
-import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -1018,49 +1018,61 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
int numMessages = 100;
final CountDownLatch latch = new CountDownLatch(numMessages);
- String topic = "persistent://my-property/my-ns/closed-cnx-topic";
+ String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/closed-cnx-topic");
+ admin.topics().createNonPartitionedTopic(topic);
String sub = "my-subscriber-name";
-
+ @Cleanup
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
-
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
msg) -> {
- Assert.assertNotNull(msg, "Message cannot be null");
- String receivedMessage = new String(msg.getData());
- log.debug("Received message [{}] in the listener",
receivedMessage);
- c1.acknowledgeAsync(msg);
- latch.countDown();
- }).subscribe();
-
+ ConsumerImpl c =
+ (ConsumerImpl)
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in the listener",
receivedMessage);
+ c1.acknowledgeAsync(msg);
+ latch.countDown();
+ }).subscribe();
PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic).get();
-
AbstractDispatcherSingleActiveConsumer dispatcher =
(AbstractDispatcherSingleActiveConsumer) topicRef
.getSubscription(sub).getDispatcher();
- ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx();
- Field field = ServerCnx.class.getDeclaredField("isActive");
- field.setAccessible(true);
- field.set(cnx, false);
-
assertNotNull(dispatcher.getActiveConsumer());
- pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ // Inject a blocker so that the "ping & pong" does not work.
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ ConnectionHandler connectionHandler = c.getConnectionHandler();
+ ClientCnx clientCnx = connectionHandler.cnx();
+ clientCnx.ctx().executor().submit(() -> {
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ @Cleanup
+ PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
Consumer<byte[]> consumer = null;
for (int i = 0; i < 2; i++) {
try {
- consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
msg) -> {
+ consumer =
pulsarClient2.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener",
receivedMessage);
c1.acknowledgeAsync(msg);
latch.countDown();
}).subscribe();
- if (i == 0) {
- fail("Should failed with ConsumerBusyException!");
- }
} catch (PulsarClientException.ConsumerBusyException ignore) {
- // It's ok.
+ // It's ok.
}
}
assertNotNull(consumer);
log.info("-- Exiting {} test --", methodName);
+
+ // cleanup.
+ countDownLatch.countDown();
+ consumer.close();
+ pulsarClient.close();
+ pulsarClient2.close();
+ admin.topics().delete(topic, false);
}
@Test