This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 895e05d088f [improve][broker] Close protocol handlers before unloading namespace bundles (#22728)
895e05d088f is described below

commit 895e05d088f4db23f1ca4d30e9d2ecaf7d3a6761
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Tue May 21 16:26:36 2024 +0800

    [improve][broker] Close protocol handlers before unloading namespace bundles (#22728)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  12 +-
 .../channel/ServiceUnitStateChannelImpl.java       |   2 +-
 .../broker/protocol/PulsarClientBasedHandler.java  | 152 +++++++++++++++++++++
 .../protocol/PulsarClientBasedHandlerTest.java     |  87 ++++++++++++
 .../protocol/SimpleProtocolHandlerTestsBase.java   |  16 ++-
 5 files changed, 258 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index bf266d44d83..1a45bedfce4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -427,6 +427,12 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     public CompletableFuture<Void> closeAsync() {
         mutex.lock();
         try {
+            // Close protocol handler before unloading namespace bundles because protocol handlers might maintain
+            // Pulsar clients that could send lookup requests that affect unloading.
+            if (protocolHandlers != null) {
+                protocolHandlers.close();
+                protocolHandlers = null;
+            }
             if (closeFuture != null) {
                 return closeFuture;
             }
@@ -434,6 +440,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             if (brokerService != null) {
                 brokerService.unloadNamespaceBundlesGracefully();
             }
+            // It only tells the Pulsar clients that this service is not ready to serve for the lookup requests
             state = State.Closing;
 
             // close the service in reverse order v.s. in which they are 
started
@@ -492,11 +499,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                                             (long) 
(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
                                                     * getConfiguration()
                                                     
.getBrokerShutdownTimeoutMs())));
-            // close protocol handler before closing broker service
-            if (protocolHandlers != null) {
-                protocolHandlers.close();
-                protocolHandlers = null;
-            }
 
             // cancel loadShedding task and shutdown the loadManager executor 
before shutting down the broker
             if (this.loadSheddingTask != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index c7702a40d0b..477a9239538 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -114,7 +114,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     public static final CompressionType MSG_COMPRESSION_TYPE = 
CompressionType.ZSTD;
     private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
     private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 
100;
-    private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 
3000;
+    public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 
3000;
     public static final long VERSION_ID_INIT = 1; // initial versionId
     public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 
mins
     private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs 
to clean immediately
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
new file mode 100644
index 00000000000..ed9881a8cad
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+
+public class PulsarClientBasedHandler implements ProtocolHandler {
+
+    static final String PROTOCOL = "test";
+
+    private String topic;
+    private int partitions;
+    private String cluster;
+    private PulsarClient client;
+    private List<Reader<byte[]>> readers;
+    private ExecutorService executor;
+    private volatile boolean running = false;
+    volatile long closeTimeMs;
+
+    @Override
+    public String protocolName() {
+        return PROTOCOL;
+    }
+
+    @Override
+    public boolean accept(String protocol) {
+        return protocol.equals(PROTOCOL);
+    }
+
+    @Override
+    public void initialize(ServiceConfiguration conf) throws Exception {
+        final var properties = conf.getProperties();
+        topic = (String) properties.getOrDefault("metadata.topic", 
"metadata-topic");
+        partitions = (Integer) properties.getOrDefault("metadata.partitions", 
1);
+        cluster = conf.getClusterName();
+    }
+
+    @Override
+    public String getProtocolDataToAdvertise() {
+        return "";
+    }
+
+    @Override
+    public void start(BrokerService service) {
+        try {
+            final var port = 
service.getPulsar().getListenPortHTTP().orElseThrow();
+            @Cleanup final var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build();
+            try {
+                admin.clusters().createCluster(cluster, ClusterData.builder()
+                        .serviceUrl(service.getPulsar().getWebServiceAddress())
+                        
.serviceUrlTls(service.getPulsar().getWebServiceAddressTls())
+                        
.brokerServiceUrl(service.getPulsar().getBrokerServiceUrl())
+                        
.brokerServiceUrlTls(service.getPulsar().getBrokerServiceUrlTls())
+                        .build());
+            } catch (PulsarAdminException ignored) {
+            }
+            try {
+                admin.tenants().createTenant("public", 
TenantInfo.builder().allowedClusters(Set.of(cluster)).build());
+            } catch (PulsarAdminException ignored) {
+            }
+            try {
+                admin.namespaces().createNamespace("public/default");
+            } catch (PulsarAdminException ignored) {
+            }
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+        try {
+            final var port = service.getListenPort().orElseThrow();
+            client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + 
port).build();
+            readers = new ArrayList<>();
+            for (int i = 0; i < partitions; i++) {
+                readers.add(client.newReader().topic(topic + 
TopicName.PARTITIONED_TOPIC_SUFFIX + i)
+                        .startMessageId(MessageId.earliest).create());
+            }
+            running = true;
+            executor = Executors.newSingleThreadExecutor();
+            executor.execute(() -> {
+                while (running) {
+                    readers.forEach(reader -> {
+                        try {
+                            reader.readNext(1, TimeUnit.MILLISECONDS);
+                        } catch (PulsarClientException ignored) {
+                        }
+                    });
+                }
+            });
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> 
newChannelInitializers() {
+        return Map.of();
+    }
+
+    @Override
+    public void close() {
+        final var start = System.currentTimeMillis();
+        running = false;
+        if (client != null) {
+            try {
+                client.close();
+            } catch (PulsarClientException ignored) {
+            }
+            client = null;
+        }
+        if (executor != null) {
+            executor.shutdown();
+            executor = null;
+        }
+        closeTimeMs = System.currentTimeMillis() - start;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
new file mode 100644
index 00000000000..9cc20cf7b9d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.protocol;
+
+import java.io.File;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.PortManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PulsarClientBasedHandlerTest {
+
+    private final static String clusterName = "cluster";
+    private final static int shutdownTimeoutMs = 100;
+    private final int zkPort = PortManager.nextFreePort();
+    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 
zkPort, PortManager::nextFreePort);
+    private File tempDirectory;
+    private PulsarService pulsar;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        bk.start();
+        final var config = new ServiceConfiguration();
+        config.setClusterName(clusterName);
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(0));
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
+
+        tempDirectory = 
SimpleProtocolHandlerTestsBase.configureProtocolHandler(config,
+                PulsarClientBasedHandler.class.getName(), true);
+
+        
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        config.setLoadBalancerDebugModeEnabled(true);
+        config.setBrokerShutdownTimeoutMs(shutdownTimeoutMs);
+
+        pulsar = new PulsarService(config);
+        pulsar.start();
+    }
+
+    @Test(timeOut = 30000)
+    public void testStopBroker() throws PulsarServerException {
+        final var beforeStop = System.currentTimeMillis();
+        final var handler = (PulsarClientBasedHandler) 
pulsar.getProtocolHandlers()
+                .protocol(PulsarClientBasedHandler.PROTOCOL);
+        pulsar.close();
+        final var elapsedMs = System.currentTimeMillis() - beforeStop;
+        log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs);
+        Assert.assertTrue(elapsedMs < ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
+                + handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        bk.stop();
+        if (tempDirectory != null) {
+            FileUtils.deleteDirectory(tempDirectory);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
index c894b7d77c4..6c80f220c3d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java
@@ -127,12 +127,18 @@ public abstract class SimpleProtocolHandlerTestsBase 
extends BrokerTestBase {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
-        tempDirectory = 
Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
+        tempDirectory = configureProtocolHandler(conf, 
MyProtocolHandler.class.getName(), useSeparateThreadPool);
+        super.baseSetup();
+    }
+
+    static File configureProtocolHandler(ServiceConfiguration conf, String 
className, boolean useSeparateThreadPool)
+            throws Exception {
+        final var tempDirectory = 
Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
         
conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
         conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
         conf.setMessagingProtocols(Collections.singleton("test"));
-        buildMockNarFile(tempDirectory);
-        super.baseSetup();
+        buildMockNarFile(tempDirectory, className);
+        return tempDirectory;
     }
 
     @Test
@@ -163,7 +169,7 @@ public abstract class SimpleProtocolHandlerTestsBase 
extends BrokerTestBase {
         }
     }
 
-    private static void buildMockNarFile(File tempDirectory) throws Exception {
+    private static void buildMockNarFile(File tempDirectory, String className) 
throws Exception {
         File file = new File(tempDirectory, "temp.nar");
         try (ZipOutputStream zipfile = new ZipOutputStream(new 
FileOutputStream(file))) {
 
@@ -176,7 +182,7 @@ public abstract class SimpleProtocolHandlerTestsBase 
extends BrokerTestBase {
             zipfile.putNextEntry(manifest);
             String yaml = "name: test\n" +
                     "description: this is a test\n" +
-                    "handlerClass: " + MyProtocolHandler.class.getName() + 
"\n";
+                    "handlerClass: " + className + "\n";
             zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
             zipfile.closeEntry();
         }

Reply via email to