heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1161885291


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java:
##########
@@ -67,6 +69,16 @@ public Optional<String> getProtocol(String protocol) {
         return Optional.ofNullable(this.protocols().get(protocol));
     }
 

Review Comment:
   Do we need to define these getters from the `record` class? I thought the 
getters are already provided.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {

Review Comment:
   Can't we rename this to `BrokerLoadManagerClassFilter`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -106,6 +107,8 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
 
     private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
 
+    private static final String ELECTION_ROOT = 
"/loadbalance/extension/leader";

Review Comment:
   why do we need to change the leader znode path?(can't the same leader play 
both old and new LM leader?) can't we use the same path? `/loadbalance/leader` 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.Set;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    @Override
+    public void filter(Set<String> brokers, BundleData bundleToAssign,
+                       LoadData loadData,
+                       ServiceConfiguration conf) throws BrokerFilterException 
{
+        loadData.getBrokerData().forEach((key, value) -> {
+            if (!value.getLocalData().getLoadManagerClassName()
+                    .equals(conf.getLoadManagerClassName())) {

Review Comment:
   Similarly,
   during the ExtensionLM deployment, the brokers before deployment could still 
have the ModularLM class name in its config. In this case, the brokers will 
still assign bundles to the brokers with ModularLM class name.
   
   Are admins expected to update the loadbalance class name for all brokers 
first, and then start deployment?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java:
##########
@@ -115,6 +116,7 @@ public BrokerLookupData getLookupData(String version) {
         }};
         return new BrokerLookupData(
                 webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
-                pulsarServiceUrlTls, advertisedListeners, protocols, true, 
true, version);
+                pulsarServiceUrlTls, advertisedListeners, protocols, true, 
true,
+                ExtensibleLoadManagerImpl.class.getName(), -1, version);

Review Comment:
   We should pass loadmanagerClassName as arg and cover brokers with different 
BrokerLookupData .loadmanagerClassName during rollback <--> deployment.
   
   case new LM deployment:
   1. brokers with new LM
   2. brokers with old LM
   
   case rollback to old LM:
   1. brokers with new LM
   2. brokers with old LM
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = 
pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> 
getAvailableBrokerLookupDataAsync() {
+        return 
brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenCompose(availableBrokers
 -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOOKUP_DATA_PATH, 
brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", 
brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> 
findRedirectLookupResultAsync() {
+        String currentLMClassName = 
pulsar.getConfiguration().getLoadManagerClassName();
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            AtomicReference<ServiceLookupData> serviceLookupData = new 
AtomicReference<>();

Review Comment:
   can we rename serviceLookupData to `latestServiceLookupData`?
   
   Also I think we need to have multiple candidate brokers to redirect to(and 
randomly select one among them). Redirecting to the single broker could be a 
bottleneck in case of thundering herd lookups.
   
   For this, probably we need two loops:
   First phase: find the latestServiceLookupData.
   Second phase: find the redirect candidates whose LoadManager class name is 
the same as  latestServiceLookupData. 
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = "broker_LM_class_filter";
+    @Override
+    public String name() {
+        return FILTER_NAME;
+    }
+
+    @Override
+    public Map<String, BrokerLookupData> filter(
+            Map<String, BrokerLookupData> brokers,
+            ServiceUnitId serviceUnit,
+            LoadManagerContext context)
+            throws BrokerFilterException {
+        if (brokers.isEmpty()) {
+            return brokers;
+        }
+        brokers.entrySet().removeIf(entry -> {
+            BrokerLookupData v = entry.getValue();
+            return 
!v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());

Review Comment:
   During the rollback, the brokers before rollback could still have the 
Extension class name in its config. In this case, the brokers will still assign 
bundles to the brokers with Extension classname.
   
   Are admins expected to update the loadbalance class name for all brokers 
first, and then start deployment?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = 
pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> 
getAvailableBrokerLookupDataAsync() {

Review Comment:
   Do we need this class? Why not use BrokerRegistry?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java:
##########
@@ -51,7 +51,7 @@
 @Slf4j
 public class BrokerRegistryImpl implements BrokerRegistry {
 
-    protected static final String LOOKUP_DATA_PATH = 
"/loadbalance/extension/brokers";
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";

Review Comment:
   Are the registry data compatible with both new and old lookup data? 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = 
pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> 
getAvailableBrokerLookupDataAsync() {
+        return 
brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenCompose(availableBrokers
 -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOOKUP_DATA_PATH, 
brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", 
brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> 
findRedirectLookupResultAsync() {
+        String currentLMClassName = 
pulsar.getConfiguration().getLoadManagerClassName();
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            AtomicReference<ServiceLookupData> serviceLookupData = new 
AtomicReference<>();
+            AtomicLong lastStartTimestamp = new AtomicLong(0L);
+            lookupDataMap.forEach((key, value) -> {
+                if (lastStartTimestamp.get() < value.getStartTimestamp()) {
+                    lastStartTimestamp.set(value.getStartTimestamp());
+                    serviceLookupData.set(value);
+                }
+            });
+            if (serviceLookupData.get() == null) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            if 
(serviceLookupData.get().getLoadManagerClassName().equals(currentLMClassName)) {
+                if (log.isDebugEnabled()) {

Review Comment:
   Can we use ExtensibleLoadManagerImpl.debug()?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to