This is an automated email from the ASF dual-hosted git repository. cbornet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8d5303514ef [cleanup][broker] Various cleanups (#20658) 8d5303514ef is described below commit 8d5303514efa6cf0fa98c7285339a11821c7cf79 Author: Christophe Bornet <cbor...@hotmail.com> AuthorDate: Fri Jun 30 14:35:21 2023 +0200 [cleanup][broker] Various cleanups (#20658) --- .../pulsar/broker/BookKeeperClientFactoryImpl.java | 3 +- .../pulsar/broker/ManagedLedgerClientFactory.java | 18 +-- .../org/apache/pulsar/broker/PulsarService.java | 51 ++++--- .../broker/TransactionMetadataStoreService.java | 32 ++-- .../pulsar/broker/loadbalance/LoadManager.java | 17 +-- .../pulsar/broker/namespace/NamespaceService.java | 169 ++++++++++----------- .../pulsar/broker/web/PulsarWebResource.java | 27 ++-- .../apache/pulsar/broker/web/RequestWrapper.java | 2 +- .../pulsar/broker/web/ResponseHandlerFilter.java | 8 +- .../apache/pulsar/broker/web/RestException.java | 4 +- .../org/apache/pulsar/broker/web/WebService.java | 4 +- 11 files changed, 152 insertions(+), 183 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 0259dfc7a58..0ecca755956 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -49,7 +49,6 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver; import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver; -@SuppressWarnings("deprecation") @Slf4j public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { @@ -71,7 +70,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { ClientConfiguration bkConf = createBkClientConfiguration(store, conf); if (properties != null) { - properties.forEach((key, value) -> bkConf.setProperty(key, value)); + properties.forEach(bkConf::setProperty); } if (ensemblePlacementPolicyClass.isPresent()) { setEnsemblePlacementPolicy(bkConf, conf, store, ensemblePlacementPolicyClass.get()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index d86649abd3c..51fb8bc1ae3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -158,17 +158,15 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { // factory, however that might be introducing more unknowns. log.warn("Encountered exceptions on closing bookkeeper client", ree); } - if (bkEnsemblePolicyToBkClientMap != null) { - bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> { - try { - if (bk != null) { - bk.close(); - } - } catch (Exception e) { - log.warn("Failed to close bookkeeper-client for policy {}", policy, e); + bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> { + try { + if (bk != null) { + bk.close(); } - }); - } + } catch (Exception e) { + log.warn("Failed to close bookkeeper-client for policy {}", policy, e); + } + }); log.info("Closed BookKeeper client"); } catch (Exception e) { log.warn(e.getMessage(), e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2b64a8cfb5c..3d1254eec42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -185,13 +185,12 @@ import org.slf4j.LoggerFactory; /** * Main class for Pulsar broker service. */ - @Getter(AccessLevel.PUBLIC) @Setter(AccessLevel.PROTECTED) public class PulsarService implements AutoCloseable, ShutdownService { private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class); private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d; - private ServiceConfiguration config = null; + private final ServiceConfiguration config; private NamespaceService nsService = null; private ManagedLedgerStorage managedLedgerClientFactory = null; private LeaderElectionService leaderElectionService = null; @@ -255,7 +254,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private AdditionalServlets brokerAdditionalServlets; // packages management service - private Optional<PackagesManagement> packagesManagement = Optional.empty(); + private PackagesManagement packagesManagement = null; private PulsarPrometheusMetricsServlet metricsServlet; private List<PrometheusRawMetricsProvider> pendingMetricsProviders; @@ -285,10 +284,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { private Map<String, AdvertisedListener> advertisedListeners; public PulsarService(ServiceConfiguration config) { - this(config, Optional.empty(), (exitCode) -> { - LOG.info("Process termination requested with code {}. " - + "Ignoring, as this constructor is intended for tests. ", exitCode); - }); + this(config, Optional.empty(), (exitCode) -> LOG.info("Process termination requested with code {}. " + + "Ignoring, as this constructor is intended for tests. ", exitCode)); } public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService, @@ -370,7 +367,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { /** * Close the session to the metadata service. - * + * <p> * This will immediately release all the resource locks held by this broker on the coordination service. * * @throws Exception if the close operation fails @@ -400,8 +397,12 @@ public class PulsarService implements AutoCloseable, ShutdownService { throw (PulsarServerException) cause; } else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0 && (cause instanceof TimeoutException || cause instanceof CancellationException)) { - // ignore shutdown timeout when timeout is 0, which is primarily used in tests - // to forcefully shutdown the broker + if (LOG.isDebugEnabled()) { + LOG.debug( + "Shutdown timeout ignored when timeout is 0, " + + "which is primarily used in tests to forcefully shutdown the broker", + cause); + } } else { throw new PulsarServerException(cause); } @@ -693,7 +694,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { throw new PulsarServerException("Cannot start the service once it was stopped"); } - if (!config.getWebServicePort().isPresent() && !config.getWebServicePortTls().isPresent()) { + if (config.getWebServicePort().isEmpty() && config.getWebServicePortTls().isEmpty()) { throw new IllegalArgumentException("webServicePort/webServicePortTls must be present"); } @@ -722,7 +723,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { config.getDefaultRetentionTimeInMinutes() * 60)); } - if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent() + if (config.getLoadBalancerOverrideBrokerNicSpeedGbps().isEmpty() && config.isLoadBalancerEnabled() && LinuxInfoUtils.isLinux() && !LinuxInfoUtils.checkHasNicSpeeds()) { @@ -896,7 +897,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { if (isNotBlank(config.getResourceUsageTransportClassName())) { Class<?> clazz = Class.forName(config.getResourceUsageTransportClassName()); Constructor<?> ctor = clazz.getConstructor(PulsarService.class); - Object object = ctor.newInstance(new Object[]{this}); + Object object = ctor.newInstance(this); this.resourceUsageTransportManager = (ResourceUsageTopicTransportManager) object; } this.resourceGroupServiceManager = new ResourceGroupService(this); @@ -1241,7 +1242,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { * Load all the topics contained in a namespace. * * @param bundle <code>NamespaceBundle</code> to identify the service unit - * @throws Exception */ public void loadNamespaceTopics(NamespaceBundle bundle) { executor.submit(() -> { @@ -1296,7 +1296,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { config.getConfigurationMetadataStoreUrl(), new ClientConfiguration().getZkLedgersRootPath(), config.isBookkeeperMetadataStoreSeparated() ? config.getBookkeeperMetadataStoreUrl() : null, - this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null)); + this.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null)); } /** @@ -1411,7 +1411,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { Offloaders offloaders = offloadersCache.getOrLoadOffloaders( offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory()); - LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory( + LedgerOffloaderFactory<?> offloaderFactory = offloaders.getOffloaderFactory( offloadPolicies.getManagedLedgerOffloadDriver()); try { return offloaderFactory.create( @@ -1699,7 +1699,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "http"); return internalListener.getBrokerHttpUrl() != null ? internalListener.getBrokerHttpUrl().toString() - : webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get()); + : webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), + getListenPortHTTP().orElseThrow()); } else { return null; } @@ -1714,7 +1715,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "https"); return internalListener.getBrokerHttpsUrl() != null ? internalListener.getBrokerHttpsUrl().toString() - : webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get()); + : webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), + getListenPortHTTPS().orElseThrow()); } else { return null; } @@ -1736,7 +1738,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { public String getLookupServiceAddress() { return String.format("%s:%s", advertisedAddress, config.getWebServicePort().isPresent() ? config.getWebServicePort().get() - : config.getWebServicePortTls().get()); + : config.getWebServicePortTls().orElseThrow()); } public TopicPoliciesService getTopicPoliciesService() { @@ -1798,21 +1800,22 @@ public class PulsarService implements AutoCloseable, ShutdownService { } public PackagesManagement getPackagesManagement() throws UnsupportedOperationException { - return packagesManagement.orElseThrow(() -> new UnsupportedOperationException("Package Management Service " - + "is not enabled in the broker.")); + if (packagesManagement == null) { + throw new UnsupportedOperationException("Package Management Service is not enabled in the broker."); + } + return packagesManagement; } private void startPackagesManagementService() throws IOException { // TODO: using provider to initialize the packages management service. - PackagesManagement packagesManagementService = new PackagesManagementImpl(); - this.packagesManagement = Optional.of(packagesManagementService); + this.packagesManagement = new PackagesManagementImpl(); PackagesStorageProvider storageProvider = PackagesStorageProvider .newProvider(config.getPackagesManagementStorageProvider()); DefaultPackagesStorageConfiguration storageConfiguration = new DefaultPackagesStorageConfiguration(); storageConfiguration.setProperty(config.getProperties()); PackagesStorage storage = storageProvider.getStorage(storageConfiguration); storage.initialize(); - packagesManagementService.initialize(storage); + this.packagesManagement.initialize(storage); } public Optional<Integer> getListenPortHTTP() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 35aa7cc2fdd..c80580b02f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -85,16 +85,13 @@ public class TransactionMetadataStoreService { private final Timer transactionOpRetryTimer; // this semaphore for loading one transaction coordinator with the same tc id on the same time private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores; - // one connect request open the transactionMetaStore the other request will add to the queue, when the open op - // finished the request will be poll and complete the future + // one connect request opens the transactionMetaStore the other request will add to the queue, when the open op + // finishes the request will be polled and will complete the future private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests; private final ExecutorService internalPinnedExecutor; private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L; - private final ThreadFactory threadFactory = - new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory"); - public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider, PulsarService pulsarService, TransactionBufferClient tbClient, @@ -108,6 +105,8 @@ public class TransactionMetadataStoreService { this.tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build(); this.pendingConnectRequests = ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build(); + ThreadFactory threadFactory = + new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory"); this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); } @@ -200,7 +199,7 @@ public class TransactionMetadataStoreService { // then handle the requests witch in the queue deque.add(completableFuture); if (LOG.isDebugEnabled()) { - LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString()); + LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId); } } })).exceptionally(ex -> { @@ -367,17 +366,11 @@ public class TransactionMetadataStoreService { private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, int txnAction, TxnID txnID, TxnStatus expectStatus) { - boolean isLegal; - switch (txnStatus) { - case COMMITTING: - isLegal = (txnAction == TxnAction.COMMIT.getValue()); - break; - case ABORTING: - isLegal = (txnAction == TxnAction.ABORT.getValue()); - break; - default: - isLegal = false; - } + boolean isLegal = switch (txnStatus) { + case COMMITTING -> (txnAction == TxnAction.COMMIT.getValue()); + case ABORTING -> (txnAction == TxnAction.ABORT.getValue()); + default -> false; + }; if (!isLegal) { if (LOG.isDebugEnabled()) { LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction); @@ -502,15 +495,14 @@ public class TransactionMetadataStoreService { public void close () { this.internalPinnedExecutor.shutdown(); - stores.forEach((tcId, metadataStore) -> { + stores.forEach((tcId, metadataStore) -> metadataStore.closeAsync().whenComplete((v, ex) -> { if (ex != null) { LOG.error("Close transaction metadata store with id " + tcId, ex); } else { LOG.info("Removed and closed transaction meta store {}", tcId); } - }); - }); + })); stores.clear(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index 17bff57b85c..2cce68b60cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; * LoadManager runs through set of load reports collected from different brokers and generates a recommendation of * namespace/ServiceUnit placement on machines/ResourceUnit. Each Concrete Load Manager will use different algorithms to * generate this mapping. - * + * <p> * Concrete Load Manager is also return the least loaded broker that should own the new namespace. */ public interface LoadManager { @@ -88,7 +88,7 @@ public interface LoadManager { /** * Publish the current load report on ZK, forced or not. - * By default rely on method writeLoadReportOnZookeeper(). + * By default, rely on method writeLoadReportOnZookeeper(). */ default void writeLoadReportOnZookeeper(boolean force) throws Exception { writeLoadReportOnZookeeper(); @@ -118,15 +118,15 @@ public interface LoadManager { * Removes visibility of current broker from loadbalancer list so, other brokers can't redirect any request to this * broker and this broker won't accept new connection requests. * - * @throws Exception + * @throws Exception if there is any error while disabling broker */ void disableBroker() throws Exception; /** * Get list of available brokers in cluster. * - * @return - * @throws Exception + * @return the list of available brokers + * @throws Exception if there is any error while getting available brokers */ Set<String> getAvailableBrokers() throws Exception; @@ -150,12 +150,11 @@ public interface LoadManager { // Assume there is a constructor with one argument of PulsarService. final Object loadManagerInstance = Reflections.createInstance(conf.getLoadManagerClassName(), Thread.currentThread().getContextClassLoader()); - if (loadManagerInstance instanceof LoadManager) { - final LoadManager casted = (LoadManager) loadManagerInstance; + if (loadManagerInstance instanceof LoadManager casted) { casted.initialize(pulsar); return casted; - } else if (loadManagerInstance instanceof ModularLoadManager) { - final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance); + } else if (loadManagerInstance instanceof ModularLoadManager modularLoadManager) { + final LoadManager casted = new ModularLoadManagerWrapper(modularLoadManager); casted.initialize(pulsar); return casted; } else if (loadManagerInstance instanceof ExtensibleLoadManager) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 9be8d4938e3..585d62c5b1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; @@ -248,28 +249,27 @@ public class NamespaceService implements AutoCloseable { /** * Return the URL of the broker who's owning a particular service unit in asynchronous way. - * + * <p> * If the service unit is not owned, return a CompletableFuture with empty optional. */ public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) { - if (suName instanceof TopicName) { - TopicName name = (TopicName) suName; + if (suName instanceof TopicName name) { if (LOG.isDebugEnabled()) { LOG.debug("Getting web service URL of topic: {} - options: {}", name, options); } return getBundleAsync(name) .thenCompose(namespaceBundle -> - internalGetWebServiceUrl(Optional.of(name), namespaceBundle, options)); + internalGetWebServiceUrl(name, namespaceBundle, options)); } - if (suName instanceof NamespaceName) { - return getFullBundleAsync((NamespaceName) suName) + if (suName instanceof NamespaceName namespaceName) { + return getFullBundleAsync(namespaceName) .thenCompose(namespaceBundle -> - internalGetWebServiceUrl(Optional.empty(), namespaceBundle, options)); + internalGetWebServiceUrl(null, namespaceBundle, options)); } - if (suName instanceof NamespaceBundle) { - return internalGetWebServiceUrl(Optional.empty(), (NamespaceBundle) suName, options); + if (suName instanceof NamespaceBundle namespaceBundle) { + return internalGetWebServiceUrl(null, namespaceBundle, options); } throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName()); @@ -277,7 +277,7 @@ public class NamespaceService implements AutoCloseable { /** * Return the URL of the broker who's owning a particular service unit. - * + * <p> * If the service unit is not owned, return an empty optional */ public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception { @@ -285,7 +285,7 @@ public class NamespaceService implements AutoCloseable { .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); } - private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(Optional<ServiceUnitId> topic, + private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable ServiceUnitId topic, NamespaceBundle bundle, LookupOptions options) { @@ -306,7 +306,7 @@ public class NamespaceService implements AutoCloseable { } CompletableFuture<Optional<LookupResult>> future = ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config) - ? loadManager.get().findBrokerServiceUrl(topic, bundle) : + ? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) : findBrokerServiceUrl(bundle, options); return future.thenApply(lookupResult -> { @@ -329,7 +329,7 @@ public class NamespaceService implements AutoCloseable { /** * Register all the bootstrap name spaces including the heartbeat namespace. * - * @throws PulsarServerException + * @throws PulsarServerException if an unexpected error occurs */ public void registerBootstrapNamespaces() throws PulsarServerException { @@ -352,20 +352,19 @@ public class NamespaceService implements AutoCloseable { } /** - * Tried to registers a namespace to this instance. + * Tries to register a namespace to this instance. * - * @param nsname - * @param ensureOwned - * @return - * @throws PulsarServerException - * @throws Exception + * @param nsname namespace name + * @param ensureOwned sets the behavior when the namespace is already owned by another broker. + * If this flag is set to true, then the method will throw an exception. + * If this flag is set to false, then the method will return false. + * @return true if the namespace was successfully registered, false otherwise + * @throws PulsarServerException if an error occurs when registering the namespace */ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) throws PulsarServerException { try { - NamespaceBundle nsFullBundle = null; - // all pre-registered namespace is assumed to have bundles disabled - nsFullBundle = bundleFactory.getFullBundle(nsname); + NamespaceBundle nsFullBundle = bundleFactory.getFullBundle(nsname); // v2 namespace will always use full bundle object final NamespaceEphemeralData otherData; if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { @@ -417,10 +416,9 @@ public class NamespaceService implements AutoCloseable { /** * Main internal method to lookup and setup ownership of service unit to a broker. * - * @param bundle - * @param options - * @return - * @throws PulsarServerException + * @param bundle the namespace bundle + * @param options the lookup options + * @return the lookup result */ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl( NamespaceBundle bundle, LookupOptions options) { @@ -440,7 +438,7 @@ public class NamespaceService implements AutoCloseable { // First check if we or someone else already owns the bundle ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> { - if (!nsData.isPresent()) { + if (nsData.isEmpty()) { // No one owns this bundle if (options.isReadOnly()) { @@ -448,9 +446,7 @@ public class NamespaceService implements AutoCloseable { future.complete(Optional.empty()); } else { // Now, no one owns the namespace yet. Hence, we will try to dynamically assign it - pulsar.getExecutor().execute(() -> { - searchForCandidateBroker(bundle, future, options); - }); + pulsar.getExecutor().execute(() -> searchForCandidateBroker(bundle, future, options)); } } else if (nsData.get().isDisabled()) { future.completeExceptionally( @@ -474,7 +470,6 @@ public class NamespaceService implements AutoCloseable { url == null ? null : url.toString(), urlTls == null ? null : urlTls.toString()))); } - return; } else { future.complete(Optional.of(new LookupResult(nsData.get()))); } @@ -502,13 +497,13 @@ public class NamespaceService implements AutoCloseable { new IllegalStateException("The leader election has not yet been completed!")); return; } - String candidateBroker = null; + String candidateBroker; String candidateBrokerAdvertisedAddr = null; LeaderElectionService les = pulsar.getLeaderElectionService(); if (les == null) { // The leader election service was not initialized yet. This can happen because the broker service is - // initialized first and it might start receiving lookup requests before the leader election service is + // initialized first, and it might start receiving lookup requests before the leader election service is // fully initialized. LOG.warn("Leader election service isn't initialized yet. " + "Returning empty result to lookup. NamespaceBundle[{}]", @@ -548,7 +543,7 @@ public class NamespaceService implements AutoCloseable { && isBrokerActive(currentLeader.get().getServiceUrl()); if (!leaderBrokerActive) { makeLoadManagerDecisionOnThisBroker = true; - if (!currentLeader.isPresent()) { + if (currentLeader.isEmpty()) { LOG.warn( "The information about the current leader broker wasn't available. " + "Handling load manager decisions in a decentralized way. " @@ -565,7 +560,7 @@ public class NamespaceService implements AutoCloseable { } if (makeLoadManagerDecisionOnThisBroker) { Optional<Pair<String, String>> availableBroker = getLeastLoadedFromLoadManager(bundle); - if (!availableBroker.isPresent()) { + if (availableBroker.isEmpty()) { LOG.warn("Load manager didn't return any available broker. " + "Returning empty result to lookup. NamespaceBundle[{}]", bundle); @@ -603,7 +598,7 @@ public class NamespaceService implements AutoCloseable { // Found owner for the namespace bundle if (options.isLoadTopicsInBundle()) { - // Schedule the task to pre-load topics + // Schedule the task to preload topics pulsar.loadNamespaceTopics(bundle); } // find the target @@ -614,7 +609,6 @@ public class NamespaceService implements AutoCloseable { lookupFuture.completeExceptionally( new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener")); - return; } else { URI url = listener.getBrokerServiceUrl(); URI urlTls = listener.getBrokerServiceUrlTls(); @@ -622,11 +616,9 @@ public class NamespaceService implements AutoCloseable { new LookupResult(ownerInfo, url == null ? null : url.toString(), urlTls == null ? null : urlTls.toString()))); - return; } } else { lookupFuture.complete(Optional.of(new LookupResult(ownerInfo))); - return; } } }).exceptionally(exception -> { @@ -712,7 +704,7 @@ public class NamespaceService implements AutoCloseable { } else { LOG.warn("Broker {} ({}) couldn't be found in available brokers {}", candidateBroker, candidateBrokerHostAndPort, - availableBrokers.stream().collect(Collectors.joining(","))); + String.join(",", availableBrokers)); return false; } } @@ -722,8 +714,7 @@ public class NamespaceService implements AutoCloseable { if (uriSeparatorPos == -1) { throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI."); } - String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3); - return candidateBrokerHostAndPort; + return candidateBroker.substring(uriSeparatorPos + 3); } private Set<String> getAvailableBrokers() { @@ -737,12 +728,13 @@ public class NamespaceService implements AutoCloseable { /** * Helper function to encapsulate the logic to invoke between old and new load manager. * - * @return - * @throws Exception + * @param serviceUnit the service unit + * @return the least loaded broker addresses + * @throws Exception if an error occurs */ private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception { Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit); - if (!leastLoadedBroker.isPresent()) { + if (leastLoadedBroker.isEmpty()) { LOG.warn("No broker is available for {}", serviceUnit); return Optional.empty(); } @@ -863,7 +855,7 @@ public class NamespaceService implements AutoCloseable { public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception { try { - // Does ZooKeeper says that the namespace is disabled? + // Does ZooKeeper say that the namespace is disabled? CompletableFuture<Optional<NamespaceEphemeralData>> nsDataFuture = ownershipCache.getOwnerAsync(bundle); if (nsDataFuture != null) { Optional<NamespaceEphemeralData> nsData = nsDataFuture.getNow(null); @@ -886,12 +878,14 @@ public class NamespaceService implements AutoCloseable { /** * 1. split the given bundle into two bundles 2. assign ownership of both the bundles to current broker 3. update * policies with newly created bundles into LocalZK 4. disable original bundle and refresh the cache. - * + * <p> * It will call splitAndOwnBundleOnceAndRetry to do the real retry work, which will retry "retryTimes". * - * @param bundle - * @return - * @throws Exception + * @param bundle the bundle to split + * @param unload whether to unload the new split bundles + * @param splitAlgorithm the algorithm to split the bundle + * @param boundaries the boundaries to split the bundle + * @return a future that will complete when the bundle is split and owned */ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload, NamespaceBundleSplitAlgorithm splitAlgorithm, @@ -926,36 +920,36 @@ public class NamespaceService implements AutoCloseable { } try { bundleFactory.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries) - .thenAccept(splittedBundles -> { + .thenAccept(splitBundles -> { // Split and updateNamespaceBundles. Update may fail because of concurrent write to // Zookeeper. - if (splittedBundles == null) { + if (splitBundles == null) { String msg = format("bundle %s not found under namespace", bundle.toString()); LOG.warn(msg); updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); return; } - Objects.requireNonNull(splittedBundles.getLeft()); - Objects.requireNonNull(splittedBundles.getRight()); - checkArgument(splittedBundles.getRight().size() == splitBoundaries.size() + 1, + Objects.requireNonNull(splitBundles.getLeft()); + Objects.requireNonNull(splitBundles.getRight()); + checkArgument(splitBundles.getRight().size() == splitBoundaries.size() + 1, "bundle has to be split in " + (splitBoundaries.size() + 1) + " bundles"); NamespaceName nsname = bundle.getNamespaceObject(); if (LOG.isDebugEnabled()) { LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, bundles: {}", nsname.toString(), bundle.getBundleRange(), counter.get(), - splittedBundles.getRight()); + splitBundles.getRight()); } try { // take ownership of newly split bundles - for (NamespaceBundle sBundle : splittedBundles.getRight()) { + for (NamespaceBundle sBundle : splitBundles.getRight()) { Objects.requireNonNull(ownershipCache.tryAcquiringOwnership(sBundle)); } - updateNamespaceBundles(nsname, splittedBundles.getLeft()).thenCompose(__ -> { - return updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft()); - }).thenRun(() -> { - bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - updateFuture.complete(splittedBundles.getRight()); + updateNamespaceBundles(nsname, splitBundles.getLeft()).thenCompose(__ -> + updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft())) + .thenRun(() -> { + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + updateFuture.complete(splitBundles.getRight()); }).exceptionally(ex1 -> { String msg = format("failed to update namespace policies [%s], " + "NamespaceBundle: %s due to %s", @@ -1023,7 +1017,7 @@ public class NamespaceService implements AutoCloseable { .exceptionally(e -> { String msg1 = format( "failed to disable bundle %s under namespace [%s] with error %s", - bundle.getNamespaceObject().toString(), bundle.toString(), ex.getMessage()); + bundle.getNamespaceObject().toString(), bundle, ex.getMessage()); LOG.warn(msg1, e); completionFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1)); return null; @@ -1093,9 +1087,8 @@ public class NamespaceService implements AutoCloseable { * Update new bundle-range to admin/policies/namespace. * Update may fail because of concurrent write to Zookeeper. * - * @param nsname - * @param nsBundles - * @throws Exception + * @param nsname the namespace name + * @param nsBundles the new namespace bundles */ public CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname, NamespaceBundles nsBundles) { @@ -1122,9 +1115,8 @@ public class NamespaceService implements AutoCloseable { * Update new bundle-range to LocalZk (create a new node if not present). * Update may fail because of concurrent write to Zookeeper. * - * @param nsname - * @param nsBundles - * @throws Exception + * @param nsname the namespace name + * @param nsBundles the new namespace bundles */ public CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) { Objects.requireNonNull(nsname); @@ -1176,7 +1168,7 @@ public class NamespaceService implements AutoCloseable { } /** - * @Deprecated This method is only used in test now. + * @deprecated This method is only used in test now. */ @Deprecated public boolean isServiceUnitActive(TopicName topicName) { @@ -1197,7 +1189,7 @@ public class NamespaceService implements AutoCloseable { } return getBundleAsync(topicName).thenCompose(bundle -> { Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle); - if (!optionalFuture.isPresent()) { + if (optionalFuture.isEmpty()) { return CompletableFuture.completedFuture(false); } return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive()); @@ -1220,7 +1212,7 @@ public class NamespaceService implements AutoCloseable { return getBundleAsync(topic) .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle)); } - return getBundleAsync(topic).thenApply(bundle -> ownershipCache.isNamespaceBundleOwned(bundle)); + return getBundleAsync(topic).thenApply(ownershipCache::isNamespaceBundleOwned); } public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) { @@ -1462,21 +1454,19 @@ public class NamespaceService implements AutoCloseable { if (peerClusterData != null) { return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName); } else { - // Non-persistent topics don't have managed ledgers so we have to retrieve them from local + // Non-persistent topics don't have managed ledgers. So we have to retrieve them from local // cache. List<String> topics = new ArrayList<>(); synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) { if (pulsar.getBrokerService().getMultiLayerTopicMap() .containsKey(namespaceName.toString())) { pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()) - .forEach((__, bundle) -> { - bundle.forEach((topicName, topic) -> { - if (topic instanceof NonPersistentTopic - && ((NonPersistentTopic) topic).isActive()) { - topics.add(topicName); - } - }); - }); + .forEach((__, bundle) -> bundle.forEach((topicName, topic) -> { + if (topic instanceof NonPersistentTopic + && ((NonPersistentTopic) topic).isActive()) { + topics.add(topicName); + } + })); } } @@ -1545,14 +1535,10 @@ public class NamespaceService implements AutoCloseable { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); return extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle) - .thenCompose(lookupData -> { - if (lookupData.isPresent()) { - return CompletableFuture.completedFuture( - Optional.of(lookupData.get().toNamespaceEphemeralData())); - } else { - return CompletableFuture.completedFuture(Optional.empty()); - } - }); + .thenCompose(lookupData -> lookupData + .map(brokerLookupData -> + CompletableFuture.completedFuture(Optional.of(brokerLookupData.toNamespaceEphemeralData()))) + .orElseGet(() -> CompletableFuture.completedFuture(Optional.empty()))); } return ownershipCache.getOwnerAsync(bundle); } @@ -1576,7 +1562,6 @@ public class NamespaceService implements AutoCloseable { } public void unloadSLANamespace() throws Exception { - PulsarAdmin adminClient = null; NamespaceName namespaceName = getSLAMonitorNamespace(host, config); LOG.info("Checking owner for SLA namespace {}", namespaceName); @@ -1589,7 +1574,7 @@ public class NamespaceService implements AutoCloseable { } LOG.info("Trying to unload SLA namespace {}", namespaceName); - adminClient = pulsar.getAdminClient(); + PulsarAdmin adminClient = pulsar.getAdminClient(); adminClient.namespaces().unload(namespaceName.toString()); LOG.info("Namespace {} unloaded successfully", namespaceName); } @@ -1662,7 +1647,7 @@ public class NamespaceService implements AutoCloseable { /** * used for filtering bundles in special namespace. - * @param namespace + * @param namespace the namespace name * @return True if namespace is HEARTBEAT_NAMESPACE or SLA_NAMESPACE */ public static boolean filterNamespaceForShedding(String namespace) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index bfb94aa7740..fa121b8eb4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -309,7 +309,7 @@ public abstract class PulsarWebResource { } return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant) .thenCompose(tenantInfoOptional -> { - if (!tenantInfoOptional.isPresent()) { + if (tenantInfoOptional.isEmpty()) { throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); } TenantInfo tenantInfo = tenantInfoOptional.get(); @@ -380,9 +380,11 @@ public abstract class PulsarWebResource { /** * It validates that peer-clusters can't coexist in replication-clusters. * - * @clusterName: given cluster whose peer-clusters can't be present into replication-cluster list - * @replicationClusters: replication-cluster list + * @param clusterName given cluster whose peer-clusters can't be present into replication-cluster list + * @param replicationClusters replication-cluster list + * @deprecated use {@link #validatePeerClusterConflictAsync(String, Set)} instead */ + @Deprecated protected void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) { try { ClusterData clusterData = clusterResources().getCluster(clusterName).orElseThrow( @@ -453,7 +455,7 @@ public abstract class PulsarWebResource { protected CompletableFuture<Void> validateClusterForTenantAsync(String tenant, String cluster) { return pulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant) .thenAccept(tenantInfo -> { - if (!tenantInfo.isPresent()) { + if (tenantInfo.isEmpty()) { throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); } if (!tenantInfo.get().getAllowedClusters().contains(cluster)) { @@ -488,7 +490,7 @@ public abstract class PulsarWebResource { * Check if the cluster exists and redirect the call to the owning cluster. * * @param cluster Cluster name - * @throws Exception In case the redirect happens + * @throws WebApplicationException In case the redirect happens */ protected void validateClusterOwnership(String cluster) throws WebApplicationException { sync(()-> validateClusterOwnershipAsync(cluster)); @@ -550,11 +552,8 @@ public abstract class PulsarWebResource { return true; } - if (!pulsarService.getConfiguration().isAuthorizationEnabled()) { - // Without authorization, any cluster name should be valid and accepted by the broker - return true; - } - return false; + // Without authorization, any cluster name should be valid and accepted by the broker + return !pulsarService.getConfiguration().isAuthorizationEnabled(); } protected void validateBundleOwnership(String tenant, String cluster, String namespace, boolean authoritative, @@ -611,7 +610,7 @@ public abstract class PulsarWebResource { .requestHttps(isRequestHttps()) .readOnly(true) .loadTopicsInBundle(false).build(); - return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(optionUrl -> optionUrl.isPresent()); + return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent); } protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles, @@ -664,7 +663,7 @@ public abstract class PulsarWebResource { .loadTopicsInBundle(false).build(); Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, options); // Ensure we get a url - if (webUrl == null || !webUrl.isPresent()) { + if (webUrl.isEmpty()) { log.warn("Unable to get web service url"); throw new RestException(Status.PRECONDITION_FAILED, "Failed to find ownership for ServiceUnit:" + bundle.toString()); @@ -697,8 +696,6 @@ public abstract class PulsarWebResource { } catch (NullPointerException e) { log.warn("Unable to get web service url"); throw new RestException(Status.PRECONDITION_FAILED, "Failed to find ownership for ServiceUnit:" + bundle); - } catch (WebApplicationException wae) { - throw wae; } } @@ -712,7 +709,7 @@ public abstract class PulsarWebResource { .loadTopicsInBundle(false).build(); return nsService.getWebServiceUrlAsync(bundle, options) .thenCompose(webUrl -> { - if (webUrl == null || !webUrl.isPresent()) { + if (webUrl.isEmpty()) { log.warn("Unable to get web service url"); throw new RestException(Status.PRECONDITION_FAILED, "Failed to find ownership for ServiceUnit:" + bundle.toString()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java index 16d87baedbe..afebbd276eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java @@ -62,7 +62,7 @@ public class RequestWrapper extends HttpServletRequestWrapper { } - public int read() throws IOException { + public int read() { return byteArrayInputStream.read(); } }; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java index efed6140395..3fa00beea1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java @@ -76,24 +76,24 @@ public class ResponseHandlerFilter implements Filter { if (request.isAsyncSupported() && request.isAsyncStarted()) { request.getAsyncContext().addListener(new AsyncListener() { @Override - public void onComplete(AsyncEvent asyncEvent) throws IOException { + public void onComplete(AsyncEvent asyncEvent) { handleInterceptor(request, response); } @Override - public void onTimeout(AsyncEvent asyncEvent) throws IOException { + public void onTimeout(AsyncEvent asyncEvent) { LOG.warn("Http request {} async context timeout.", request); handleInterceptor(request, response); } @Override - public void onError(AsyncEvent asyncEvent) throws IOException { + public void onError(AsyncEvent asyncEvent) { LOG.warn("Http request {} async context error.", request, asyncEvent.getThrowable()); handleInterceptor(request, response); } @Override - public void onStartAsync(AsyncEvent asyncEvent) throws IOException { + public void onStartAsync(AsyncEvent asyncEvent) { // nothing to do } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java index b18aa1c787a..c3ae3a495cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java @@ -30,7 +30,6 @@ import org.apache.pulsar.common.policies.data.ErrorData; /** * Exception used to provide better error messages to clients of the REST API. */ -@SuppressWarnings("serial") public class RestException extends WebApplicationException { private Throwable cause = null; static String getExceptionData(Throwable t) { @@ -75,8 +74,7 @@ public class RestException extends WebApplicationException { } private static Response getResponse(Throwable t) { - if (t instanceof WebApplicationException) { - WebApplicationException e = (WebApplicationException) t; + if (t instanceof WebApplicationException e) { return e.getResponse(); } else { return Response diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 2d6a6af5847..eada0436f4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -264,9 +264,7 @@ public class WebService implements AutoCloseable { context.setContextPath(path); context.addServlet(servletHolder, MATCH_ALL); if (attributeMap != null) { - attributeMap.forEach((key, value) -> { - context.setAttribute(key, value); - }); + attributeMap.forEach(context::setAttribute); } filterInitializer.addFilters(context, requiresAuthentication); handlers.add(context);