This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 77d4c28 Remove broker mode to handle persistent/non-persistent topics separately (#3348) 77d4c28 is described below commit 77d4c28d59604cb6ec589d3620ab1770e0523c87 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Feb 26 11:05:51 2019 -0800 Remove broker mode to handle persistent/non-persistent topics separately (#3348) * Remove broker mode to handle persistent/non-persistent topics separately * fix unused imports --- conf/broker.conf | 6 - conf/standalone.conf | 6 - deployment/terraform-ansible/templates/broker.conf | 6 - .../apache/pulsar/broker/ServiceConfiguration.java | 12 -- .../broker/loadbalance/impl/LoadManagerShared.java | 34 +---- .../loadbalance/impl/ModularLoadManagerImpl.java | 43 ++---- .../loadbalance/impl/SimpleLoadManagerImpl.java | 27 +--- .../pulsar/broker/service/BrokerService.java | 16 --- .../loadbalance/ModularLoadManagerImplTest.java | 24 +--- .../pulsar/client/api/NonPersistentTopicTest.java | 151 --------------------- .../data/loadbalancer/LoadManagerReport.java | 4 - .../policies/data/loadbalancer/LoadReport.java | 18 --- .../data/loadbalancer/LocalBrokerData.java | 20 --- site/_data/config/broker.yaml | 6 - .../explanations/ja/non-persistent-topics.md | 15 +- .../latest/cookbooks/non-persistent-messaging.md | 8 -- .../getting-started/ConceptsAndArchitecture.md | 2 +- site2/docs/concepts-messaging.md | 2 +- site2/docs/cookbooks-non-persistent.md | 7 - site2/docs/reference-configuration.md | 2 - 20 files changed, 25 insertions(+), 384 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 262c44c..9d7c341 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -198,12 +198,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000 # Number of worker threads to serve non-persistent topic numWorkerThreadsForNonPersistentTopic=8 -# Enable broker to load persistent topics -enablePersistentTopics=true - -# Enable broker to load non-persistent topics -enableNonPersistentTopics=true - # Enable to run bookie along with broker enableRunBookieTogether=false diff --git a/conf/standalone.conf b/conf/standalone.conf index a8e7181..4f634e9 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -158,12 +158,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000 # Number of worker threads to serve non-persistent topic numWorkerThreadsForNonPersistentTopic=8 -# Enable broker to load persistent topics -enablePersistentTopics=true - -# Enable broker to load non-persistent topics -enableNonPersistentTopics=true - # Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers # until the number of connected producers decrease. # Using a value of 0, is disabling maxProducersPerTopic-limit check. diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index 22f74be..1db877c 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -163,12 +163,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000 # Number of worker threads to serve non-persistent topic numWorkerThreadsForNonPersistentTopic=8 -# Enable broker to load persistent topics -enablePersistentTopics=true - -# Enable broker to load non-persistent topics -enableNonPersistentTopics=true - # Enable to run bookie along with broker enableRunBookieTogether=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index cb36df4..2be470b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -397,18 +397,6 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, - doc = "Enable broker to load persistent topics" - ) - private boolean enablePersistentTopics = true; - - @FieldContext( - category = CATEGORY_SERVER, - doc = "Enable broker to load non-persistent topics" - ) - private boolean enableNonPersistentTopics = true; - - @FieldContext( - category = CATEGORY_SERVER, doc = "Enable to run bookie along with broker" ) private boolean enableRunBookieTogether = false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 6a5575c..04449cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -96,7 +96,7 @@ public class LoadManagerShared { // The brokers are put into brokerCandidateCache. public static void applyNamespacePolicies(final ServiceUnitId serviceUnit, final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache, - final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) { + final Set<String> availableBrokers) { Set<String> primariesCache = localPrimariesCache.get(); primariesCache.clear(); @@ -145,27 +145,11 @@ public class LoadManagerShared { } } - } else { - // non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic - if (isNonPersistentTopic - && !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) { - if (log.isDebugEnabled()) { - log.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]", - brokerUrl.getHost(), namespace.toString()); - } - } else if (!isNonPersistentTopic - && !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) { - // persistent topic can be assigned to only brokers that enabled for persistent-topic - if (log.isDebugEnabled()) { - log.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]", - brokerUrl.getHost(), namespace.toString()); - } - } else if (policies.isSharedBroker(brokerUrl.getHost())) { - secondaryCache.add(broker); - if (log.isDebugEnabled()) { - log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]", - brokerUrl.getHost(), namespace.toString()); - } + } else if (policies.isSharedBroker(brokerUrl.getHost())) { + secondaryCache.add(broker); + if (log.isDebugEnabled()) { + log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]", + brokerUrl.getHost(), namespace.toString()); } } } @@ -516,12 +500,6 @@ public class LoadManagerShared { return true; } - public interface BrokerTopicLoadingPredicate { - boolean isEnablePersistentTopics(String brokerUrl); - - boolean isEnableNonPersistentTopics(String brokerUrl); - } - /** * It filters out brokers which owns topic higher than configured threshold at * {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 997d9bb..b426d1e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -60,7 +60,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy; import org.apache.pulsar.broker.loadbalance.ModularLoadManager; import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy; -import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceName; @@ -176,9 +175,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach // ZooKeeper belonging to the pulsar service. private ZooKeeper zkClient; - // check if given broker can load persistent/non-persistent topic - private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate; - private Map<String, String> brokerToFailureDomainMap; private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper() @@ -198,22 +194,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach preallocatedBundleToBroker = new ConcurrentHashMap<>(); scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager")); this.brokerToFailureDomainMap = Maps.newHashMap(); - - this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() { - @Override - public boolean isEnablePersistentTopics(String brokerUrl) { - final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", "")); - return brokerData != null && brokerData.getLocalData() != null - && brokerData.getLocalData().isPersistentTopicsEnabled(); - } - - @Override - public boolean isEnableNonPersistentTopics(String brokerUrl) { - final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", "")); - return brokerData != null && brokerData.getLocalData() != null - && brokerData.getLocalData().isNonPersistentTopicsEnabled(); - } - }; } /** @@ -268,12 +248,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); localData.setBrokerVersionString(pulsar.getBrokerVersion()); - // configure broker-topic mode - lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); - lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics()); - localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); - localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics()); - placementStrategy = ModularLoadManagerStrategy.create(conf); policies = new SimpleResourceAllocationPolicies(pulsar); @@ -625,7 +599,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach ServiceUnitId serviceUnit = pulsar.getNamespaceService().getNamespaceBundleFactory() .getBundle(namespace, bundle); LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), brokerTopicLoadingPredicate); + getAvailableBrokers()); return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar, brokerToNamespaceToBundleRange, brokerCandidateCache); } @@ -706,8 +680,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, key -> getBundleDataOrDefault(bundle)); brokerCandidateCache.clear(); - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), - brokerTopicLoadingPredicate); + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers()); // filter brokers which owns topic higher than threshold LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData, @@ -729,14 +702,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach } } catch ( BrokerFilterException x ) { // restore the list of brokers to the full set - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), - brokerTopicLoadingPredicate); + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers()); } if ( brokerCandidateCache.isEmpty() ) { // restore the list of brokers to the full set - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), - brokerTopicLoadingPredicate); + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers()); } // Choose a broker among the potentially smaller filtered list, when possible @@ -754,8 +727,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage(); if (maxUsage > overloadThreshold) { // All brokers that were in the filtered list were overloaded, so check if there is a better broker - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), - brokerTopicLoadingPredicate); + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers()); broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 9f9b949..2ad67a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -48,7 +48,6 @@ import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.PlacementStrategy; import org.apache.pulsar.broker.loadbalance.ResourceUnit; -import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -182,8 +181,6 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene private boolean forceLoadReportUpdate = false; private static final Deserializer<LoadReport> loadReportDeserializer = (key, content) -> jsonMapper() .readValue(content, LoadReport.class); - // check if given broker can load persistent/non-persistent topic - private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate; // Perform initializations which may be done without a PulsarService. public SimpleLoadManagerImpl() { @@ -200,21 +197,6 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene brokerCandidateCache = new HashSet<>(); availableBrokersCache = new HashSet<>(); brokerToNamespaceToBundleRange = new HashMap<>(); - this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() { - @Override - public boolean isEnablePersistentTopics(String brokerUrl) { - ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription()); - LoadReport loadReport = currentLoadReports.get(ru); - return loadReport != null && loadReport.isPersistentTopicsEnabled(); - } - - @Override - public boolean isEnableNonPersistentTopics(String brokerUrl) { - ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription()); - LoadReport loadReport = currentLoadReports.get(ru); - return loadReport != null && loadReport.isNonPersistentTopicsEnabled(); - } - }; } @Override @@ -227,9 +209,6 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene this.policies = new SimpleResourceAllocationPolicies(pulsar); lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); - lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); - lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics()); - loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) { @Override public LoadReport deserialize(String key, byte[] content) throws Exception { @@ -919,8 +898,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene } brokerCandidateCache.clear(); try { - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache, - brokerTopicLoadingPredicate); + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + availableBrokersCache); } catch (Exception e) { log.warn("Error when trying to apply policies: {}", e); for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) { @@ -1112,8 +1091,6 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene try { LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); - loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics()); - loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics()); loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(), pulsar.getConfiguration().getWebServicePort().get())); loadReport.setBrokerVersionString(pulsar.getBrokerVersion()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f48877d..0f78c72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -501,14 +501,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) { CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>(); - if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { - if (log.isDebugEnabled()) { - log.debug("Broker is unable to load non-persistent topic {}", topic); - } - topicFuture.completeExceptionally( - new NotAllowedException("Broker is not unable to load non-persistent topic")); - return topicFuture; - } final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this); CompletableFuture<Void> replicationFuture = nonPersistentTopic.checkReplication(); @@ -591,14 +583,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies checkTopicNsOwnership(topic); final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>(); - if (!pulsar.getConfiguration().isEnablePersistentTopics()) { - if (log.isDebugEnabled()) { - log.debug("Broker is unable to load persistent topic {}", topic); - } - topicFuture.completeExceptionally(new NotAllowedException("Broker is not unable to load persistent topic")); - return topicFuture; - } - final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index 0303fbf..d23c05e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -56,7 +56,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TimeAverageMessageData; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; -import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; @@ -515,17 +514,6 @@ public class ModularLoadManagerImplTest { SimpleResourceAllocationPolicies simpleResourceAllocationPolicies = new SimpleResourceAllocationPolicies( pulsar1); ServiceUnitId serviceUnit = LoadBalancerTestingUtils.makeBundles(nsFactory, tenant, cluster, namespace, 1)[0]; - BrokerTopicLoadingPredicate brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() { - @Override - public boolean isEnablePersistentTopics(String brokerUrl) { - return true; - } - - @Override - public boolean isEnableNonPersistentTopics(String brokerUrl) { - return true; - } - }; // (1) now we have isolation policy : primary=broker1, secondary=broker2, minLimit=1 @@ -533,7 +521,7 @@ public class ModularLoadManagerImplTest { Set<String> brokerCandidateCache = Sets.newHashSet(); Set<String> availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address); LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, - availableBrokers, brokerTopicLoadingPredicate); + availableBrokers); assertEquals(brokerCandidateCache.size(), 1); assertTrue(brokerCandidateCache.contains(broker1Address)); @@ -541,7 +529,7 @@ public class ModularLoadManagerImplTest { brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker, broker2Address); LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, - availableBrokers, brokerTopicLoadingPredicate); + availableBrokers); assertEquals(brokerCandidateCache.size(), 1); assertTrue(brokerCandidateCache.contains(broker2Address)); @@ -549,7 +537,7 @@ public class ModularLoadManagerImplTest { brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker); LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, - availableBrokers, brokerTopicLoadingPredicate); + availableBrokers); assertEquals(brokerCandidateCache.size(), 0); // (2) now we will have isolation policy : primary=broker1, secondary=broker2, minLimit=2 @@ -563,7 +551,7 @@ public class ModularLoadManagerImplTest { brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address); LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, - availableBrokers, brokerTopicLoadingPredicate); + availableBrokers); assertEquals(brokerCandidateCache.size(), 2); assertTrue(brokerCandidateCache.contains(broker1Address)); assertTrue(brokerCandidateCache.contains(broker2Address)); @@ -572,7 +560,7 @@ public class ModularLoadManagerImplTest { brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker, broker2Address); LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, - availableBrokers, brokerTopicLoadingPredicate); + availableBrokers); assertEquals(brokerCandidateCache.size(), 1); assertTrue(brokerCandidateCache.contains(broker2Address)); @@ -580,7 +568,7 @@ public class ModularLoadManagerImplTest { brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker); LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, - availableBrokers, brokerTopicLoadingPredicate); + availableBrokers); assertEquals(brokerCandidateCache.size(), 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 2e7a444..c436491 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -615,157 +615,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { } /** - * verifies load manager assigns topic only if broker started in non-persistent mode - * - * <pre> - * 1. Start broker with disable non-persistent topic mode - * 2. Create namespace with non-persistency set - * 3. Create non-persistent topic - * 4. Load-manager should not be able to find broker - * 5. Create producer on that topic should fail - * </pre> - */ - @Test(dataProvider = "loadManager") - public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadManagerName) throws Exception { - - final String namespace = "my-property/my-ns"; - final String topicName = "non-persistent://" + namespace + "/loadManager"; - final String defaultLoadManagerName = conf.getLoadManagerClassName(); - final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics(); - try { - // start broker to not own non-persistent namespace and create non-persistent namespace - stopBroker(); - conf.setEnableNonPersistentTopics(false); - conf.setLoadManagerClassName(loadManagerName); - startBroker(); - - Field field = PulsarService.class.getDeclaredField("loadManager"); - field.setAccessible(true); - @SuppressWarnings("unchecked") - AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(pulsar); - LoadManager manager = LoadManager.create(pulsar); - manager.start(); - loadManagerRef.set(manager); - - NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(TopicName.get(topicName)); - LoadManager loadManager = pulsar.getLoadManager().get(); - ResourceUnit broker = null; - try { - broker = loadManager.getLeastLoaded(fdqn).get(); - } catch (Exception e) { - // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker) - } - assertNull(broker); - - try { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1, - TimeUnit.SECONDS); - producer.close(); - fail("topic loading should have failed"); - } catch (Exception e) { - // Ok - } - assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); - - } finally { - conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic); - conf.setLoadManagerClassName(defaultLoadManagerName); - } - - } - - /** - * verifies: broker should reject non-persistent topic loading if broker is not enable for non-persistent topic - * - * @param loadManagerName - * @throws Exception - */ - @Test - public void testNonPersistentTopicUnderPersistentNamespace() throws Exception { - - final String namespace = "my-property/my-ns"; - final String topicName = "non-persistent://" + namespace + "/persitentNamespace"; - - final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics(); - try { - conf.setEnableNonPersistentTopics(false); - stopBroker(); - startBroker(); - try { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1, - TimeUnit.SECONDS); - producer.close(); - fail("topic loading should have failed"); - } catch (Exception e) { - // Ok - } - - assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); - } finally { - conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic); - } - } - - /** - * verifies that broker started with onlyNonPersistent mode doesn't own persistent-topic - * - * @param loadManagerName - * @throws Exception - */ - @Test(dataProvider = "loadManager") - public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerName) throws Exception { - - final String namespace = "my-property/my-ns"; - final String topicName = "persistent://" + namespace + "/loadManager"; - final String defaultLoadManagerName = conf.getLoadManagerClassName(); - final boolean defaultEnablePersistentTopic = conf.isEnablePersistentTopics(); - final boolean defaultEnableNonPersistentTopic = conf.isEnableNonPersistentTopics(); - try { - // start broker to not own non-persistent namespace and create non-persistent namespace - stopBroker(); - conf.setEnableNonPersistentTopics(true); - conf.setEnablePersistentTopics(false); - conf.setLoadManagerClassName(loadManagerName); - startBroker(); - - Field field = PulsarService.class.getDeclaredField("loadManager"); - field.setAccessible(true); - @SuppressWarnings("unchecked") - AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(pulsar); - LoadManager manager = LoadManager.create(pulsar); - manager.start(); - loadManagerRef.set(manager); - - NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(TopicName.get(topicName)); - LoadManager loadManager = pulsar.getLoadManager().get(); - ResourceUnit broker = null; - try { - broker = loadManager.getLeastLoaded(fdqn).get(); - } catch (Exception e) { - // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker) - } - assertNull(broker); - - try { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1, - TimeUnit.SECONDS); - producer.close(); - fail("topic loading should have failed"); - } catch (Exception e) { - // Ok - } - - assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); - - } finally { - conf.setEnablePersistentTopics(defaultEnablePersistentTopic); - conf.setEnableNonPersistentTopics(defaultEnableNonPersistentTopic); - conf.setLoadManagerClassName(defaultLoadManagerName); - } - - } - - /** * Verifies msg-drop stats * * @throws Exception diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java index 7fb173b..4027800 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadManagerReport.java @@ -61,8 +61,4 @@ public interface LoadManagerReport extends ServiceLookupData { public String getBrokerVersionString(); - public boolean isPersistentTopicsEnabled(); - - public boolean isNonPersistentTopicsEnabled(); - } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java index 6ca7744..c98e0ce 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LoadReport.java @@ -44,8 +44,6 @@ public class LoadReport implements LoadManagerReport { private final String webServiceUrlTls; private final String pulsarServiceUrl; private final String pulsarServiceUrlTls; - private boolean persistentTopicsEnabled = true; - private boolean nonPersistentTopicsEnabled = true; private boolean isUnderLoaded; private boolean isOverLoaded; @@ -404,22 +402,6 @@ public class LoadReport implements LoadManagerReport { return pulsarServiceUrlTls; } - public boolean isPersistentTopicsEnabled() { - return persistentTopicsEnabled; - } - - public void setPersistentTopicsEnabled(boolean persistentTopicsEnabled) { - this.persistentTopicsEnabled = persistentTopicsEnabled; - } - - public boolean isNonPersistentTopicsEnabled() { - return nonPersistentTopicsEnabled; - } - - public void setNonPersistentTopicsEnabled(boolean nonPersistentTopicsEnabled) { - this.nonPersistentTopicsEnabled = nonPersistentTopicsEnabled; - } - @Override public ResourceUsage getCpu() { return systemResourceUsage != null ? systemResourceUsage.cpu : null; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java index 129cf5d..de8f7ff 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java @@ -38,8 +38,6 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport { private final String webServiceUrlTls; private final String pulsarServiceUrl; private final String pulsarServiceUrlTls; - private boolean persistentTopicsEnabled=true; - private boolean nonPersistentTopicsEnabled=true; // Most recently available system resource usage. private ResourceUsage cpu; @@ -404,24 +402,6 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport { } @Override - public boolean isPersistentTopicsEnabled() { - return persistentTopicsEnabled; - } - - public void setPersistentTopicsEnabled(boolean persistentTopicsEnabled) { - this.persistentTopicsEnabled = persistentTopicsEnabled; - } - - @Override - public boolean isNonPersistentTopicsEnabled() { - return nonPersistentTopicsEnabled; - } - - public void setNonPersistentTopicsEnabled(boolean nonPersistentTopicsEnabled) { - this.nonPersistentTopicsEnabled = nonPersistentTopicsEnabled; - } - - @Override public Map<String, NamespaceBundleStats> getBundleStats() { return getLastStats(); } diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml index b006e3e..c919c32 100644 --- a/site/_data/config/broker.yaml +++ b/site/_data/config/broker.yaml @@ -18,12 +18,6 @@ # configs: -- name: enablePersistentTopics - default: 'true' - description: Whether persistent topics are enabled on the broker -- name: enableNonPersistentTopics - default: 'true' - description: Whether non-persistent topics are enabled on the broker - name: functionsWorkerEnabled description: Whether the Pulsar Functions worker service is enabled in the broker default: 'false' diff --git a/site/_includes/explanations/ja/non-persistent-topics.md b/site/_includes/explanations/ja/non-persistent-topics.md index 768413f..1564755 100644 --- a/site/_includes/explanations/ja/non-persistent-topics.md +++ b/site/_includes/explanations/ja/non-persistent-topics.md @@ -63,17 +63,4 @@ PulsarClient client = PulsarClient.create("pulsar://localhost:6650"); Producer producer = client.createProducer( "non-persistent://sample/standalone/ns1/my-topic"); -``` - -### Brokerの設定 - -多くの場合、ノンパーシステントトピック提供するためだけにクラスタ内に専用のBrokerを設定する必要はほとんどないでしょう。 - -Brokerが特定のタイプのトピックのみを所有できるようにするための設定は次の通りです: - -``` -# Brokerによるパーシステントトピックのロードを無効化 -enablePersistentTopics=false -# Brokerによるノンパーシステントトピックのロードを有効化 -enableNonPersistentTopics=true -``` +``` \ No newline at end of file diff --git a/site/docs/latest/cookbooks/non-persistent-messaging.md b/site/docs/latest/cookbooks/non-persistent-messaging.md index b1e6ed4..a44c949 100644 --- a/site/docs/latest/cookbooks/non-persistent-messaging.md +++ b/site/docs/latest/cookbooks/non-persistent-messaging.md @@ -49,14 +49,6 @@ $ bin/pulsar-client produce non-persistent://public/default/example-np-topic \ {% include admonition.html type="success" content="For a more thorough guide to non-persistent topics from an administrative perspective, see the [Non-persistent topics](../../admin-api/non-persistent-topics) guide." %} -## Enabling non-persistent topics {#enabling} - -In order to enable non-persistent topics in a Pulsar {% popover broker %}, the [`enableNonPersistentTopics`](../../reference/Configuration#broker-enableNonPersistentTopics) must be set to `true`. This is the default, and so you won't need to take any action to enable non-persistent messaging. - -{% include admonition.html type="info" title="Configuration for standalone mode" content="If you're running Pulsar in standalone mode, the same configurable parameters are available but in the [`standalone.conf`](../../reference/Configuration#standalone) configuration file." %} - -If you'd like to enable *only* non-persistent topics in a broker, you can set the [`enablePersistentTopics`](../../reference/Configuration#broker-enablePersistentTopics) parameter to `false` and the `enableNonPersistentTopics` parameter to `true`. - ## Managing non-persistent topics via the CLI {#cli} Non-persistent topics can be managed using the [`pulsar-admin non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent) command-line interface. With that interface you can perform actions like [create a partitioned non-persistent topic](../../reference/CliTools#pulsar-admin-non-persistent-create-partitioned-topic), get [stats](../../reference/CliTools#pulsar-admin-non-persistent-stats) for a non-persistent topic, [list](../../) non-persistent topics under a namespace, and more. diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md index 961141a..607ae95 100644 --- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md +++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md @@ -216,7 +216,7 @@ In non-persistent topics, {% popover brokers %} immediately deliver messages to {% include admonition.html type="danger" content="With non-persistent topics, message data lives only in memory. If a message broker fails or message data can otherwise not be retrieved from memory, your message data may be lost. Use non-persistent topics only if you're *certain* that your use case requires it and can sustain it." %} -By default, non-persistent topics are enabled on Pulsar {% popover brokers %}. You can disable them in the broker's [configuration](../../reference/Configuration#broker-enableNonPersistentTopics). You can manage non-persistent topics using the [`pulsar-admin non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent) interface. +You can manage non-persistent topics using the [`pulsar-admin non-persistent`](../../reference/CliTools#pulsar-admin-non-persistent) interface. #### Performance diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md index b5ed064..a0fee5c 100644 --- a/site2/docs/concepts-messaging.md +++ b/site2/docs/concepts-messaging.md @@ -248,7 +248,7 @@ In non-persistent topics, brokers immediately deliver messages to all connected > With non-persistent topics, message data lives only in memory. If a message > broker fails or message data can otherwise not be retrieved from memory, > your message data may be lost. Use non-persistent topics only if you're > *certain* that your use case requires it and can sustain it. -By default, non-persistent topics are enabled on Pulsar brokers. You can disable them in the broker's [configuration](reference-configuration.md#broker-enableNonPersistentTopics). You can manage non-persistent topics using the [`pulsar-admin topics`](referencereference--pulsar-admin/#topics-1) interface. +You can manage non-persistent topics using the [`pulsar-admin topics`](referencereference--pulsar-admin/#topics-1) interface. ### Performance diff --git a/site2/docs/cookbooks-non-persistent.md b/site2/docs/cookbooks-non-persistent.md index 481f002..2c6047a 100644 --- a/site2/docs/cookbooks-non-persistent.md +++ b/site2/docs/cookbooks-non-persistent.md @@ -38,16 +38,9 @@ $ bin/pulsar-client produce non-persistent://public/default/example-np-topic \ > For a more thorough guide to non-persistent topics from an administrative > perspective, see the [Non-persistent > topics](admin-api-non-persistent-topics.md) guide. -## Enabling - -In order to enable non-persistent topics in a Pulsar broker, the [`enableNonPersistentTopics`](reference-configuration.md#broker-enableNonPersistentTopics) must be set to `true`. This is the default, and so you won't need to take any action to enable non-persistent messaging. - - > #### Configuration for standalone mode > If you're running Pulsar in standalone mode, the same configurable > parameters are available but in the > [`standalone.conf`](reference-configuration.md#standalone) configuration > file. -If you'd like to enable *only* non-persistent topics in a broker, you can set the [`enablePersistentTopics`](reference-configuration.md#broker-enablePersistentTopics) parameter to `false` and the `enableNonPersistentTopics` parameter to `true`. - ## Managing with cli Non-persistent topics can be managed using the [`pulsar-admin non-persistent`](reference-pulsar-admin.md#non-persistent) command-line interface. With that interface you can perform actions like [create a partitioned non-persistent topic](reference-pulsar-admin.md#non-persistent-create-partitioned-topic), get [stats](reference-pulsar-admin.md#non-persistent-stats) for a non-persistent topic, [list](reference-pulsar-admin.md) non-persistent topics under a namespace, and more. diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 08354bd..5f92d6f 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -104,8 +104,6 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |Name|Description|Default| |---|---|---| -|enablePersistentTopics| Whether persistent topics are enabled on the broker |true| -|enableNonPersistentTopics| Whether non-persistent topics are enabled on the broker |true| |functionsWorkerEnabled| Whether the Pulsar Functions worker service is enabled in the broker |false| |zookeeperServers| Zookeeper quorum connection string || |configurationStoreServers| Configuration store connection string (as a comma-separated list) ||