heesung-sn commented on code in PR #19102:
URL: https://github.com/apache/pulsar/pull/19102#discussion_r1059106101
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -794,6 +802,12 @@ public void start() throws PulsarServerException {
}
brokerService.start();
+ if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) {
Review Comment:
Let's define a static function and use it everywhere this check is needed:
PulsarService, NamespaceService, and other places.
Proposal:
```
// In ExtensibleLoadManagerImpl:
public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
    return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}
```
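To make the idea concrete, here is a minimal, self-contained sketch of such a helper. `BrokerConf` and `ExtensibleManager` are hypothetical stand-ins for `ServiceConfiguration` and `ExtensibleLoadManagerImpl`, so treat the names as assumptions rather than the real classes.

```java
// Sketch only: BrokerConf and ExtensibleManager are hypothetical stand-ins
// for ServiceConfiguration and ExtensibleLoadManagerImpl.
final class BrokerConf {
    private final String loadManagerClassName;

    BrokerConf(String loadManagerClassName) {
        this.loadManagerClassName = loadManagerClassName;
    }

    String getLoadManagerClassName() {
        return loadManagerClassName;
    }
}

final class ExtensibleManager {
    private ExtensibleManager() {
    }

    // Single place for the check: compare the configured class name with this class.
    static boolean isLoadManagerExtensionEnabled(BrokerConf conf) {
        return ExtensibleManager.class.getName().equals(conf.getLoadManagerClassName());
    }
}
```

One possible advantage over the `instanceof` check is that the decision depends only on the configuration, so callers should not need a load manager instance to ask the question.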
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -1085,6 +1129,9 @@ protected void closeLocalMetadataStore() throws Exception {
}
protected void startLeaderElectionService() {
+ if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) {
+ return;
Review Comment:
Use the global `isLoadManagerExtensionEnabled` static func, as discussed
above.
Also, please add a log:
log.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -825,6 +839,11 @@ public void start() throws PulsarServerException {
this.webSocketService.setLocalCluster(clusterData);
}
+ // By starting the Load manager service, the broker will also become visible
+ // to the rest of the broker by creating the registration z-node. This needs
+ // to be done only when the broker is fully operative.
+ this.startLoadManagementService();
Review Comment:
Please add a comment saying that the load manager service and its service unit
state channel need to be initialized first (the namespace service depends on
the load manager).
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -925,6 +939,36 @@ public void start() throws PulsarServerException {
}
}
+ protected void createNamespaceIfNotExists(String cluster, String publicTenant, NamespaceName ns) throws Exception {
+ ClusterResources cr = this.getPulsarResources().getClusterResources();
+ TenantResources tr = this.getPulsarResources().getTenantResources();
+ NamespaceResources nsr = this.getPulsarResources().getNamespaceResources();
+
+ if (!cr.clusterExists(cluster)) {
+ cr.createCluster(cluster,
+ ClusterData.builder()
+ .serviceUrl(this.getWebServiceAddress())
+ .serviceUrlTls(this.getWebServiceAddressTls())
+ .brokerServiceUrl(this.getBrokerServiceUrl())
+ .brokerServiceUrlTls(this.getBrokerServiceUrlTls())
+ .build());
+ }
+
+ if (!tr.tenantExists(publicTenant)) {
+ tr.createTenant(publicTenant,
+ TenantInfo.builder()
+ .adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
+ .allowedClusters(Sets.newHashSet(cluster))
+ .build());
+ }
+
+ if (!nsr.namespaceExists(ns)) {
+ Policies nsp = new Policies();
Review Comment:
Don't we need to create the namespace before?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java:
##########
@@ -58,6 +61,15 @@ public interface LoadManager {
*/
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
+ default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return null;
Review Comment:
Please throw `UnsupportedOperationException` for the default functions.
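A small sketch of what this could look like. `LookupApi` and both implementations are hypothetical, simplified stand-ins for `LoadManager` and its subclasses, and the method signature is reduced to a plain `String` bundle for illustration.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Sketch only: LookupApi is a hypothetical, simplified stand-in for LoadManager.
interface LookupApi {
    // Optional operation: implementations that do not override it fail loudly
    // instead of handing callers a null future.
    default CompletableFuture<Optional<String>> findBrokerServiceUrl(String bundle) {
        throw new UnsupportedOperationException("findBrokerServiceUrl is not supported");
    }
}

// Legacy-style implementation that does not support the new lookup path.
final class LegacyLookup implements LookupApi {
}

// Extension-style implementation that overrides the default.
final class ExtensionLookup implements LookupApi {
    @Override
    public CompletableFuture<Optional<String>> findBrokerServiceUrl(String bundle) {
        return CompletableFuture.completedFuture(Optional.of("broker-1:6650"));
    }
}
```

With `return null`, a caller that chains `.thenApply(...)` on the result would hit a `NullPointerException` far from the cause. Throwing from the default points directly at the unsupported call.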
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java:
##########
@@ -58,6 +61,15 @@ public interface LoadManager {
*/
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
+ default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return null;
+ }
+
+ default CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return null;
Review Comment:
Please throw `UnsupportedOperationException` for the default functions.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+ private PulsarService pulsar;
+
+ private ServiceConfiguration conf;
+
+ @Getter
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceUnitStateChannel serviceUnitStateChannel;
+
+ @Getter
+ private LoadManagerContext context;
+
+ @Getter
+ private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+ @Getter
+ private List<BrokerFilter> brokerFilterPipeline;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = ConcurrentOpenHashMap.<String,
+ CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+ .build();
+
+ /**
+ * Life cycle: Constructor -> initialize -> start -> close.
+ */
+ public ExtensibleLoadManagerImpl() {
+ this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+ if (brokers.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+ };
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ if (this.started.get()) {
Review Comment:
Although we don't expect `start()` to be called from multiple threads on the
same object, this atomic `get()` check can still let more than one thread run
the start logic concurrently, because the flag is only set at the end of the
method.
We can simply make the method `synchronized`, since we don't expect `start()`
to be called under contention from many threads.
Or, if we want to keep the atomic boolean, we can use CAS here:
```
if (started.compareAndSet(false, true)) {
    try {
        // start logic here
    } catch (Exception e) {
        started.set(false); // to allow retry.
    }
}
```
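For reference, a runnable sketch of the CAS variant. `StartOnceService` is a hypothetical class (not Pulsar code), and the start logic is passed in as a `Runnable` purely to keep the example self-contained. Unlike the short snippet above, it rethrows after resetting the flag so the caller still sees the failure.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: StartOnceService is a hypothetical class, not Pulsar code.
final class StartOnceService {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();
    private final Runnable startLogic;

    StartOnceService(Runnable startLogic) {
        this.startLogic = startLogic;
    }

    void start() {
        // Only the thread that flips false -> true runs the start logic.
        if (!started.compareAndSet(false, true)) {
            return;
        }
        try {
            startLogic.run();
            startCount.incrementAndGet();
        } catch (RuntimeException e) {
            // Reset the flag so that a later call can retry, then surface the failure.
            started.set(false);
            throw e;
        }
    }

    boolean isStarted() {
        return started.get();
    }

    int getStartCount() {
        return startCount.get();
    }
}
```

One trade-off to keep in mind: with CAS, a second concurrent caller returns immediately while the first is still starting, so it may observe a service that is not fully started yet. A `synchronized` method would make the second caller wait instead.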
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java:
##########
@@ -0,0 +1,222 @@
+/*
Review Comment:
I assume this class does not need a review here, as it is added in a
separate PR, https://github.com/apache/pulsar/pull/18810.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+ private PulsarService pulsar;
+
+ private ServiceConfiguration conf;
+
+ @Getter
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceUnitStateChannel serviceUnitStateChannel;
+
+ @Getter
+ private LoadManagerContext context;
+
+ @Getter
+ private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+ @Getter
+ private List<BrokerFilter> brokerFilterPipeline;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = ConcurrentOpenHashMap.<String,
+ CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+ .build();
+
+ /**
+ * Life cycle: Constructor -> initialize -> start -> close.
+ */
+ public ExtensibleLoadManagerImpl() {
+ this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerSelectionStrategy = (brokers, bundle, context) -> {
Review Comment:
https://github.com/apache/pulsar/pull/18964 has been merged. We could use
LeastResourceUsageWithWeight here.
Also, we need a `TODO` comment here about making brokerSelectionStrategy
configurable.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+ .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() {
+ return null;
+ }
+
+ @Override
+ public void setLoadReportForceUpdateFlag() {
+ // No-op.
+ }
+
+ @Override
+ public void writeLoadReportOnZookeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will automatically write.
Review Comment:
Throw UnsupportedOperationException.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager)
{
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData ->
lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws
Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() {
+ return null;
+ }
+
+ @Override
+ public void setLoadReportForceUpdateFlag() {
+ // No-op.
+ }
+
+ @Override
+ public void writeLoadReportOnZookeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ }
+
+ @Override
+ public void writeResourceQuotasToZooKeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will automatically write.
Review Comment:
Throw UnsupportedOperationException.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java:
##########
@@ -118,7 +118,7 @@ public interface ServiceUnitStateChannel extends Closeable {
* the future object will time out.
* Case 3: If none of them, it returns null.
Review Comment:
With this change, Case 3 returns Optional.empty() instead of null. Please
update the comment.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -242,11 +242,12 @@ public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
// TODO: discard this protocol prefix removal
// by a util func that returns lookupServiceAddress(serviceUrl)
if (leader.isPresent()) {
- String broker = leader.get().getServiceUrl();
- broker = broker.substring(broker.lastIndexOf('/') + 1);
- return Optional.of(broker);
+ return Optional.of(leader.get().getServiceUrl());
} else {
- return Optional.empty();
Review Comment:
If we don't return Optional.empty(), we can change the output signature to
`CompletableFuture<String>`.
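A rough sketch of the simplified signature, assuming the "no leader" case is reported as a failed future (the replacement lines are not shown in this hunk, so that is an assumption). `ChannelOwnerLookup` and its parameter are hypothetical and only illustrate the shape.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Sketch only: ChannelOwnerLookup is hypothetical; it is not the Pulsar API.
final class ChannelOwnerLookup {
    private ChannelOwnerLookup() {
    }

    // The absent leader is reported as a failed future, so the value type
    // no longer needs an Optional wrapper.
    static CompletableFuture<String> getChannelOwnerAsync(Optional<String> leaderServiceUrl) {
        if (leaderServiceUrl.isPresent()) {
            return CompletableFuture.completedFuture(leaderServiceUrl.get());
        }
        return CompletableFuture.failedFuture(
                new IllegalStateException("There is no channel owner now."));
    }
}
```

Callers then handle the missing owner through the usual exceptional path (`exceptionally`, `handle`, `whenComplete`) and do not need an extra `isEmpty()` check on every result.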
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java:
##########
@@ -0,0 +1,313 @@
+/*
Review Comment:
I assume this class does not need a review here, as it is added in a
separate PR, https://github.com/apache/pulsar/pull/18810.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -171,11 +172,21 @@ public void initialize() {
}
}
+ public boolean isExtensibleLoadManager(){
Review Comment:
Please use the static func, `isLoadManagerExtensionEnabled()`, as discussed
above.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java:
##########
@@ -132,7 +132,7 @@ public interface ServiceUnitStateChannel extends Closeable {
* case 2: If the assigned broker does not take the ownership in time,
* the future object will time out.
*/
- CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker);
+ CompletableFuture<Optional<String>> publishAssignEventAsync(String serviceUnit, String broker);
Review Comment:
Why do we need Optional here? I don't think we return Optional.empty here.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions;
Review Comment:
I assume this class does not need a review here, as it is added in a
separate PR, https://github.com/apache/pulsar/pull/18810.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+ private PulsarService pulsar;
+
+ private ServiceConfiguration conf;
+
+ @Getter
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceUnitStateChannel serviceUnitStateChannel;
+
+ @Getter
+ private LoadManagerContext context;
+
+ @Getter
+ private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+ @Getter
+ private List<BrokerFilter> brokerFilterPipeline;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = ConcurrentOpenHashMap.<String,
+ CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+ .build();
+
+ /**
+ * Life cycle: Constructor -> initialize -> start -> close.
+ */
+ public ExtensibleLoadManagerImpl() {
+ this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+ if (brokers.isEmpty()) {
+ return Optional.empty();
+ }
+ return
Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+ };
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ if (this.started.get()) {
+ return;
+ }
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+ this.brokerRegistry.start();
+ this.serviceUnitStateChannel.start();
+
+ // TODO: Start the load data store.
+
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(null)
+ .topBundleLoadDataStore(null).build();
+ // TODO: Start load data reporter.
+
+ // TODO: Start unload scheduler and bundle split scheduler
+
+ this.started.set(true);
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ this.pulsar = pulsar;
+ this.conf = pulsar.getConfiguration();
+ }
+
+ @Override
+ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
+ ServiceUnitId serviceUnit) {
+
+ final String bundle = serviceUnit.toString();
+
+ CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
+ final CompletableFuture<Optional<String>> owner;
+ // Assign the bundle to channel owner if is internal topic, to avoid circular references.
+ if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+ owner = serviceUnitStateChannel.getChannelOwnerAsync();
+ } else {
+ owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+ // If the bundle not assign yet, select and publish assign event to channel.
+ if (broker.isEmpty()) {
+ return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+ if (brokerOpt.isPresent()) {
+ log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
+ return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+ } else {
+ throw new IllegalStateException(
+ "Failed to discover(select) the new owner broker for bundle: " + bundle);
+ }
+ });
+ }
+ // Already assigned, return it.
+ return CompletableFuture.completedFuture(broker);
+ });
+ }
+
+ return owner.thenCompose(broker -> {
+ if (broker.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(broker.get());
+ }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(brokerLookupData);
+ }));
+ });
+ future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+ return future;
+ }
+
+ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
+ BrokerRegistry brokerRegistry = getBrokerRegistry();
+ return brokerRegistry.getAvailableBrokerLookupDataAsync()
+ .thenCompose(availableBrokers -> {
+ // TODO: Support isolation policies
+ LoadManagerContext context = this.getContext();
+
+ // Filter out brokers that do not meet the rules.
+ List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
+ Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);
+ for (final BrokerFilter filter : filterPipeline) {
+ try {
+ filter.filter(availableBrokerCandidates, context);
+ } catch (BrokerFilterException e) {
+ availableBrokerCandidates = availableBrokers;
Review Comment:
Please add an error log.
Why don't we throw an exception here, or return an empty broker?
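To illustrate the "log and return empty" option, here is a minimal sketch. Every type in it (`FilterPipelineSketch`, `Filter`, `FilterException`) is a hypothetical stand-in, brokers are modeled as a plain `Map<String, String>`, and `java.util.logging` is used only so the example has no external dependencies.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: every type here is a hypothetical stand-in, not the Pulsar API.
final class FilterPipelineSketch {
    private static final Logger LOG = Logger.getLogger(FilterPipelineSketch.class.getName());

    static final class FilterException extends Exception {
        FilterException(String message) {
            super(message);
        }
    }

    interface Filter {
        // Removes unsuitable brokers from the candidate map in place.
        void filter(Map<String, String> candidates) throws FilterException;
    }

    private FilterPipelineSketch() {
    }

    // Returns the filtered candidates, or Optional.empty() if any filter fails.
    static Optional<Map<String, String>> applyFilters(Map<String, String> available, List<Filter> pipeline) {
        // Work on a copy so that the caller's map is never mutated.
        Map<String, String> candidates = new HashMap<>(available);
        for (Filter filter : pipeline) {
            try {
                filter.filter(candidates);
            } catch (FilterException e) {
                // Log the failure and select no broker, rather than silently
                // falling back to the unfiltered broker list.
                LOG.log(Level.SEVERE, "Broker filter failed; no broker is selected.", e);
                return Optional.empty();
            }
        }
        return Optional.of(candidates);
    }
}
```

Falling back to the unfiltered list, as the current code does, means a failing filter can place a bundle on a broker that an earlier filter had already excluded. Returning empty (or throwing) makes that failure visible to the caller.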
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+ private PulsarService pulsar;
+
+ private ServiceConfiguration conf;
+
+ @Getter
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceUnitStateChannel serviceUnitStateChannel;
+
+ @Getter
+ private LoadManagerContext context;
+
+ @Getter
+ private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+ @Getter
+ private List<BrokerFilter> brokerFilterPipeline;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = ConcurrentOpenHashMap.<String,
+ CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+ .build();
+
+ /**
+ * Life cycle: Constructor -> initialize -> start -> close.
+ */
+ public ExtensibleLoadManagerImpl() {
+ this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+ if (brokers.isEmpty()) {
+ return Optional.empty();
+ }
+ return
Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+ };
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ if (this.started.get()) {
+ return;
+ }
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+ this.brokerRegistry.start();
+ this.serviceUnitStateChannel.start();
+
+ // TODO: Start the load data store.
+
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(null)
+ .topBundleLoadDataStore(null).build();
+ // TODO: Start load data reporter.
+
+ // TODO: Start unload scheduler and bundle split scheduler
+
+ this.started.set(true);
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ this.pulsar = pulsar;
+ this.conf = pulsar.getConfiguration();
+ }
+
+ @Override
+ public CompletableFuture<Optional<BrokerLookupData>>
assign(Optional<ServiceUnitId> topic,
+ ServiceUnitId
serviceUnit) {
+
+ final String bundle = serviceUnit.toString();
+
+ CompletableFuture<Optional<BrokerLookupData>> future =
lookupRequests.computeIfAbsent(bundle, k -> {
+ final CompletableFuture<Optional<String>> owner;
+ // Assign the bundle to channel owner if is internal topic, to
avoid circular references.
+ if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+ owner = serviceUnitStateChannel.getChannelOwnerAsync();
+ } else {
+ owner =
serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+ // If the bundle not assign yet, select and publish assign
event to channel.
+ if (broker.isEmpty()) {
+ return
this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+ if (brokerOpt.isPresent()) {
+ log.info("Selected new owner broker: {} for
bundle: {}.", brokerOpt.get(), bundle);
+ return
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+ } else {
+ throw new IllegalStateException(
+ "Failed to discover(select) the new owner broker for bundle: " + bundle);
Review Comment:
nit: in the exception message, `discover(select)` -> `select`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager)
{
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData ->
lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws
Exception {
+ return Optional.empty();
Review Comment:
Throw UnsupportedOperationException.
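A minimal sketch of what I mean, using hypothetical stand-in types
(`LegacyLoadManager`, `WrapperSketch`) rather than the real Pulsar
`LoadManager` interface. The idea is that operations with no meaning for the
extension fail loudly instead of returning an empty value that a caller could
mistake for a real answer:

```java
import java.util.Optional;

// Hypothetical stand-in for the legacy LoadManager interface; not the Pulsar API.
interface LegacyLoadManager {
    Optional<String> getLeastLoaded(String serviceUnit) throws Exception;
}

// Sketch of a wrapper that explicitly rejects a legacy-only operation.
class WrapperSketch implements LegacyLoadManager {
    @Override
    public Optional<String> getLeastLoaded(String serviceUnit) {
        // Fail fast so that an accidental caller is noticed immediately.
        throw new UnsupportedOperationException(
                "getLeastLoaded is not supported by the load manager extension");
    }
}
```

The same pattern would apply to the other legacy-only methods in this wrapper.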
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java:
##########
@@ -143,6 +155,11 @@ static LoadManager create(final PulsarService pulsar) {
final LoadManager casted = new
ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
casted.initialize(pulsar);
return casted;
+ } else if (loadManagerInstance instanceof ExtensibleLoadManagerImpl) {
Review Comment:
nit: can we compare against the interface,
`(loadManagerInstance instanceof ExtensibleLoadManager)`, to be consistent
with the other cases?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager)
{
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData ->
lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws
Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() {
+ return null;
+ }
+
+ @Override
+ public void setLoadReportForceUpdateFlag() {
+ // No-op.
+ }
+
+ @Override
+ public void writeLoadReportOnZookeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ }
+
+ @Override
+ public void writeResourceQuotasToZooKeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ }
+
+ @Override
+ public List<Metrics> getLoadBalancingMetrics() {
+ return null;
Review Comment:
Add a TODO comment here.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager)
{
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData ->
lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws
Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() {
+ return null;
Review Comment:
Throw UnsupportedOperationException.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+ private PulsarService pulsar;
+
+ private ServiceConfiguration conf;
+
+ @Getter
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceUnitStateChannel serviceUnitStateChannel;
+
+ @Getter
+ private LoadManagerContext context;
+
+ @Getter
+ private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+ @Getter
+ private List<BrokerFilter> brokerFilterPipeline;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = ConcurrentOpenHashMap.<String,
+ CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+ .build();
+
+ /**
+ * Life cycle: Constructor -> initialize -> start -> close.
+ */
+ public ExtensibleLoadManagerImpl() {
+ this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+ if (brokers.isEmpty()) {
+ return Optional.empty();
+ }
+ return
Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+ };
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ if (this.started.get()) {
+ return;
+ }
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+ this.brokerRegistry.start();
+ this.serviceUnitStateChannel.start();
+
+ // TODO: Start the load data store.
+
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(null)
+ .topBundleLoadDataStore(null).build();
+ // TODO: Start load data reporter.
+
+ // TODO: Start unload scheduler and bundle split scheduler
+
+ this.started.set(true);
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ this.pulsar = pulsar;
+ this.conf = pulsar.getConfiguration();
+ }
+
+ @Override
+ public CompletableFuture<Optional<BrokerLookupData>>
assign(Optional<ServiceUnitId> topic,
+ ServiceUnitId
serviceUnit) {
+
+ final String bundle = serviceUnit.toString();
+
+ CompletableFuture<Optional<BrokerLookupData>> future =
lookupRequests.computeIfAbsent(bundle, k -> {
+ final CompletableFuture<Optional<String>> owner;
+ // Assign the bundle to channel owner if is internal topic, to
avoid circular references.
+ if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+ owner = serviceUnitStateChannel.getChannelOwnerAsync();
+ } else {
+ owner =
serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+ // If the bundle not assign yet, select and publish assign
event to channel.
+ if (broker.isEmpty()) {
+ return
this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+ if (brokerOpt.isPresent()) {
+ log.info("Selected new owner broker: {} for
bundle: {}.", brokerOpt.get(), bundle);
+ return
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+ } else {
+ throw new IllegalStateException(
+ "Failed to discover(select) the new
owner broker for bundle: " + bundle);
+ }
+ });
+ }
+ // Already assigned, return it.
+ return CompletableFuture.completedFuture(broker);
+ });
+ }
+
+ return owner.thenCompose(broker -> {
+ if (broker.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(broker.get());
+ }).thenCompose(broker ->
this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(brokerLookupData);
+ }));
+ });
+ future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+ return future;
+ }
+
+ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle) {
+ BrokerRegistry brokerRegistry = getBrokerRegistry();
+ return brokerRegistry.getAvailableBrokerLookupDataAsync()
+ .thenCompose(availableBrokers -> {
+ // TODO: Support isolation policies
+ LoadManagerContext context = this.getContext();
+
+ // Filter out brokers that do not meet the rules.
+ List<BrokerFilter> filterPipeline =
getBrokerFilterPipeline();
+ Map<String, BrokerLookupData> availableBrokerCandidates =
new HashMap<>(availableBrokers);
+ for (final BrokerFilter filter : filterPipeline) {
+ try {
+ filter.filter(availableBrokerCandidates, context);
+ } catch (BrokerFilterException e) {
+ availableBrokerCandidates = availableBrokers;
+ }
+ }
+ if (availableBrokerCandidates.isEmpty()) {
+ return
CompletableFuture.completedFuture(Optional.empty());
+ }
+ ArrayList<String> candidateBrokers = new
ArrayList<>(availableBrokerCandidates.keySet());
+
+ return CompletableFuture.completedFuture(
+
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
+ });
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
+ final String bundle = bundleUnit.toString();
+ CompletableFuture<Optional<String>> owner;
+ if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+ owner = serviceUnitStateChannel.getChannelOwnerAsync();
+ } else {
+ owner = serviceUnitStateChannel.getOwnerAsync(bundle);
+ }
+
+ return owner.thenApply(broker ->
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+ }
+
+ @Override
+ public void close() throws PulsarServerException {
Review Comment:
Like `start()`, `close()` can still be called concurrently by multiple
threads on this object.
I think we can simply make it synchronized too.
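Roughly something like the sketch below. `LifecycleSketch` and its counters
are hypothetical and only stand in for the real class, which starts and closes
the broker registry and the service unit state channel. With both methods
synchronized, the check on `started` and the state change happen as one step,
so repeated or concurrent calls become no-ops:

```java
// Hypothetical, simplified lifecycle holder; not the actual ExtensibleLoadManagerImpl.
class LifecycleSketch {
    private boolean started = false;
    int startCount = 0;
    int closeCount = 0;

    // synchronized: the check-then-act on `started` is atomic across threads.
    public synchronized void start() {
        if (started) {
            return;
        }
        startCount++; // stands in for starting the underlying services
        started = true;
    }

    public synchronized void close() {
        if (!started) {
            return;
        }
        closeCount++; // stands in for closing the underlying services
        started = false;
    }
}
```

Once both methods are synchronized, a plain boolean is enough for the flag in
this sketch.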
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager)
{
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData ->
lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws
Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() {
+ return null;
+ }
+
+ @Override
+ public void setLoadReportForceUpdateFlag() {
+ // No-op.
Review Comment:
Throw UnsupportedOperationException.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager)
{
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData ->
lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws
Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() {
+ return null;
+ }
+
+ @Override
+ public void setLoadReportForceUpdateFlag() {
+ // No-op.
+ }
+
+ @Override
+ public void writeLoadReportOnZookeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ }
+
+ @Override
+ public void writeResourceQuotasToZooKeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ }
+
+ @Override
+ public List<Metrics> getLoadBalancingMetrics() {
+ return null;
+ }
+
+ @Override
+ public void doLoadShedding() {
+ // No-op.
Review Comment:
Throw UnsupportedOperationException.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -171,11 +172,21 @@ public void initialize() {
}
}
+ public boolean isExtensibleLoadManager(){
+ return loadManager.get() instanceof ExtensibleLoadManagerWrapper;
+ }
+
public CompletableFuture<Optional<LookupResult>>
getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
long startTime = System.nanoTime();
CompletableFuture<Optional<LookupResult>> future =
getBundleAsync(topic)
- .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
+ .thenCompose(bundle -> {
+ if (isExtensibleLoadManager()) {
Review Comment:
I was working on these NamespaceService changes as part of the `pluggable new
broker load balancer` work.
Do you want to keep this change in this PR?
I think we need to add NamespaceService unit tests to cover this variation.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -242,11 +242,12 @@ public CompletableFuture<Optional<String>>
getChannelOwnerAsync() {
// TODO: discard this protocol prefix removal
// by a util func that returns
lookupServiceAddress(serviceUrl)
if (leader.isPresent()) {
- String broker = leader.get().getServiceUrl();
- broker = broker.substring(broker.lastIndexOf('/') + 1);
- return Optional.of(broker);
+ return Optional.of(leader.get().getServiceUrl());
Review Comment:
Do we keep the protocol prefix here?
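If the prefix is still meant to be dropped, the removed logic could move into
a small util function, as the TODO suggests. A sketch only: the class and
method names below are hypothetical, not existing Pulsar code, and the body
just reuses the `substring`/`lastIndexOf` logic from the removed lines:

```java
// Hypothetical helper; name and location are assumptions for illustration.
final class ServiceUrlSketch {
    private ServiceUrlSketch() {
    }

    // Returns the part after the last '/', e.g. "pulsar://host:6650" -> "host:6650".
    // A value without '/' is returned unchanged because lastIndexOf yields -1.
    static String stripProtocolPrefix(String serviceUrl) {
        return serviceUrl.substring(serviceUrl.lastIndexOf('/') + 1);
    }
}
```

Callers comparing against the broker id could then use the same function, so
both sides of the comparison are in one format.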
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+ private PulsarService pulsar;
+
+ private ServiceConfiguration conf;
+
+ @Getter
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceUnitStateChannel serviceUnitStateChannel;
+
+ @Getter
+ private LoadManagerContext context;
+
+ @Getter
+ private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+ @Getter
+ private List<BrokerFilter> brokerFilterPipeline;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = ConcurrentOpenHashMap.<String,
+ CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+ .build();
+
+ /**
+ * Life cycle: Constructor -> initialize -> start -> close.
+ */
+ public ExtensibleLoadManagerImpl() {
+ this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+ if (brokers.isEmpty()) {
+ return Optional.empty();
+ }
+ return
Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+ };
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ if (this.started.get()) {
+ return;
+ }
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+ this.brokerRegistry.start();
+ this.serviceUnitStateChannel.start();
+
+ // TODO: Start the load data store.
+
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(null)
+ .topBundleLoadDataStore(null).build();
+ // TODO: Start load data reporter.
+
+ // TODO: Start unload scheduler and bundle split scheduler
+
+ this.started.set(true);
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ this.pulsar = pulsar;
+ this.conf = pulsar.getConfiguration();
+ }
+
+ @Override
+ public CompletableFuture<Optional<BrokerLookupData>>
assign(Optional<ServiceUnitId> topic,
+ ServiceUnitId
serviceUnit) {
+
+ final String bundle = serviceUnit.toString();
+
+ CompletableFuture<Optional<BrokerLookupData>> future =
lookupRequests.computeIfAbsent(bundle, k -> {
+ final CompletableFuture<Optional<String>> owner;
+ // Assign the bundle to channel owner if is internal topic, to
avoid circular references.
+ if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+ owner = serviceUnitStateChannel.getChannelOwnerAsync();
+ } else {
+ owner =
serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+ // If the bundle not assign yet, select and publish assign
event to channel.
+ if (broker.isEmpty()) {
+ return
this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+ if (brokerOpt.isPresent()) {
+ log.info("Selected new owner broker: {} for
bundle: {}.", brokerOpt.get(), bundle);
+ return
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+ } else {
+ throw new IllegalStateException(
+ "Failed to discover(select) the new
owner broker for bundle: " + bundle);
+ }
+ });
+ }
+ // Already assigned, return it.
+ return CompletableFuture.completedFuture(broker);
+ });
+ }
+
+ return owner.thenCompose(broker -> {
+ if (broker.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(broker.get());
+ }).thenCompose(broker ->
this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(brokerLookupData);
+ }));
+ });
+ future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+ return future;
+ }
+
+ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle) {
+ BrokerRegistry brokerRegistry = getBrokerRegistry();
+ return brokerRegistry.getAvailableBrokerLookupDataAsync()
+ .thenCompose(availableBrokers -> {
+ // TODO: Support isolation policies
+ LoadManagerContext context = this.getContext();
+
+ // Filter out brokers that do not meet the rules.
+ List<BrokerFilter> filterPipeline =
getBrokerFilterPipeline();
+ Map<String, BrokerLookupData> availableBrokerCandidates =
new HashMap<>(availableBrokers);
+ for (final BrokerFilter filter : filterPipeline) {
+ try {
+ filter.filter(availableBrokerCandidates, context);
+ } catch (BrokerFilterException e) {
+ availableBrokerCandidates = availableBrokers;
+ }
+ }
+ if (availableBrokerCandidates.isEmpty()) {
+ return
CompletableFuture.completedFuture(Optional.empty());
+ }
+ ArrayList<String> candidateBrokers = new
ArrayList<>(availableBrokerCandidates.keySet());
Review Comment:
Isn't availableBrokerCandidates already copied?
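For reference, a plain-Java sketch of what the two copies do, with the value type simplified to `String` and all names (`CopySketch`, `candidatesWithout`) hypothetical. `new HashMap<>(source)` is an independent shallow copy, `keySet()` is a view backed by that copy, and `new ArrayList<>(keySet)` is a snapshot that adds index access.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the copy semantics only.
final class CopySketch {

    private CopySketch() {
    }

    // Copies the source map, removes one broker from the copy,
    // and returns the remaining broker ids as an indexable list.
    static List<String> candidatesWithout(Map<String, String> availableBrokers, String excluded) {
        // Independent shallow copy: removals here do not touch availableBrokers.
        Map<String, String> candidates = new HashMap<>(availableBrokers);
        candidates.remove(excluded);
        // keySet() is a Set view of the copy; the ArrayList is a snapshot of it.
        return new ArrayList<>(candidates.keySet());
    }
}
```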
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager)
{
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData ->
lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws
Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() {
+ return null;
+ }
+
+ @Override
+ public void setLoadReportForceUpdateFlag() {
+ // No-op.
+ }
+
+ @Override
+ public void writeLoadReportOnZookeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ }
+
+ @Override
+ public void writeResourceQuotasToZooKeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ }
+
+ @Override
+ public List<Metrics> getLoadBalancingMetrics() {
+ return null;
+ }
+
+ @Override
+ public void doLoadShedding() {
+ // No-op.
+ }
+
+ @Override
+ public void doNamespaceBundleSplit() {
+ // No-op.
Review Comment:
Throw UnsupportedOperationException.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+ private PulsarService pulsar;
+
+ private ServiceConfiguration conf;
+
+ @Getter
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceUnitStateChannel serviceUnitStateChannel;
+
+ @Getter
+ private LoadManagerContext context;
+
+ @Getter
+ private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+ @Getter
+ private List<BrokerFilter> brokerFilterPipeline;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = ConcurrentOpenHashMap.<String,
+ CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+ .build();
+
+ /**
+ * Life cycle: Constructor -> initialize -> start -> close.
+ */
+ public ExtensibleLoadManagerImpl() {
+ this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+ if (brokers.isEmpty()) {
+ return Optional.empty();
+ }
+ return
Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+ };
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ if (this.started.get()) {
+ return;
+ }
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+ this.brokerRegistry.start();
+ this.serviceUnitStateChannel.start();
+
+ // TODO: Start the load data store.
+
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(null)
+ .topBundleLoadDataStore(null).build();
+ // TODO: Start load data reporter.
+
+ // TODO: Start unload scheduler and bundle split scheduler
+
+ this.started.set(true);
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ this.pulsar = pulsar;
+ this.conf = pulsar.getConfiguration();
+ }
+
+ @Override
+ public CompletableFuture<Optional<BrokerLookupData>>
assign(Optional<ServiceUnitId> topic,
+ ServiceUnitId
serviceUnit) {
+
+ final String bundle = serviceUnit.toString();
+
+ CompletableFuture<Optional<BrokerLookupData>> future =
lookupRequests.computeIfAbsent(bundle, k -> {
+ final CompletableFuture<Optional<String>> owner;
+ // Assign the bundle to channel owner if is internal topic, to
avoid circular references.
+ if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+ owner = serviceUnitStateChannel.getChannelOwnerAsync();
+ } else {
+ owner =
serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+ // If the bundle not assign yet, select and publish assign
event to channel.
+ if (broker.isEmpty()) {
+ return
this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+ if (brokerOpt.isPresent()) {
+ log.info("Selected new owner broker: {} for
bundle: {}.", brokerOpt.get(), bundle);
+ return
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+ } else {
+ throw new IllegalStateException(
+ "Failed to discover(select) the new
owner broker for bundle: " + bundle);
+ }
+ });
+ }
+ // Already assigned, return it.
+ return CompletableFuture.completedFuture(broker);
+ });
+ }
+
+ return owner.thenCompose(broker -> {
+ if (broker.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(broker.get());
+ }).thenCompose(broker ->
this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(brokerLookupData);
+ }));
+ });
+ future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+ return future;
+ }
+
+ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle) {
+ BrokerRegistry brokerRegistry = getBrokerRegistry();
+ return brokerRegistry.getAvailableBrokerLookupDataAsync()
+ .thenCompose(availableBrokers -> {
+ // TODO: Support isolation policies
+ LoadManagerContext context = this.getContext();
+
+ // Filter out brokers that do not meet the rules.
+ List<BrokerFilter> filterPipeline =
getBrokerFilterPipeline();
+ Map<String, BrokerLookupData> availableBrokerCandidates =
new HashMap<>(availableBrokers);
Review Comment:
Isn't availableBrokers already copied from the cache?