This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 895e05d088f [improve][broker] Close protocol handlers before unloading namespace bundles (#22728) 895e05d088f is described below commit 895e05d088f4db23f1ca4d30e9d2ecaf7d3a6761 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Tue May 21 16:26:36 2024 +0800 [improve][broker] Close protocol handlers before unloading namespace bundles (#22728) --- .../org/apache/pulsar/broker/PulsarService.java | 12 +- .../channel/ServiceUnitStateChannelImpl.java | 2 +- .../broker/protocol/PulsarClientBasedHandler.java | 152 +++++++++++++++++++++ .../protocol/PulsarClientBasedHandlerTest.java | 87 ++++++++++++ .../protocol/SimpleProtocolHandlerTestsBase.java | 16 ++- 5 files changed, 258 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index bf266d44d83..1a45bedfce4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -427,6 +427,12 @@ public class PulsarService implements AutoCloseable, ShutdownService { public CompletableFuture<Void> closeAsync() { mutex.lock(); try { + // Close protocol handler before unloading namespace bundles because protocol handlers might maintain + // Pulsar clients that could send lookup requests that affect unloading. + if (protocolHandlers != null) { + protocolHandlers.close(); + protocolHandlers = null; + } if (closeFuture != null) { return closeFuture; } @@ -434,6 +440,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { if (brokerService != null) { brokerService.unloadNamespaceBundlesGracefully(); } + // It only tells the Pulsar clients that this service is not ready to serve for the lookup requests state = State.Closing; // close the service in reverse order v.s. in which they are started @@ -492,11 +499,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * getConfiguration() .getBrokerShutdownTimeoutMs()))); - // close protocol handler before closing broker service - if (protocolHandlers != null) { - protocolHandlers.close(); - protocolHandlers = null; - } // cancel loadShedding task and shutdown the loadManager executor before shutting down the broker if (this.loadSheddingTask != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index c7702a40d0b..477a9239538 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -114,7 +114,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD; private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000; private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100; - private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; + public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000; public static final long VERSION_ID_INIT = 1; // initial versionId public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java new file mode 100644 index 00000000000..ed9881a8cad --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.protocol; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; + +public class PulsarClientBasedHandler implements ProtocolHandler { + + static final String PROTOCOL = "test"; + + private String topic; + private int partitions; + private String cluster; + private PulsarClient client; + private List<Reader<byte[]>> readers; + private ExecutorService executor; + private volatile boolean running = false; + volatile long closeTimeMs; + + @Override + public String protocolName() { + return PROTOCOL; + } + + @Override + public boolean accept(String protocol) { + return protocol.equals(PROTOCOL); + } + + @Override + public void initialize(ServiceConfiguration conf) throws Exception { + final var properties = conf.getProperties(); + topic = (String) properties.getOrDefault("metadata.topic", "metadata-topic"); + partitions = (Integer) properties.getOrDefault("metadata.partitions", 1); + cluster = conf.getClusterName(); + } + + @Override + public String getProtocolDataToAdvertise() { + return ""; + } + + @Override + public void start(BrokerService service) { + try { + final var port = service.getPulsar().getListenPortHTTP().orElseThrow(); + @Cleanup final var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build(); + try { + admin.clusters().createCluster(cluster, ClusterData.builder() + .serviceUrl(service.getPulsar().getWebServiceAddress()) + .serviceUrlTls(service.getPulsar().getWebServiceAddressTls()) + .brokerServiceUrl(service.getPulsar().getBrokerServiceUrl()) + .brokerServiceUrlTls(service.getPulsar().getBrokerServiceUrlTls()) + .build()); + } catch (PulsarAdminException ignored) { + } + try { + admin.tenants().createTenant("public", TenantInfo.builder().allowedClusters(Set.of(cluster)).build()); + } catch (PulsarAdminException ignored) { + } + try { + admin.namespaces().createNamespace("public/default"); + } catch (PulsarAdminException ignored) { + } + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + try { + final var port = service.getListenPort().orElseThrow(); + client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + port).build(); + readers = new ArrayList<>(); + for (int i = 0; i < partitions; i++) { + readers.add(client.newReader().topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i) + .startMessageId(MessageId.earliest).create()); + } + running = true; + executor = Executors.newSingleThreadExecutor(); + executor.execute(() -> { + while (running) { + readers.forEach(reader -> { + try { + reader.readNext(1, TimeUnit.MILLISECONDS); + } catch (PulsarClientException ignored) { + } + }); + } + }); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + } + + @Override + public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() { + return Map.of(); + } + + @Override + public void close() { + final var start = System.currentTimeMillis(); + running = false; + if (client != null) { + try { + client.close(); + } catch (PulsarClientException ignored) { + } + client = null; + } + if (executor != null) { + executor.shutdown(); + executor = null; + } + closeTimeMs = System.currentTimeMillis() - start; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java new file mode 100644 index 00000000000..9cc20cf7b9d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandlerTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.protocol; + +import java.io.File; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.util.PortManager; +import org.apache.commons.io.FileUtils; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class PulsarClientBasedHandlerTest { + + private final static String clusterName = "cluster"; + private final static int shutdownTimeoutMs = 100; + private final int zkPort = PortManager.nextFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort); + private File tempDirectory; + private PulsarService pulsar; + + @BeforeClass + public void setup() throws Exception { + bk.start(); + final var config = new ServiceConfiguration(); + config.setClusterName(clusterName); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort); + + tempDirectory = SimpleProtocolHandlerTestsBase.configureProtocolHandler(config, + PulsarClientBasedHandler.class.getName(), true); + + config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + config.setLoadBalancerDebugModeEnabled(true); + config.setBrokerShutdownTimeoutMs(shutdownTimeoutMs); + + pulsar = new PulsarService(config); + pulsar.start(); + } + + @Test(timeOut = 30000) + public void testStopBroker() throws PulsarServerException { + final var beforeStop = System.currentTimeMillis(); + final var handler = (PulsarClientBasedHandler) pulsar.getProtocolHandlers() + .protocol(PulsarClientBasedHandler.PROTOCOL); + pulsar.close(); + final var elapsedMs = System.currentTimeMillis() - beforeStop; + log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs); + Assert.assertTrue(elapsedMs < ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS + + handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes + } + + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + bk.stop(); + if (tempDirectory != null) { + FileUtils.deleteDirectory(tempDirectory); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java index c894b7d77c4..6c80f220c3d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java @@ -127,12 +127,18 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase { @BeforeClass @Override protected void setup() throws Exception { - tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile(); + tempDirectory = configureProtocolHandler(conf, MyProtocolHandler.class.getName(), useSeparateThreadPool); + super.baseSetup(); + } + + static File configureProtocolHandler(ServiceConfiguration conf, String className, boolean useSeparateThreadPool) + throws Exception { + final var tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile(); conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool); conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath()); conf.setMessagingProtocols(Collections.singleton("test")); - buildMockNarFile(tempDirectory); - super.baseSetup(); + buildMockNarFile(tempDirectory, className); + return tempDirectory; } @Test @@ -163,7 +169,7 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase { } } - private static void buildMockNarFile(File tempDirectory) throws Exception { + private static void buildMockNarFile(File tempDirectory, String className) throws Exception { File file = new File(tempDirectory, "temp.nar"); try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) { @@ -176,7 +182,7 @@ public abstract class SimpleProtocolHandlerTestsBase extends BrokerTestBase { zipfile.putNextEntry(manifest); String yaml = "name: test\n" + "description: this is a test\n" + - "handlerClass: " + MyProtocolHandler.class.getName() + "\n"; + "handlerClass: " + className + "\n"; zipfile.write(yaml.getBytes(StandardCharsets.UTF_8)); zipfile.closeEntry(); }