This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 77d4c28  Remove broker mode to handle persistent/non-persistent topics separately (#3348)
77d4c28 is described below

commit 77d4c28d59604cb6ec589d3620ab1770e0523c87
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Feb 26 11:05:51 2019 -0800

    Remove broker mode to handle persistent/non-persistent topics separately (#3348)
    
    * Remove broker mode to handle persistent/non-persistent topics separately
    
    * fix unused imports
---
 conf/broker.conf                                   |   6 -
 conf/standalone.conf                               |   6 -
 deployment/terraform-ansible/templates/broker.conf |   6 -
 .../apache/pulsar/broker/ServiceConfiguration.java |  12 --
 .../broker/loadbalance/impl/LoadManagerShared.java |  34 +----
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  43 ++----
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  27 +---
 .../pulsar/broker/service/BrokerService.java       |  16 ---
 .../loadbalance/ModularLoadManagerImplTest.java    |  24 +---
 .../pulsar/client/api/NonPersistentTopicTest.java  | 151 ---------------------
 .../data/loadbalancer/LoadManagerReport.java       |   4 -
 .../policies/data/loadbalancer/LoadReport.java     |  18 ---
 .../data/loadbalancer/LocalBrokerData.java         |  20 ---
 site/_data/config/broker.yaml                      |   6 -
 .../explanations/ja/non-persistent-topics.md       |  15 +-
 .../latest/cookbooks/non-persistent-messaging.md   |   8 --
 .../getting-started/ConceptsAndArchitecture.md     |   2 +-
 site2/docs/concepts-messaging.md                   |   2 +-
 site2/docs/cookbooks-non-persistent.md             |   7 -
 site2/docs/reference-configuration.md              |   2 -
 20 files changed, 25 insertions(+), 384 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 262c44c..9d7c341 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -198,12 +198,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000
 # Number of worker threads to serve non-persistent topic
 numWorkerThreadsForNonPersistentTopic=8
 
-# Enable broker to load persistent topics
-enablePersistentTopics=true
-
-# Enable broker to load non-persistent topics
-enableNonPersistentTopics=true
-
 # Enable to run bookie along with broker
 enableRunBookieTogether=false
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a8e7181..4f634e9 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -158,12 +158,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000
 # Number of worker threads to serve non-persistent topic
 numWorkerThreadsForNonPersistentTopic=8
 
-# Enable broker to load persistent topics
-enablePersistentTopics=true
-
-# Enable broker to load non-persistent topics
-enableNonPersistentTopics=true
-
 # Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
 # until the number of connected producers decrease.
 # Using a value of 0, is disabling maxProducersPerTopic-limit check.
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 22f74be..1db877c 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -163,12 +163,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000
 # Number of worker threads to serve non-persistent topic
 numWorkerThreadsForNonPersistentTopic=8
 
-# Enable broker to load persistent topics
-enablePersistentTopics=true
-
-# Enable broker to load non-persistent topics
-enableNonPersistentTopics=true
-
 # Enable to run bookie along with broker
 enableRunBookieTogether=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index cb36df4..2be470b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -397,18 +397,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
 
     @FieldContext(
         category = CATEGORY_SERVER,
-        doc = "Enable broker to load persistent topics"
-    )
-    private boolean enablePersistentTopics = true;
-
-    @FieldContext(
-        category = CATEGORY_SERVER,
-        doc = "Enable broker to load non-persistent topics"
-    )
-    private boolean enableNonPersistentTopics = true;
-
-    @FieldContext(
-        category = CATEGORY_SERVER,
         doc = "Enable to run bookie along with broker"
     )
     private boolean enableRunBookieTogether = false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 6a5575c..04449cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -96,7 +96,7 @@ public class LoadManagerShared {
     // The brokers are put into brokerCandidateCache.
     public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
             final SimpleResourceAllocationPolicies policies, final Set<String> 
brokerCandidateCache,
-            final Set<String> availableBrokers, final 
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
+            final Set<String> availableBrokers) {
         Set<String> primariesCache = localPrimariesCache.get();
         primariesCache.clear();
 
@@ -145,27 +145,11 @@ public class LoadManagerShared {
                     }
 
                 }
-            } else {
-                // non-persistent topic can be assigned to only those brokers 
that enabled for non-persistent topic
-                if (isNonPersistentTopic
-                        && 
!brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Filter broker- [{}] because it doesn't 
support non-persistent namespace - [{}]",
-                                brokerUrl.getHost(), namespace.toString());
-                    }
-                } else if (!isNonPersistentTopic
-                        && 
!brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
-                    // persistent topic can be assigned to only brokers that 
enabled for persistent-topic
-                    if (log.isDebugEnabled()) {
-                        log.debug("Filter broker- [{}] because broker only 
supports non-persistent namespace - [{}]",
-                                brokerUrl.getHost(), namespace.toString());
-                    }
-                } else if (policies.isSharedBroker(brokerUrl.getHost())) {
-                    secondaryCache.add(broker);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Added Shared Broker - [{}] as possible 
Candidates for namespace - [{}]",
-                                brokerUrl.getHost(), namespace.toString());
-                    }
+            } else if (policies.isSharedBroker(brokerUrl.getHost())) {
+                secondaryCache.add(broker);
+                if (log.isDebugEnabled()) {
+                    log.debug("Added Shared Broker - [{}] as possible 
Candidates for namespace - [{}]",
+                            brokerUrl.getHost(), namespace.toString());
                 }
             }
         }
@@ -516,12 +500,6 @@ public class LoadManagerShared {
         return true;
     }
 
-    public interface BrokerTopicLoadingPredicate {
-        boolean isEnablePersistentTopics(String brokerUrl);
-
-        boolean isEnableNonPersistentTopics(String brokerUrl);
-    }
-
     /**
      * It filters out brokers which owns topic higher than configured 
threshold at
      * {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 997d9bb..b426d1e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -60,7 +60,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
-import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -176,9 +175,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
     // ZooKeeper belonging to the pulsar service.
     private ZooKeeper zkClient;
 
-    // check if given broker can load persistent/non-persistent topic
-    private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
-
     private Map<String, String> brokerToFailureDomainMap;
 
     private static final Deserializer<LocalBrokerData> loadReportDeserializer 
= (key, content) -> jsonMapper()
@@ -198,22 +194,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         scheduler = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-modular-load-manager"));
         this.brokerToFailureDomainMap = Maps.newHashMap();
-
-        this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
-            @Override
-            public boolean isEnablePersistentTopics(String brokerUrl) {
-                final BrokerData brokerData = 
loadData.getBrokerData().get(brokerUrl.replace("http://";, ""));
-                return brokerData != null && brokerData.getLocalData() != null
-                        && 
brokerData.getLocalData().isPersistentTopicsEnabled();
-            }
-
-            @Override
-            public boolean isEnableNonPersistentTopics(String brokerUrl) {
-                final BrokerData brokerData = 
loadData.getBrokerData().get(brokerUrl.replace("http://";, ""));
-                return brokerData != null && brokerData.getLocalData() != null
-                        && 
brokerData.getLocalData().isNonPersistentTopicsEnabled();
-            }
-        };
     }
 
     /**
@@ -268,12 +248,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
         localData = new LocalBrokerData(pulsar.getWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
         localData.setBrokerVersionString(pulsar.getBrokerVersion());
-        // configure broker-topic mode
-        
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
-        
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
-        
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
-        
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
-
 
         placementStrategy = ModularLoadManagerStrategy.create(conf);
         policies = new SimpleResourceAllocationPolicies(pulsar);
@@ -625,7 +599,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
                 ServiceUnitId serviceUnit = 
pulsar.getNamespaceService().getNamespaceBundleFactory()
                         .getBundle(namespace, bundle);
                 LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
-                        getAvailableBrokers(), brokerTopicLoadingPredicate);
+                        getAvailableBrokers());
                 return 
LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, 
currentBroker, pulsar,
                         brokerToNamespaceToBundleRange, brokerCandidateCache);
             }
@@ -706,8 +680,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
             final BundleData data = 
loadData.getBundleData().computeIfAbsent(bundle,
                     key -> getBundleDataOrDefault(bundle));
             brokerCandidateCache.clear();
-            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, 
brokerCandidateCache, getAvailableBrokers(),
-                    brokerTopicLoadingPredicate);
+            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, 
brokerCandidateCache, getAvailableBrokers());
 
             // filter brokers which owns topic higher than threshold
             
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, 
loadData,
@@ -729,14 +702,14 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
                 }
             } catch ( BrokerFilterException x ) {
                 // restore the list of brokers to the full set
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache, getAvailableBrokers(),
-                        brokerTopicLoadingPredicate);
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
+                        getAvailableBrokers());
             }
 
             if ( brokerCandidateCache.isEmpty() ) {
                 // restore the list of brokers to the full set
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache, getAvailableBrokers(),
-                        brokerTopicLoadingPredicate);
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
+                        getAvailableBrokers());
             }
 
             // Choose a broker among the potentially smaller filtered list, 
when possible
@@ -754,8 +727,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, ZooKeeperCach
             final double maxUsage = 
loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
             if (maxUsage > overloadThreshold) {
                 // All brokers that were in the filtered list were overloaded, 
so check if there is a better broker
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache, getAvailableBrokers(),
-                        brokerTopicLoadingPredicate);
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
+                        getAvailableBrokers());
                 broker = placementStrategy.selectBroker(brokerCandidateCache, 
data, loadData, conf);
             }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 9f9b949..2ad67a1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -48,7 +48,6 @@ import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
-import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -182,8 +181,6 @@ public class SimpleLoadManagerImpl implements LoadManager, 
ZooKeeperCacheListene
     private boolean forceLoadReportUpdate = false;
     private static final Deserializer<LoadReport> loadReportDeserializer = 
(key, content) -> jsonMapper()
             .readValue(content, LoadReport.class);
-    // check if given broker can load persistent/non-persistent topic
-    private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
 
     // Perform initializations which may be done without a PulsarService.
     public SimpleLoadManagerImpl() {
@@ -200,21 +197,6 @@ public class SimpleLoadManagerImpl implements LoadManager, 
ZooKeeperCacheListene
         brokerCandidateCache = new HashSet<>();
         availableBrokersCache = new HashSet<>();
         brokerToNamespaceToBundleRange = new HashMap<>();
-        this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
-            @Override
-            public boolean isEnablePersistentTopics(String brokerUrl) {
-                ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new 
PulsarResourceDescription());
-                LoadReport loadReport = currentLoadReports.get(ru);
-                return loadReport != null && 
loadReport.isPersistentTopicsEnabled();
-            }
-
-            @Override
-            public boolean isEnableNonPersistentTopics(String brokerUrl) {
-                ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new 
PulsarResourceDescription());
-                LoadReport loadReport = currentLoadReports.get(ru);
-                return loadReport != null && 
loadReport.isNonPersistentTopicsEnabled();
-            }
-        };
     }
 
     @Override
@@ -227,9 +209,6 @@ public class SimpleLoadManagerImpl implements LoadManager, 
ZooKeeperCacheListene
         this.policies = new SimpleResourceAllocationPolicies(pulsar);
         lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), 
pulsar.getWebServiceAddressTls(),
                 pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
-        
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
-        
lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
-
         loadReportCacheZk = new 
ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
             @Override
             public LoadReport deserialize(String key, byte[] content) throws 
Exception {
@@ -919,8 +898,8 @@ public class SimpleLoadManagerImpl implements LoadManager, 
ZooKeeperCacheListene
             }
             brokerCandidateCache.clear();
             try {
-                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache, availableBrokersCache,
-                        brokerTopicLoadingPredicate);
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, 
policies, brokerCandidateCache,
+                        availableBrokersCache);
             } catch (Exception e) {
                 log.warn("Error when trying to apply policies: {}", e);
                 for (final Map.Entry<Long, Set<ResourceUnit>> entry : 
availableBrokers.entrySet()) {
@@ -1112,8 +1091,6 @@ public class SimpleLoadManagerImpl implements 
LoadManager, ZooKeeperCacheListene
             try {
                 LoadReport loadReport = new 
LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                         pulsar.getBrokerServiceUrl(), 
pulsar.getBrokerServiceUrlTls());
-                
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
-                
loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
                 loadReport.setName(String.format("%s:%s", 
pulsar.getAdvertisedAddress(),
                         pulsar.getConfiguration().getWebServicePort().get()));
                 loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f48877d..0f78c72 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -501,14 +501,6 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String 
topic) {
         CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
 
-        if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
-            if (log.isDebugEnabled()) {
-                log.debug("Broker is unable to load non-persistent topic {}", 
topic);
-            }
-            topicFuture.completeExceptionally(
-                    new NotAllowedException("Broker is not unable to load 
non-persistent topic"));
-            return topicFuture;
-        }
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, 
this);
         CompletableFuture<Void> replicationFuture = 
nonPersistentTopic.checkReplication();
@@ -591,14 +583,6 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         checkTopicNsOwnership(topic);
 
         final CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
-        if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
-            if (log.isDebugEnabled()) {
-                log.debug("Broker is unable to load persistent topic {}", 
topic);
-            }
-            topicFuture.completeExceptionally(new NotAllowedException("Broker 
is not unable to load persistent topic"));
-            return topicFuture;
-        }
-
         final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
 
         if (topicLoadSemaphore.tryAcquire()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 0303fbf..d23c05e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TimeAverageMessageData;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
-import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
@@ -515,17 +514,6 @@ public class ModularLoadManagerImplTest {
         SimpleResourceAllocationPolicies simpleResourceAllocationPolicies = 
new SimpleResourceAllocationPolicies(
                 pulsar1);
         ServiceUnitId serviceUnit = 
LoadBalancerTestingUtils.makeBundles(nsFactory, tenant, cluster, namespace, 
1)[0];
-        BrokerTopicLoadingPredicate brokerTopicLoadingPredicate = new 
BrokerTopicLoadingPredicate() {
-            @Override
-            public boolean isEnablePersistentTopics(String brokerUrl) {
-                return true;
-            }
-
-            @Override
-            public boolean isEnableNonPersistentTopics(String brokerUrl) {
-                return true;
-            }
-        };
 
         // (1) now we have isolation policy : primary=broker1, 
secondary=broker2, minLimit=1
 
@@ -533,7 +521,7 @@ public class ModularLoadManagerImplTest {
         Set<String> brokerCandidateCache = Sets.newHashSet();
         Set<String> availableBrokers = Sets.newHashSet(sharedBroker, 
broker1Address, broker2Address);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, 
simpleResourceAllocationPolicies, brokerCandidateCache,
-                availableBrokers, brokerTopicLoadingPredicate);
+                availableBrokers);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker1Address));
 
@@ -541,7 +529,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, 
simpleResourceAllocationPolicies, brokerCandidateCache,
-                availableBrokers, brokerTopicLoadingPredicate);
+                availableBrokers);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker2Address));
 
@@ -549,7 +537,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, 
simpleResourceAllocationPolicies, brokerCandidateCache,
-                availableBrokers, brokerTopicLoadingPredicate);
+                availableBrokers);
         assertEquals(brokerCandidateCache.size(), 0);
 
         // (2) now we will have isolation policy : primary=broker1, 
secondary=broker2, minLimit=2
@@ -563,7 +551,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, 
broker2Address);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, 
simpleResourceAllocationPolicies, brokerCandidateCache,
-                availableBrokers, brokerTopicLoadingPredicate);
+                availableBrokers);
         assertEquals(brokerCandidateCache.size(), 2);
         assertTrue(brokerCandidateCache.contains(broker1Address));
         assertTrue(brokerCandidateCache.contains(broker2Address));
@@ -572,7 +560,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, 
simpleResourceAllocationPolicies, brokerCandidateCache,
-                availableBrokers, brokerTopicLoadingPredicate);
+                availableBrokers);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker2Address));
 
@@ -580,7 +568,7 @@ public class ModularLoadManagerImplTest {
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker);
         LoadManagerShared.applyNamespacePolicies(serviceUnit, 
simpleResourceAllocationPolicies, brokerCandidateCache,
-                availableBrokers, brokerTopicLoadingPredicate);
+                availableBrokers);
         assertEquals(brokerCandidateCache.size(), 0);
 
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 2e7a444..c436491 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -615,157 +615,6 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
     }
 
     /**
-     * verifies load manager assigns topic only if broker started in 
non-persistent mode
-     *
-     * <pre>
-     * 1. Start broker with disable non-persistent topic mode
-     * 2. Create namespace with non-persistency set
-     * 3. Create non-persistent topic
-     * 4. Load-manager should not be able to find broker
-     * 5. Create producer on that topic should fail
-     * </pre>
-     */
-    @Test(dataProvider = "loadManager")
-    public void testLoadManagerAssignmentForNonPersistentTestAssignment(String 
loadManagerName) throws Exception {
-
-        final String namespace = "my-property/my-ns";
-        final String topicName = "non-persistent://" + namespace + 
"/loadManager";
-        final String defaultLoadManagerName = conf.getLoadManagerClassName();
-        final boolean defaultENableNonPersistentTopic = 
conf.isEnableNonPersistentTopics();
-        try {
-            // start broker to not own non-persistent namespace and create 
non-persistent namespace
-            stopBroker();
-            conf.setEnableNonPersistentTopics(false);
-            conf.setLoadManagerClassName(loadManagerName);
-            startBroker();
-
-            Field field = PulsarService.class.getDeclaredField("loadManager");
-            field.setAccessible(true);
-            @SuppressWarnings("unchecked")
-            AtomicReference<LoadManager> loadManagerRef = 
(AtomicReference<LoadManager>) field.get(pulsar);
-            LoadManager manager = LoadManager.create(pulsar);
-            manager.start();
-            loadManagerRef.set(manager);
-
-            NamespaceBundle fdqn = 
pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
-            LoadManager loadManager = pulsar.getLoadManager().get();
-            ResourceUnit broker = null;
-            try {
-                broker = loadManager.getLeastLoaded(fdqn).get();
-            } catch (Exception e) {
-                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase 
don't find broker)
-            }
-            assertNull(broker);
-
-            try {
-                Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).createAsync().get(1,
-                        TimeUnit.SECONDS);
-                producer.close();
-                fail("topic loading should have failed");
-            } catch (Exception e) {
-                // Ok
-            }
-            
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-
-        } finally {
-            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
-            conf.setLoadManagerClassName(defaultLoadManagerName);
-        }
-
-    }
-
-    /**
-     * verifies: broker should reject non-persistent topic loading if broker 
is not enable for non-persistent topic
-     *
-     * @param loadManagerName
-     * @throws Exception
-     */
-    @Test
-    public void testNonPersistentTopicUnderPersistentNamespace() throws 
Exception {
-
-        final String namespace = "my-property/my-ns";
-        final String topicName = "non-persistent://" + namespace + 
"/persitentNamespace";
-
-        final boolean defaultENableNonPersistentTopic = 
conf.isEnableNonPersistentTopics();
-        try {
-            conf.setEnableNonPersistentTopics(false);
-            stopBroker();
-            startBroker();
-            try {
-                Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).createAsync().get(1,
-                        TimeUnit.SECONDS);
-                producer.close();
-                fail("topic loading should have failed");
-            } catch (Exception e) {
-                // Ok
-            }
-
-            
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-        } finally {
-            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
-        }
-    }
-
-    /**
-     * verifies that broker started with onlyNonPersistent mode doesn't own 
persistent-topic
-     *
-     * @param loadManagerName
-     * @throws Exception
-     */
-    @Test(dataProvider = "loadManager")
-    public void testNonPersistentBrokerModeRejectPersistentTopic(String 
loadManagerName) throws Exception {
-
-        final String namespace = "my-property/my-ns";
-        final String topicName = "persistent://" + namespace + "/loadManager";
-        final String defaultLoadManagerName = conf.getLoadManagerClassName();
-        final boolean defaultEnablePersistentTopic = 
conf.isEnablePersistentTopics();
-        final boolean defaultEnableNonPersistentTopic = 
conf.isEnableNonPersistentTopics();
-        try {
-            // start broker to not own non-persistent namespace and create 
non-persistent namespace
-            stopBroker();
-            conf.setEnableNonPersistentTopics(true);
-            conf.setEnablePersistentTopics(false);
-            conf.setLoadManagerClassName(loadManagerName);
-            startBroker();
-
-            Field field = PulsarService.class.getDeclaredField("loadManager");
-            field.setAccessible(true);
-            @SuppressWarnings("unchecked")
-            AtomicReference<LoadManager> loadManagerRef = 
(AtomicReference<LoadManager>) field.get(pulsar);
-            LoadManager manager = LoadManager.create(pulsar);
-            manager.start();
-            loadManagerRef.set(manager);
-
-            NamespaceBundle fdqn = 
pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
-            LoadManager loadManager = pulsar.getLoadManager().get();
-            ResourceUnit broker = null;
-            try {
-                broker = loadManager.getLeastLoaded(fdqn).get();
-            } catch (Exception e) {
-                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase 
don't find broker)
-            }
-            assertNull(broker);
-
-            try {
-                Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).createAsync().get(1,
-                        TimeUnit.SECONDS);
-                producer.close();
-                fail("topic loading should have failed");
-            } catch (Exception e) {
-                // Ok
-            }
-
-            
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-
-        } finally {
-            conf.setEnablePersistentTopics(defaultEnablePersistentTopic);
-            conf.setEnableNonPersistentTopics(defaultEnableNonPersistentTopic);
-            conf.setLoadManagerClassName(defaultLoadManagerName);
-        }
-
-    }
-
-    /**
      * Verifies msg-drop stats
      *
      * @throws Exception
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
index 7fb173b..4027800 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java
@@ -61,8 +61,4 @@ public interface LoadManagerReport extends ServiceLookupData {
 
     public String getBrokerVersionString();
 
-    public boolean isPersistentTopicsEnabled();
-
-    public boolean isNonPersistentTopicsEnabled();
-
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
index 6ca7744..c98e0ce 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java
@@ -44,8 +44,6 @@ public class LoadReport implements LoadManagerReport {
     private final String webServiceUrlTls;
     private final String pulsarServiceUrl;
     private final String pulsarServiceUrlTls;
-    private boolean persistentTopicsEnabled = true;
-    private boolean nonPersistentTopicsEnabled = true;
 
     private boolean isUnderLoaded;
     private boolean isOverLoaded;
@@ -404,22 +402,6 @@ public class LoadReport implements LoadManagerReport {
         return pulsarServiceUrlTls;
     }
 
-    public boolean isPersistentTopicsEnabled() {
-        return persistentTopicsEnabled;
-    }
-
-    public void setPersistentTopicsEnabled(boolean persistentTopicsEnabled) {
-        this.persistentTopicsEnabled = persistentTopicsEnabled;
-    }
-
-    public boolean isNonPersistentTopicsEnabled() {
-        return nonPersistentTopicsEnabled;
-    }
-
-    public void setNonPersistentTopicsEnabled(boolean nonPersistentTopicsEnabled) {
-        this.nonPersistentTopicsEnabled = nonPersistentTopicsEnabled;
-    }
-
     @Override
     public ResourceUsage getCpu() {
         return systemResourceUsage != null ? systemResourceUsage.cpu : null;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 129cf5d..de8f7ff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -38,8 +38,6 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
     private final String webServiceUrlTls;
     private final String pulsarServiceUrl;
     private final String pulsarServiceUrlTls;
-    private boolean persistentTopicsEnabled=true;
-    private boolean nonPersistentTopicsEnabled=true;
 
     // Most recently available system resource usage.
     private ResourceUsage cpu;
@@ -404,24 +402,6 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {
     }
 
     @Override
-    public boolean isPersistentTopicsEnabled() {
-        return persistentTopicsEnabled;
-    }
-
-    public void setPersistentTopicsEnabled(boolean persistentTopicsEnabled) {
-        this.persistentTopicsEnabled = persistentTopicsEnabled;
-    }
-
-    @Override
-    public boolean isNonPersistentTopicsEnabled() {
-        return nonPersistentTopicsEnabled;
-    }
-
-    public void setNonPersistentTopicsEnabled(boolean nonPersistentTopicsEnabled) {
-        this.nonPersistentTopicsEnabled = nonPersistentTopicsEnabled;
-    }
-
-    @Override
     public Map<String, NamespaceBundleStats> getBundleStats() {
         return getLastStats();
     }
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index b006e3e..c919c32 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -18,12 +18,6 @@
 #
 
 configs:
-- name: enablePersistentTopics
-  default: 'true'
-  description: Whether persistent topics are enabled on the broker
-- name: enableNonPersistentTopics
-  default: 'true'
-  description: Whether non-persistent topics are enabled on the broker
 - name: functionsWorkerEnabled
   description: Whether the Pulsar Functions worker service is enabled in the broker
   default: 'false'
diff --git a/site/_includes/explanations/ja/non-persistent-topics.md b/site/_includes/explanations/ja/non-persistent-topics.md
index 768413f..1564755 100644
--- a/site/_includes/explanations/ja/non-persistent-topics.md
+++ b/site/_includes/explanations/ja/non-persistent-topics.md
@@ -63,17 +63,4 @@ PulsarClient client = PulsarClient.create("pulsar://localhost:6650");
 
 Producer producer = client.createProducer(
             "non-persistent://sample/standalone/ns1/my-topic");
-```
-
-### Brokerの設定
-
-多くの場合、ノンパーシステントトピック提供するためだけにクラスタ内に専用のBrokerを設定する必要はほとんどないでしょう。
-
-Brokerが特定のタイプのトピックのみを所有できるようにするための設定は次の通りです:
-
-```
-# Brokerによるパーシステントトピックのロードを無効化
-enablePersistentTopics=false
-# Brokerによるノンパーシステントトピックのロードを有効化
-enableNonPersistentTopics=true
-```
+```
\ No newline at end of file
diff --git a/site/docs/latest/cookbooks/non-persistent-messaging.md b/site/docs/latest/cookbooks/non-persistent-messaging.md
index b1e6ed4..a44c949 100644
--- a/site/docs/latest/cookbooks/non-persistent-messaging.md
+++ b/site/docs/latest/cookbooks/non-persistent-messaging.md
@@ -49,14 +49,6 @@ $ bin/pulsar-client produce non-persistent://public/default/example-np-topic \
 
 {% include admonition.html type="success" content="For a more thorough guide 
to non-persistent topics from an administrative perspective, see the 
[Non-persistent topics](../../admin-api/non-persistent-topics) guide." %}
 
-## Enabling non-persistent topics {#enabling}
-
-In order to enable non-persistent topics in a Pulsar {% popover broker %}, the 
[`enableNonPersistentTopics`](../../reference/Configuration#broker-enableNonPersistentTopics)
 must be set to `true`. This is the default, and so you won't need to take any 
action to enable non-persistent messaging.
-
-{% include admonition.html type="info" title="Configuration for standalone 
mode" content="If you're running Pulsar in standalone mode, the same 
configurable parameters are available but in the 
[`standalone.conf`](../../reference/Configuration#standalone) configuration 
file." %}
-
-If you'd like to enable *only* non-persistent topics in a broker, you can set 
the 
[`enablePersistentTopics`](../../reference/Configuration#broker-enablePersistentTopics)
 parameter to `false` and the `enableNonPersistentTopics` parameter to `true`.
-
 ## Managing non-persistent topics via the CLI {#cli}
 
 Non-persistent topics can be managed using the [`pulsar-admin 
non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent) 
command-line interface. With that interface you can perform actions like 
[create a partitioned non-persistent 
topic](../../reference/CliTools#pulsar-admin-non-persistent-create-partitioned-topic),
 get [stats](../../reference/CliTools#pulsar-admin-non-persistent-stats) for a 
non-persistent topic, [list](../../) non-persistent topics under a namespace, 
and more.
diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 961141a..607ae95 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -216,7 +216,7 @@ In non-persistent topics, {% popover brokers %} immediately deliver messages to
 
 {% include admonition.html type="danger" content="With non-persistent topics, 
message data lives only in memory. If a message broker fails or message data 
can otherwise not be retrieved from memory, your message data may be lost. Use 
non-persistent topics only if you're *certain* that your use case requires it 
and can sustain it." %}
 
-By default, non-persistent topics are enabled on Pulsar {% popover brokers %}. 
You can disable them in the broker's 
[configuration](../../reference/Configuration#broker-enableNonPersistentTopics).
 You can manage non-persistent topics using the [`pulsar-admin 
non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent) 
interface.
+You can manage non-persistent topics using the [`pulsar-admin 
non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent) 
interface.
 
 #### Performance
 
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index b5ed064..a0fee5c 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -248,7 +248,7 @@ In non-persistent topics, brokers immediately deliver messages to all connected
 
 > With non-persistent topics, message data lives only in memory. If a message 
 > broker fails or message data can otherwise not be retrieved from memory, 
 > your message data may be lost. Use non-persistent topics only if you're 
 > *certain* that your use case requires it and can sustain it.
 
-By default, non-persistent topics are enabled on Pulsar brokers. You can 
disable them in the broker's 
[configuration](reference-configuration.md#broker-enableNonPersistentTopics). 
You can manage non-persistent topics using the [`pulsar-admin 
topics`](referencereference--pulsar-admin/#topics-1) interface.
+You can manage non-persistent topics using the [`pulsar-admin 
topics`](referencereference--pulsar-admin/#topics-1) interface.
 
 ### Performance
 
diff --git a/site2/docs/cookbooks-non-persistent.md b/site2/docs/cookbooks-non-persistent.md
index 481f002..2c6047a 100644
--- a/site2/docs/cookbooks-non-persistent.md
+++ b/site2/docs/cookbooks-non-persistent.md
@@ -38,16 +38,9 @@ $ bin/pulsar-client produce non-persistent://public/default/example-np-topic \
 
 > For a more thorough guide to non-persistent topics from an administrative 
 > perspective, see the [Non-persistent 
 > topics](admin-api-non-persistent-topics.md) guide.
 
-## Enabling
-
-In order to enable non-persistent topics in a Pulsar broker, the 
[`enableNonPersistentTopics`](reference-configuration.md#broker-enableNonPersistentTopics)
 must be set to `true`. This is the default, and so you won't need to take any 
action to enable non-persistent messaging.
-
-
 > #### Configuration for standalone mode
 > If you're running Pulsar in standalone mode, the same configurable 
 > parameters are available but in the 
 > [`standalone.conf`](reference-configuration.md#standalone) configuration 
 > file. 
 
-If you'd like to enable *only* non-persistent topics in a broker, you can set 
the 
[`enablePersistentTopics`](reference-configuration.md#broker-enablePersistentTopics)
 parameter to `false` and the `enableNonPersistentTopics` parameter to `true`.
-
 ## Managing with cli
 
 Non-persistent topics can be managed using the [`pulsar-admin 
non-persistent`](reference-pulsar-admin.md#non-persistent) command-line 
interface. With that interface you can perform actions like [create a 
partitioned non-persistent 
topic](reference-pulsar-admin.md#non-persistent-create-partitioned-topic), get 
[stats](reference-pulsar-admin.md#non-persistent-stats) for a non-persistent 
topic, [list](reference-pulsar-admin.md) non-persistent topics under a 
namespace, and more.
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 08354bd..5f92d6f 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -104,8 +104,6 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 
 |Name|Description|Default|
 |---|---|---|
-|enablePersistentTopics|  Whether persistent topics are enabled on the broker |true|
-|enableNonPersistentTopics| Whether non-persistent topics are enabled on the broker |true|
 |functionsWorkerEnabled|  Whether the Pulsar Functions worker service is enabled in the broker  |false|
 |zookeeperServers|  Zookeeper quorum connection string  ||
 |configurationStoreServers| Configuration store connection string (as a comma-separated list) ||

Reply via email to