This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d5303514ef [cleanup][broker] Various cleanups (#20658)
8d5303514ef is described below

commit 8d5303514efa6cf0fa98c7285339a11821c7cf79
Author: Christophe Bornet <cbor...@hotmail.com>
AuthorDate: Fri Jun 30 14:35:21 2023 +0200

    [cleanup][broker] Various cleanups (#20658)
---
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |   3 +-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  18 +--
 .../org/apache/pulsar/broker/PulsarService.java    |  51 ++++---
 .../broker/TransactionMetadataStoreService.java    |  32 ++--
 .../pulsar/broker/loadbalance/LoadManager.java     |  17 +--
 .../pulsar/broker/namespace/NamespaceService.java  | 169 ++++++++++-----------
 .../pulsar/broker/web/PulsarWebResource.java       |  27 ++--
 .../apache/pulsar/broker/web/RequestWrapper.java   |   2 +-
 .../pulsar/broker/web/ResponseHandlerFilter.java   |   8 +-
 .../apache/pulsar/broker/web/RestException.java    |   4 +-
 .../org/apache/pulsar/broker/web/WebService.java   |   4 +-
 11 files changed, 152 insertions(+), 183 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 0259dfc7a58..0ecca755956 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -49,7 +49,6 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver;
 import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
 
-@SuppressWarnings("deprecation")
 @Slf4j
 public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
@@ -71,7 +70,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
         ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
         if (properties != null) {
-            properties.forEach((key, value) -> bkConf.setProperty(key, value));
+            properties.forEach(bkConf::setProperty);
         }
         if (ensemblePlacementPolicyClass.isPresent()) {
             setEnsemblePlacementPolicy(bkConf, conf, store, 
ensemblePlacementPolicyClass.get());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index d86649abd3c..51fb8bc1ae3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -158,17 +158,15 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
                 // factory, however that might be introducing more unknowns.
                 log.warn("Encountered exceptions on closing bookkeeper 
client", ree);
             }
-            if (bkEnsemblePolicyToBkClientMap != null) {
-                bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
-                    try {
-                        if (bk != null) {
-                            bk.close();
-                        }
-                    } catch (Exception e) {
-                        log.warn("Failed to close bookkeeper-client for policy 
{}", policy, e);
+            bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
+                try {
+                    if (bk != null) {
+                        bk.close();
                     }
-                });
-            }
+                } catch (Exception e) {
+                    log.warn("Failed to close bookkeeper-client for policy 
{}", policy, e);
+                }
+            });
             log.info("Closed BookKeeper client");
         } catch (Exception e) {
             log.warn(e.getMessage(), e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2b64a8cfb5c..3d1254eec42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -185,13 +185,12 @@ import org.slf4j.LoggerFactory;
 /**
  * Main class for Pulsar broker service.
  */
-
 @Getter(AccessLevel.PUBLIC)
 @Setter(AccessLevel.PROTECTED)
 public class PulsarService implements AutoCloseable, ShutdownService {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarService.class);
     private static final double 
GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
-    private ServiceConfiguration config = null;
+    private final ServiceConfiguration config;
     private NamespaceService nsService = null;
     private ManagedLedgerStorage managedLedgerClientFactory = null;
     private LeaderElectionService leaderElectionService = null;
@@ -255,7 +254,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private AdditionalServlets brokerAdditionalServlets;
 
     // packages management service
-    private Optional<PackagesManagement> packagesManagement = Optional.empty();
+    private PackagesManagement packagesManagement = null;
     private PulsarPrometheusMetricsServlet metricsServlet;
     private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
 
@@ -285,10 +284,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private Map<String, AdvertisedListener> advertisedListeners;
 
     public PulsarService(ServiceConfiguration config) {
-        this(config, Optional.empty(), (exitCode) -> {
-                LOG.info("Process termination requested with code {}. "
-                         + "Ignoring, as this constructor is intended for 
tests. ", exitCode);
-            });
+        this(config, Optional.empty(), (exitCode) -> LOG.info("Process 
termination requested with code {}. "
+                 + "Ignoring, as this constructor is intended for tests. ", 
exitCode));
     }
 
     public PulsarService(ServiceConfiguration config, Optional<WorkerService> 
functionWorkerService,
@@ -370,7 +367,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     /**
      * Close the session to the metadata service.
-     *
+     * <p>
      * This will immediately release all the resource locks held by this 
broker on the coordination service.
      *
      * @throws Exception if the close operation fails
@@ -400,8 +397,12 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 throw (PulsarServerException) cause;
             } else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0
                     && (cause instanceof TimeoutException || cause instanceof 
CancellationException)) {
-                // ignore shutdown timeout when timeout is 0, which is 
primarily used in tests
-                // to forcefully shutdown the broker
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                        "Shutdown timeout ignored when timeout is 0, "
+                            + "which is primarily used in tests to forcefully 
shutdown the broker",
+                        cause);
+                }
             } else {
                 throw new PulsarServerException(cause);
             }
@@ -693,7 +694,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 throw new PulsarServerException("Cannot start the service once 
it was stopped");
             }
 
-            if (!config.getWebServicePort().isPresent() && 
!config.getWebServicePortTls().isPresent()) {
+            if (config.getWebServicePort().isEmpty() && 
config.getWebServicePortTls().isEmpty()) {
                 throw new 
IllegalArgumentException("webServicePort/webServicePortTls must be present");
             }
 
@@ -722,7 +723,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                         config.getDefaultRetentionTimeInMinutes() * 60));
             }
 
-            if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+            if (config.getLoadBalancerOverrideBrokerNicSpeedGbps().isEmpty()
                     && config.isLoadBalancerEnabled()
                     && LinuxInfoUtils.isLinux()
                     && !LinuxInfoUtils.checkHasNicSpeeds()) {
@@ -896,7 +897,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             if (isNotBlank(config.getResourceUsageTransportClassName())) {
                 Class<?> clazz = 
Class.forName(config.getResourceUsageTransportClassName());
                 Constructor<?> ctor = 
clazz.getConstructor(PulsarService.class);
-                Object object = ctor.newInstance(new Object[]{this});
+                Object object = ctor.newInstance(this);
                 this.resourceUsageTransportManager = 
(ResourceUsageTopicTransportManager) object;
             }
             this.resourceGroupServiceManager = new ResourceGroupService(this);
@@ -1241,7 +1242,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
      * Load all the topics contained in a namespace.
      *
      * @param bundle <code>NamespaceBundle</code> to identify the service unit
-     * @throws Exception
      */
     public void loadNamespaceTopics(NamespaceBundle bundle) {
         executor.submit(() -> {
@@ -1296,7 +1296,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             config.getConfigurationMetadataStoreUrl(),
             new ClientConfiguration().getZkLedgersRootPath(),
             config.isBookkeeperMetadataStoreSeparated() ? 
config.getBookkeeperMetadataStoreUrl() : null,
-            this.getWorkerConfig().map(wc -> 
wc.getStateStorageServiceUrl()).orElse(null));
+            
this.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
     }
 
     /**
@@ -1411,7 +1411,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                     Offloaders offloaders = 
offloadersCache.getOrLoadOffloaders(
                             offloadPolicies.getOffloadersDirectory(), 
config.getNarExtractionDirectory());
 
-                    LedgerOffloaderFactory offloaderFactory = 
offloaders.getOffloaderFactory(
+                    LedgerOffloaderFactory<?> offloaderFactory = 
offloaders.getOffloaderFactory(
                             offloadPolicies.getManagedLedgerOffloadDriver());
                     try {
                         return offloaderFactory.create(
@@ -1699,7 +1699,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             AdvertisedListener internalListener = 
ServiceConfigurationUtils.getInternalListener(config, "http");
             return internalListener.getBrokerHttpUrl() != null
                     ? internalListener.getBrokerHttpUrl().toString()
-                    : 
webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), 
getListenPortHTTP().get());
+                    : 
webAddress(ServiceConfigurationUtils.getWebServiceAddress(config),
+                            getListenPortHTTP().orElseThrow());
         } else {
             return null;
         }
@@ -1714,7 +1715,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             AdvertisedListener internalListener = 
ServiceConfigurationUtils.getInternalListener(config, "https");
             return internalListener.getBrokerHttpsUrl() != null
                     ? internalListener.getBrokerHttpsUrl().toString()
-                    : 
webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), 
getListenPortHTTPS().get());
+                    : 
webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config),
+                            getListenPortHTTPS().orElseThrow());
         } else {
             return null;
         }
@@ -1736,7 +1738,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     public String getLookupServiceAddress() {
         return String.format("%s:%s", advertisedAddress, 
config.getWebServicePort().isPresent()
                 ? config.getWebServicePort().get()
-                : config.getWebServicePortTls().get());
+                : config.getWebServicePortTls().orElseThrow());
     }
 
     public TopicPoliciesService getTopicPoliciesService() {
@@ -1798,21 +1800,22 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     public PackagesManagement getPackagesManagement() throws 
UnsupportedOperationException {
-        return packagesManagement.orElseThrow(() -> new 
UnsupportedOperationException("Package Management Service "
-                + "is not enabled in the broker."));
+        if (packagesManagement == null) {
+            throw new UnsupportedOperationException("Package Management 
Service is not enabled in the broker.");
+        }
+        return packagesManagement;
     }
 
     private void startPackagesManagementService() throws IOException {
         // TODO: using provider to initialize the packages management service.
-        PackagesManagement packagesManagementService = new 
PackagesManagementImpl();
-        this.packagesManagement = Optional.of(packagesManagementService);
+        this.packagesManagement = new PackagesManagementImpl();
         PackagesStorageProvider storageProvider = PackagesStorageProvider
             .newProvider(config.getPackagesManagementStorageProvider());
         DefaultPackagesStorageConfiguration storageConfiguration = new 
DefaultPackagesStorageConfiguration();
         storageConfiguration.setProperty(config.getProperties());
         PackagesStorage storage = 
storageProvider.getStorage(storageConfiguration);
         storage.initialize();
-        packagesManagementService.initialize(storage);
+        this.packagesManagement.initialize(storage);
     }
 
     public Optional<Integer> getListenPortHTTP() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 35aa7cc2fdd..c80580b02f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -85,16 +85,13 @@ public class TransactionMetadataStoreService {
     private final Timer transactionOpRetryTimer;
     // this semaphore for loading one transaction coordinator with the same tc 
id on the same time
     private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores;
-    // one connect request open the transactionMetaStore the other request 
will add to the queue, when the open op
-    // finished the request will be poll and complete the future
+    // one connect request opens the transactionMetaStore the other request 
will add to the queue, when the open op
+    // finishes the request will be polled and will complete the future
     private final 
ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> 
pendingConnectRequests;
     private final ExecutorService internalPinnedExecutor;
 
     private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
 
-    private final ThreadFactory threadFactory =
-            new 
ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
-
 
     public TransactionMetadataStoreService(TransactionMetadataStoreProvider 
transactionMetadataStoreProvider,
                                            PulsarService pulsarService, 
TransactionBufferClient tbClient,
@@ -108,6 +105,8 @@ public class TransactionMetadataStoreService {
         this.tcLoadSemaphores = 
ConcurrentLongHashMap.<Semaphore>newBuilder().build();
         this.pendingConnectRequests =
                 
ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
+        ThreadFactory threadFactory =
+                new 
ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
         this.internalPinnedExecutor = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
@@ -200,7 +199,7 @@ public class TransactionMetadataStoreService {
                         // then handle the requests witch in the queue
                         deque.add(completableFuture);
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Handle tc client connect added into 
pending queue! tcId : {}", tcId.toString());
+                            LOG.debug("Handle tc client connect added into 
pending queue! tcId : {}", tcId);
                         }
                     }
                 })).exceptionally(ex -> {
@@ -367,17 +366,11 @@ public class TransactionMetadataStoreService {
 
     private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, 
int txnAction,
                                                           TxnID txnID, 
TxnStatus expectStatus) {
-        boolean isLegal;
-        switch (txnStatus) {
-            case COMMITTING:
-                isLegal =  (txnAction == TxnAction.COMMIT.getValue());
-                break;
-            case ABORTING:
-                isLegal =  (txnAction == TxnAction.ABORT.getValue());
-                break;
-            default:
-                isLegal = false;
-        }
+        boolean isLegal = switch (txnStatus) {
+            case COMMITTING -> (txnAction == TxnAction.COMMIT.getValue());
+            case ABORTING -> (txnAction == TxnAction.ABORT.getValue());
+            default -> false;
+        };
         if (!isLegal) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, 
TxnAction : {}", txnID, txnAction);
@@ -502,15 +495,14 @@ public class TransactionMetadataStoreService {
 
     public void close () {
         this.internalPinnedExecutor.shutdown();
-        stores.forEach((tcId, metadataStore) -> {
+        stores.forEach((tcId, metadataStore) ->
             metadataStore.closeAsync().whenComplete((v, ex) -> {
                 if (ex != null) {
                     LOG.error("Close transaction metadata store with id " + 
tcId, ex);
                 } else {
                     LOG.info("Removed and closed transaction meta store {}", 
tcId);
                 }
-            });
-        });
+        }));
         stores.clear();
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index 17bff57b85c..2cce68b60cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
  * LoadManager runs through set of load reports collected from different 
brokers and generates a recommendation of
  * namespace/ServiceUnit placement on machines/ResourceUnit. Each Concrete 
Load Manager will use different algorithms to
  * generate this mapping.
- *
+ * <p>
  * Concrete Load Manager is also return the least loaded broker that should 
own the new namespace.
  */
 public interface LoadManager {
@@ -88,7 +88,7 @@ public interface LoadManager {
 
     /**
      * Publish the current load report on ZK, forced or not.
-     * By default rely on method writeLoadReportOnZookeeper().
+     * By default, rely on method writeLoadReportOnZookeeper().
      */
     default void writeLoadReportOnZookeeper(boolean force) throws Exception {
         writeLoadReportOnZookeeper();
@@ -118,15 +118,15 @@ public interface LoadManager {
      * Removes visibility of current broker from loadbalancer list so, other 
brokers can't redirect any request to this
      * broker and this broker won't accept new connection requests.
      *
-     * @throws Exception
+     * @throws Exception if there is any error while disabling broker
      */
     void disableBroker() throws Exception;
 
     /**
      * Get list of available brokers in cluster.
      *
-     * @return
-     * @throws Exception
+     * @return the list of available brokers
+     * @throws Exception if there is any error while getting available brokers
      */
     Set<String> getAvailableBrokers() throws Exception;
 
@@ -150,12 +150,11 @@ public interface LoadManager {
             // Assume there is a constructor with one argument of 
PulsarService.
             final Object loadManagerInstance = 
Reflections.createInstance(conf.getLoadManagerClassName(),
                     Thread.currentThread().getContextClassLoader());
-            if (loadManagerInstance instanceof LoadManager) {
-                final LoadManager casted = (LoadManager) loadManagerInstance;
+            if (loadManagerInstance instanceof LoadManager casted) {
                 casted.initialize(pulsar);
                 return casted;
-            } else if (loadManagerInstance instanceof ModularLoadManager) {
-                final LoadManager casted = new 
ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
+            } else if (loadManagerInstance instanceof ModularLoadManager 
modularLoadManager) {
+                final LoadManager casted = new 
ModularLoadManagerWrapper(modularLoadManager);
                 casted.initialize(pulsar);
                 return casted;
             } else if (loadManagerInstance instanceof ExtensibleLoadManager) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9be8d4938e3..585d62c5b1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -248,28 +249,27 @@ public class NamespaceService implements AutoCloseable {
 
     /**
      * Return the URL of the broker who's owning a particular service unit in 
asynchronous way.
-     *
+     * <p>
      * If the service unit is not owned, return a CompletableFuture with empty 
optional.
      */
     public CompletableFuture<Optional<URL>> 
getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) {
-        if (suName instanceof TopicName) {
-            TopicName name = (TopicName) suName;
+        if (suName instanceof TopicName name) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Getting web service URL of topic: {} - options: 
{}", name, options);
             }
             return getBundleAsync(name)
                     .thenCompose(namespaceBundle ->
-                            internalGetWebServiceUrl(Optional.of(name), 
namespaceBundle, options));
+                            internalGetWebServiceUrl(name, namespaceBundle, 
options));
         }
 
-        if (suName instanceof NamespaceName) {
-            return getFullBundleAsync((NamespaceName) suName)
+        if (suName instanceof NamespaceName namespaceName) {
+            return getFullBundleAsync(namespaceName)
                     .thenCompose(namespaceBundle ->
-                            internalGetWebServiceUrl(Optional.empty(), 
namespaceBundle, options));
+                            internalGetWebServiceUrl(null, namespaceBundle, 
options));
         }
 
-        if (suName instanceof NamespaceBundle) {
-            return internalGetWebServiceUrl(Optional.empty(), 
(NamespaceBundle) suName, options);
+        if (suName instanceof NamespaceBundle namespaceBundle) {
+            return internalGetWebServiceUrl(null, namespaceBundle, options);
         }
 
         throw new IllegalArgumentException("Unrecognized class of 
NamespaceBundle: " + suName.getClass().getName());
@@ -277,7 +277,7 @@ public class NamespaceService implements AutoCloseable {
 
     /**
      * Return the URL of the broker who's owning a particular service unit.
-     *
+     * <p>
      * If the service unit is not owned, return an empty optional
      */
     public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions 
options) throws Exception {
@@ -285,7 +285,7 @@ public class NamespaceService implements AutoCloseable {
                 
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), 
SECONDS);
     }
 
-    private CompletableFuture<Optional<URL>> 
internalGetWebServiceUrl(Optional<ServiceUnitId> topic,
+    private CompletableFuture<Optional<URL>> 
internalGetWebServiceUrl(@Nullable ServiceUnitId topic,
                                                                       
NamespaceBundle bundle,
                                                                       
LookupOptions options) {
 
@@ -306,7 +306,7 @@ public class NamespaceService implements AutoCloseable {
             }
             CompletableFuture<Optional<LookupResult>> future =
                     
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
-                    ? loadManager.get().findBrokerServiceUrl(topic, bundle) :
+                    ? 
loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) :
                     findBrokerServiceUrl(bundle, options);
 
             return future.thenApply(lookupResult -> {
@@ -329,7 +329,7 @@ public class NamespaceService implements AutoCloseable {
     /**
      * Register all the bootstrap name spaces including the heartbeat 
namespace.
      *
-     * @throws PulsarServerException
+     * @throws PulsarServerException if an unexpected error occurs
      */
     public void registerBootstrapNamespaces() throws PulsarServerException {
 
@@ -352,20 +352,19 @@ public class NamespaceService implements AutoCloseable {
     }
 
     /**
-     * Tried to registers a namespace to this instance.
+     * Tries to register a namespace to this instance.
      *
-     * @param nsname
-     * @param ensureOwned
-     * @return
-     * @throws PulsarServerException
-     * @throws Exception
+     * @param nsname namespace name
+     * @param ensureOwned sets the behavior when the namespace is already 
owned by another broker.
+     *                    If this flag is set to true, then the method will 
throw an exception.
+     *                    If this flag is set to false, then the method will 
return false.
+     * @return true if the namespace was successfully registered, false 
otherwise
+     * @throws PulsarServerException if an error occurs when registering the 
namespace
      */
     public boolean registerNamespace(NamespaceName nsname, boolean 
ensureOwned) throws PulsarServerException {
         try {
-            NamespaceBundle nsFullBundle = null;
-
             // all pre-registered namespace is assumed to have bundles disabled
-            nsFullBundle = bundleFactory.getFullBundle(nsname);
+            NamespaceBundle nsFullBundle = bundleFactory.getFullBundle(nsname);
             // v2 namespace will always use full bundle object
             final NamespaceEphemeralData otherData;
             if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
@@ -417,10 +416,9 @@ public class NamespaceService implements AutoCloseable {
     /**
      * Main internal method to lookup and setup ownership of service unit to a 
broker.
      *
-     * @param bundle
-     * @param options
-     * @return
-     * @throws PulsarServerException
+     * @param bundle the namespace bundle
+     * @param options the lookup options
+     * @return the lookup result
      */
     private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
             NamespaceBundle bundle, LookupOptions options) {
@@ -440,7 +438,7 @@ public class NamespaceService implements AutoCloseable {
 
             // First check if we or someone else already owns the bundle
             ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
-                if (!nsData.isPresent()) {
+                if (nsData.isEmpty()) {
                     // No one owns this bundle
 
                     if (options.isReadOnly()) {
@@ -448,9 +446,7 @@ public class NamespaceService implements AutoCloseable {
                         future.complete(Optional.empty());
                     } else {
                         // Now, no one owns the namespace yet. Hence, we will 
try to dynamically assign it
-                        pulsar.getExecutor().execute(() -> {
-                            searchForCandidateBroker(bundle, future, options);
-                        });
+                        pulsar.getExecutor().execute(() -> 
searchForCandidateBroker(bundle, future, options));
                     }
                 } else if (nsData.get().isDisabled()) {
                     future.completeExceptionally(
@@ -474,7 +470,6 @@ public class NamespaceService implements AutoCloseable {
                                     url == null ? null : url.toString(),
                                     urlTls == null ? null : 
urlTls.toString())));
                         }
-                        return;
                     } else {
                         future.complete(Optional.of(new 
LookupResult(nsData.get())));
                     }
@@ -502,13 +497,13 @@ public class NamespaceService implements AutoCloseable {
                     new IllegalStateException("The leader election has not yet 
been completed!"));
             return;
         }
-        String candidateBroker = null;
+        String candidateBroker;
         String candidateBrokerAdvertisedAddr = null;
 
         LeaderElectionService les = pulsar.getLeaderElectionService();
         if (les == null) {
             // The leader election service was not initialized yet. This can 
happen because the broker service is
-            // initialized first and it might start receiving lookup requests 
before the leader election service is
+            // initialized first, and it might start receiving lookup requests 
before the leader election service is
             // fully initialized.
             LOG.warn("Leader election service isn't initialized yet. "
                             + "Returning empty result to lookup. 
NamespaceBundle[{}]",
@@ -548,7 +543,7 @@ public class NamespaceService implements AutoCloseable {
                                 && 
isBrokerActive(currentLeader.get().getServiceUrl());
                         if (!leaderBrokerActive) {
                             makeLoadManagerDecisionOnThisBroker = true;
-                            if (!currentLeader.isPresent()) {
+                            if (currentLeader.isEmpty()) {
                                 LOG.warn(
                                         "The information about the current 
leader broker wasn't available. "
                                                 + "Handling load manager 
decisions in a decentralized way. "
@@ -565,7 +560,7 @@ public class NamespaceService implements AutoCloseable {
                     }
                     if (makeLoadManagerDecisionOnThisBroker) {
                         Optional<Pair<String, String>> availableBroker = 
getLeastLoadedFromLoadManager(bundle);
-                        if (!availableBroker.isPresent()) {
+                        if (availableBroker.isEmpty()) {
                             LOG.warn("Load manager didn't return any available 
broker. "
                                             + "Returning empty result to 
lookup. NamespaceBundle[{}]",
                                     bundle);
@@ -603,7 +598,7 @@ public class NamespaceService implements AutoCloseable {
                         // Found owner for the namespace bundle
 
                         if (options.isLoadTopicsInBundle()) {
-                            // Schedule the task to pre-load topics
+                            // Schedule the task to preload topics
                             pulsar.loadNamespaceTopics(bundle);
                         }
                         // find the target
@@ -614,7 +609,6 @@ public class NamespaceService implements AutoCloseable {
                                 lookupFuture.completeExceptionally(
                                         new PulsarServerException("the broker 
do not have "
                                                 + 
options.getAdvertisedListenerName() + " listener"));
-                                return;
                             } else {
                                 URI url = listener.getBrokerServiceUrl();
                                 URI urlTls = listener.getBrokerServiceUrlTls();
@@ -622,11 +616,9 @@ public class NamespaceService implements AutoCloseable {
                                         new LookupResult(ownerInfo,
                                                 url == null ? null : 
url.toString(),
                                                 urlTls == null ? null : 
urlTls.toString())));
-                                return;
                             }
                         } else {
                             lookupFuture.complete(Optional.of(new 
LookupResult(ownerInfo)));
-                            return;
                         }
                     }
                 }).exceptionally(exception -> {
@@ -712,7 +704,7 @@ public class NamespaceService implements AutoCloseable {
         } else {
             LOG.warn("Broker {} ({}) couldn't be found in available brokers 
{}",
                     candidateBroker, candidateBrokerHostAndPort,
-                    
availableBrokers.stream().collect(Collectors.joining(",")));
+                    String.join(",", availableBrokers));
             return false;
         }
     }
@@ -722,8 +714,7 @@ public class NamespaceService implements AutoCloseable {
         if (uriSeparatorPos == -1) {
             throw new IllegalArgumentException("'" + candidateBroker + "' 
isn't an URI.");
         }
-        String candidateBrokerHostAndPort = 
candidateBroker.substring(uriSeparatorPos + 3);
-        return candidateBrokerHostAndPort;
+        return candidateBroker.substring(uriSeparatorPos + 3);
     }
 
     private Set<String> getAvailableBrokers() {
@@ -737,12 +728,13 @@ public class NamespaceService implements AutoCloseable {
     /**
      * Helper function to encapsulate the logic to invoke between old and new 
load manager.
      *
-     * @return
-     * @throws Exception
+     * @param serviceUnit the service unit
+     * @return the least loaded broker addresses
+     * @throws Exception if an error occurs
      */
     private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
         Optional<ResourceUnit> leastLoadedBroker = 
loadManager.get().getLeastLoaded(serviceUnit);
-        if (!leastLoadedBroker.isPresent()) {
+        if (leastLoadedBroker.isEmpty()) {
             LOG.warn("No broker is available for {}", serviceUnit);
             return Optional.empty();
         }
@@ -863,7 +855,7 @@ public class NamespaceService implements AutoCloseable {
 
     public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
         try {
-            // Does ZooKeeper says that the namespace is disabled?
+            // Does ZooKeeper say that the namespace is disabled?
             CompletableFuture<Optional<NamespaceEphemeralData>> nsDataFuture = 
ownershipCache.getOwnerAsync(bundle);
             if (nsDataFuture != null) {
                 Optional<NamespaceEphemeralData> nsData = 
nsDataFuture.getNow(null);
@@ -886,12 +878,14 @@ public class NamespaceService implements AutoCloseable {
     /**
      * 1. split the given bundle into two bundles 2. assign ownership of both 
the bundles to current broker 3. update
      * policies with newly created bundles into LocalZK 4. disable original 
bundle and refresh the cache.
-     *
+     * <p>
      * It will call splitAndOwnBundleOnceAndRetry to do the real retry work, 
which will retry "retryTimes".
      *
-     * @param bundle
-     * @return
-     * @throws Exception
+     * @param bundle the bundle to split
+     * @param unload whether to unload the new split bundles
+     * @param splitAlgorithm the algorithm to split the bundle
+     * @param boundaries the boundaries to split the bundle
+     * @return a future that will complete when the bundle is split and owned
      */
     public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, 
boolean unload,
                                                      
NamespaceBundleSplitAlgorithm splitAlgorithm,
@@ -926,36 +920,36 @@ public class NamespaceService implements AutoCloseable {
                 }
                 try {
                     bundleFactory.splitBundles(bundle, splitBoundaries.size() 
+ 1, splitBoundaries)
-                            .thenAccept(splittedBundles -> {
+                            .thenAccept(splitBundles -> {
                                 // Split and updateNamespaceBundles. Update 
may fail because of concurrent write to
                                 // Zookeeper.
-                                if (splittedBundles == null) {
+                                if (splitBundles == null) {
                                     String msg = format("bundle %s not found 
under namespace", bundle.toString());
                                     LOG.warn(msg);
                                     updateFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
                                     return;
                                 }
 
-                                
Objects.requireNonNull(splittedBundles.getLeft());
-                                
Objects.requireNonNull(splittedBundles.getRight());
-                                
checkArgument(splittedBundles.getRight().size() == splitBoundaries.size() + 1,
+                                Objects.requireNonNull(splitBundles.getLeft());
+                                
Objects.requireNonNull(splitBundles.getRight());
+                                checkArgument(splitBundles.getRight().size() 
== splitBoundaries.size() + 1,
                                         "bundle has to be split in " + 
(splitBoundaries.size() + 1) + " bundles");
                                 NamespaceName nsname = 
bundle.getNamespaceObject();
                                 if (LOG.isDebugEnabled()) {
                                     LOG.debug("[{}] splitAndOwnBundleOnce: {}, 
counter: {}, bundles: {}",
                                             nsname.toString(), 
bundle.getBundleRange(), counter.get(),
-                                            splittedBundles.getRight());
+                                            splitBundles.getRight());
                                 }
                                 try {
                                     // take ownership of newly split bundles
-                                    for (NamespaceBundle sBundle : 
splittedBundles.getRight()) {
+                                    for (NamespaceBundle sBundle : 
splitBundles.getRight()) {
                                         
Objects.requireNonNull(ownershipCache.tryAcquiringOwnership(sBundle));
                                     }
-                                    updateNamespaceBundles(nsname, 
splittedBundles.getLeft()).thenCompose(__ -> {
-                                        return 
updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft());
-                                    }).thenRun(() -> {
-                                        
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
-                                        
updateFuture.complete(splittedBundles.getRight());
+                                    updateNamespaceBundles(nsname, 
splitBundles.getLeft()).thenCompose(__ ->
+                                        
updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft()))
+                                            .thenRun(() -> {
+                                                
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
+                                                
updateFuture.complete(splitBundles.getRight());
                                     }).exceptionally(ex1 -> {
                                         String msg = format("failed to update 
namespace policies [%s], "
                                                         + "NamespaceBundle: %s 
due to %s",
@@ -1023,7 +1017,7 @@ public class NamespaceService implements AutoCloseable {
                         .exceptionally(e -> {
                             String msg1 = format(
                                     "failed to disable bundle %s under 
namespace [%s] with error %s",
-                                    bundle.getNamespaceObject().toString(), 
bundle.toString(), ex.getMessage());
+                                    bundle.getNamespaceObject().toString(), 
bundle, ex.getMessage());
                             LOG.warn(msg1, e);
                             completionFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg1));
                             return null;
@@ -1093,9 +1087,8 @@ public class NamespaceService implements AutoCloseable {
      * Update new bundle-range to admin/policies/namespace.
      * Update may fail because of concurrent write to Zookeeper.
      *
-     * @param nsname
-     * @param nsBundles
-     * @throws Exception
+     * @param nsname the namespace name
+     * @param nsBundles the new namespace bundles
      */
     public CompletableFuture<Void> 
updateNamespaceBundlesForPolicies(NamespaceName nsname,
                                                                       
NamespaceBundles nsBundles) {
@@ -1122,9 +1115,8 @@ public class NamespaceService implements AutoCloseable {
      * Update new bundle-range to LocalZk (create a new node if not present).
      * Update may fail because of concurrent write to Zookeeper.
      *
-     * @param nsname
-     * @param nsBundles
-     * @throws Exception
+     * @param nsname the namespace name
+     * @param nsBundles the new namespace bundles
      */
     public CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
         Objects.requireNonNull(nsname);
@@ -1176,7 +1168,7 @@ public class NamespaceService implements AutoCloseable {
     }
 
     /**
-     * @Deprecated This method is only used in test now.
+     * @deprecated This method is only used in test now.
      */
     @Deprecated
     public boolean isServiceUnitActive(TopicName topicName) {
@@ -1197,7 +1189,7 @@ public class NamespaceService implements AutoCloseable {
         }
         return getBundleAsync(topicName).thenCompose(bundle -> {
             Optional<CompletableFuture<OwnedBundle>> optionalFuture = 
ownershipCache.getOwnedBundleAsync(bundle);
-            if (!optionalFuture.isPresent()) {
+            if (optionalFuture.isEmpty()) {
                 return CompletableFuture.completedFuture(false);
             }
             return optionalFuture.get().thenApply(ob -> ob != null && 
ob.isActive());
@@ -1220,7 +1212,7 @@ public class NamespaceService implements AutoCloseable {
             return getBundleAsync(topic)
                     .thenCompose(bundle -> 
loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle));
         }
-        return getBundleAsync(topic).thenApply(bundle -> 
ownershipCache.isNamespaceBundleOwned(bundle));
+        return 
getBundleAsync(topic).thenApply(ownershipCache::isNamespaceBundleOwned);
     }
 
     public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
@@ -1462,21 +1454,19 @@ public class NamespaceService implements AutoCloseable {
                     if (peerClusterData != null) {
                         return 
getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
                     } else {
-                        // Non-persistent topics don't have managed ledgers so 
we have to retrieve them from local
+                        // Non-persistent topics don't have managed ledgers. 
So we have to retrieve them from local
                         // cache.
                         List<String> topics = new ArrayList<>();
                         synchronized 
(pulsar.getBrokerService().getMultiLayerTopicMap()) {
                             if 
(pulsar.getBrokerService().getMultiLayerTopicMap()
                                     .containsKey(namespaceName.toString())) {
                                 
pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString())
-                                        .forEach((__, bundle) -> {
-                                            bundle.forEach((topicName, topic) 
-> {
-                                                if (topic instanceof 
NonPersistentTopic
-                                                        && 
((NonPersistentTopic) topic).isActive()) {
-                                                    topics.add(topicName);
-                                                }
-                                            });
-                                        });
+                                        .forEach((__, bundle) -> 
bundle.forEach((topicName, topic) -> {
+                                            if (topic instanceof 
NonPersistentTopic
+                                                    && ((NonPersistentTopic) 
topic).isActive()) {
+                                                topics.add(topicName);
+                                            }
+                                        }));
                             }
                         }
 
@@ -1545,14 +1535,10 @@ public class NamespaceService implements AutoCloseable {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
             ExtensibleLoadManagerImpl extensibleLoadManager = 
ExtensibleLoadManagerImpl.get(loadManager.get());
             return 
extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle)
-                    .thenCompose(lookupData -> {
-                        if (lookupData.isPresent()) {
-                            return CompletableFuture.completedFuture(
-                                    
Optional.of(lookupData.get().toNamespaceEphemeralData()));
-                        } else {
-                            return 
CompletableFuture.completedFuture(Optional.empty());
-                        }
-                    });
+                    .thenCompose(lookupData -> lookupData
+                        .map(brokerLookupData ->
+                            
CompletableFuture.completedFuture(Optional.of(brokerLookupData.toNamespaceEphemeralData())))
+                        .orElseGet(() -> 
CompletableFuture.completedFuture(Optional.empty())));
         }
         return ownershipCache.getOwnerAsync(bundle);
     }
@@ -1576,7 +1562,6 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public void unloadSLANamespace() throws Exception {
-        PulsarAdmin adminClient = null;
         NamespaceName namespaceName = getSLAMonitorNamespace(host, config);
 
         LOG.info("Checking owner for SLA namespace {}", namespaceName);
@@ -1589,7 +1574,7 @@ public class NamespaceService implements AutoCloseable {
         }
 
         LOG.info("Trying to unload SLA namespace {}", namespaceName);
-        adminClient = pulsar.getAdminClient();
+        PulsarAdmin adminClient = pulsar.getAdminClient();
         adminClient.namespaces().unload(namespaceName.toString());
         LOG.info("Namespace {} unloaded successfully", namespaceName);
     }
@@ -1662,7 +1647,7 @@ public class NamespaceService implements AutoCloseable {
 
     /**
      * used for filtering bundles in special namespace.
-     * @param namespace
+     * @param namespace the namespace name
      * @return True if namespace is HEARTBEAT_NAMESPACE or SLA_NAMESPACE
      */
     public static boolean filterNamespaceForShedding(String namespace) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index bfb94aa7740..fa121b8eb4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -309,7 +309,7 @@ public abstract class PulsarWebResource {
         }
         return 
pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
                 .thenCompose(tenantInfoOptional -> {
-                    if (!tenantInfoOptional.isPresent()) {
+                    if (tenantInfoOptional.isEmpty()) {
                         throw new RestException(Status.NOT_FOUND, "Tenant does 
not exist");
                     }
                     TenantInfo tenantInfo = tenantInfoOptional.get();
@@ -380,9 +380,11 @@ public abstract class PulsarWebResource {
     /**
      * It validates that peer-clusters can't coexist in replication-clusters.
      *
-     * @clusterName: given cluster whose peer-clusters can't be present into 
replication-cluster list
-     * @replicationClusters: replication-cluster list
+     * @param clusterName given cluster whose peer-clusters can't be present 
into replication-cluster list
+     * @param replicationClusters replication-cluster list
+     * @deprecated use {@link #validatePeerClusterConflictAsync(String, Set)} 
instead
      */
+    @Deprecated
     protected void validatePeerClusterConflict(String clusterName, Set<String> 
replicationClusters) {
         try {
             ClusterData clusterData = 
clusterResources().getCluster(clusterName).orElseThrow(
@@ -453,7 +455,7 @@ public abstract class PulsarWebResource {
     protected CompletableFuture<Void> validateClusterForTenantAsync(String 
tenant, String cluster) {
         return 
pulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant)
                 .thenAccept(tenantInfo -> {
-                    if (!tenantInfo.isPresent()) {
+                    if (tenantInfo.isEmpty()) {
                         throw new RestException(Status.NOT_FOUND, "Tenant does 
not exist");
                     }
                     if 
(!tenantInfo.get().getAllowedClusters().contains(cluster)) {
@@ -488,7 +490,7 @@ public abstract class PulsarWebResource {
      * Check if the cluster exists and redirect the call to the owning cluster.
      *
      * @param cluster Cluster name
-     * @throws Exception In case the redirect happens
+     * @throws WebApplicationException In case the redirect happens
      */
     protected void validateClusterOwnership(String cluster) throws WebApplicationException {
         sync(()-> validateClusterOwnershipAsync(cluster));
@@ -550,11 +552,8 @@ public abstract class PulsarWebResource {
             return true;
         }
 
-        if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
-            // Without authorization, any cluster name should be valid and 
accepted by the broker
-            return true;
-        }
-        return false;
+        // Without authorization, any cluster name should be valid and 
accepted by the broker
+        return !pulsarService.getConfiguration().isAuthorizationEnabled();
     }
 
     protected void validateBundleOwnership(String tenant, String cluster, 
String namespace, boolean authoritative,
@@ -611,7 +610,7 @@ public abstract class PulsarWebResource {
                 .requestHttps(isRequestHttps())
                 .readOnly(true)
                 .loadTopicsInBundle(false).build();
-        return nsService.getWebServiceUrlAsync(nsBundle, 
options).thenApply(optionUrl -> optionUrl.isPresent());
+        return nsService.getWebServiceUrlAsync(nsBundle, 
options).thenApply(Optional::isPresent);
     }
 
     protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName 
fqnn, BundlesData bundles,
@@ -664,7 +663,7 @@ public abstract class PulsarWebResource {
                     .loadTopicsInBundle(false).build();
             Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, options);
             // Ensure we get a url
-            if (webUrl == null || !webUrl.isPresent()) {
+            if (webUrl.isEmpty()) {
                 log.warn("Unable to get web service url");
                 throw new RestException(Status.PRECONDITION_FAILED,
                         "Failed to find ownership for ServiceUnit:" + 
bundle.toString());
@@ -697,8 +696,6 @@ public abstract class PulsarWebResource {
         } catch (NullPointerException e) {
             log.warn("Unable to get web service url");
             throw new RestException(Status.PRECONDITION_FAILED, "Failed to 
find ownership for ServiceUnit:" + bundle);
-        } catch (WebApplicationException wae) {
-            throw wae;
         }
     }
 
@@ -712,7 +709,7 @@ public abstract class PulsarWebResource {
                 .loadTopicsInBundle(false).build();
         return nsService.getWebServiceUrlAsync(bundle, options)
                 .thenCompose(webUrl -> {
-                    if (webUrl == null || !webUrl.isPresent()) {
+                    if (webUrl.isEmpty()) {
                         log.warn("Unable to get web service url");
                         throw new RestException(Status.PRECONDITION_FAILED,
                                 "Failed to find ownership for ServiceUnit:" + 
bundle.toString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java
index 16d87baedbe..afebbd276eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RequestWrapper.java
@@ -62,7 +62,7 @@ public class RequestWrapper extends HttpServletRequestWrapper {
 
             }
 
-            public int read() throws IOException {
+            public int read() {
                 return byteArrayInputStream.read();
             }
         };
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
index efed6140395..3fa00beea1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
@@ -76,24 +76,24 @@ public class ResponseHandlerFilter implements Filter {
         if (request.isAsyncSupported() && request.isAsyncStarted()) {
             request.getAsyncContext().addListener(new AsyncListener() {
                 @Override
-                public void onComplete(AsyncEvent asyncEvent) throws 
IOException {
+                public void onComplete(AsyncEvent asyncEvent) {
                     handleInterceptor(request, response);
                 }
 
                 @Override
-                public void onTimeout(AsyncEvent asyncEvent) throws 
IOException {
+                public void onTimeout(AsyncEvent asyncEvent) {
                     LOG.warn("Http request {} async context timeout.", 
request);
                     handleInterceptor(request, response);
                 }
 
                 @Override
-                public void onError(AsyncEvent asyncEvent) throws IOException {
+                public void onError(AsyncEvent asyncEvent) {
                     LOG.warn("Http request {} async context error.", request, 
asyncEvent.getThrowable());
                     handleInterceptor(request, response);
                 }
 
                 @Override
-                public void onStartAsync(AsyncEvent asyncEvent) throws 
IOException {
+                public void onStartAsync(AsyncEvent asyncEvent) {
                     // nothing to do
                 }
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
index b18aa1c787a..c3ae3a495cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
@@ -30,7 +30,6 @@ import org.apache.pulsar.common.policies.data.ErrorData;
 /**
  * Exception used to provide better error messages to clients of the REST API.
  */
-@SuppressWarnings("serial")
 public class RestException extends WebApplicationException {
     private Throwable cause = null;
     static String getExceptionData(Throwable t) {
@@ -75,8 +74,7 @@ public class RestException extends WebApplicationException {
     }
 
     private static Response getResponse(Throwable t) {
-        if (t instanceof WebApplicationException) {
-            WebApplicationException e = (WebApplicationException) t;
+        if (t instanceof WebApplicationException e) {
             return e.getResponse();
         } else {
             return Response
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 2d6a6af5847..eada0436f4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -264,9 +264,7 @@ public class WebService implements AutoCloseable {
         context.setContextPath(path);
         context.addServlet(servletHolder, MATCH_ALL);
         if (attributeMap != null) {
-            attributeMap.forEach((key, value) -> {
-                context.setAttribute(key, value);
-            });
+            attributeMap.forEach(context::setAttribute);
         }
         filterInitializer.addFilters(context, requiresAuthentication);
         handlers.add(context);

Reply via email to