This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new b05d0d1ef19 [improve][broker] Register the broker to metadata store 
without version id compare (#23298)
b05d0d1ef19 is described below

commit b05d0d1ef194c91aea3fb8d4382c403f80a60b7e
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Fri Sep 13 19:07:12 2024 +0800

    [improve][broker] Register the broker to metadata store without version id 
compare (#23298)
    
    (cherry picked from commit 13c19b50216ba7e73766e6fa7b57d2700614e3b5)
---
 .../loadbalance/extensions/BrokerRegistryImpl.java | 44 +++++++++++-----------
 .../loadbalance/extensions/BrokerRegistryTest.java | 14 +++----
 .../apache/pulsar/metadata/api/MetadataCache.java  | 20 ++++++++++
 .../metadata/cache/impl/MetadataCacheImpl.java     | 24 ++++++++++++
 .../apache/pulsar/metadata/MetadataCacheTest.java  | 25 ++++++++++++
 5 files changed, 96 insertions(+), 31 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 18e30ddf922..44f6287b2e1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -21,11 +21,11 @@ package org.apache.pulsar.broker.loadbalance.extensions;
 import static 
org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
@@ -39,11 +39,11 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
-import org.apache.pulsar.metadata.api.coordination.LockManager;
-import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
 
 /**
  * The broker registry impl, base on the LockManager.
@@ -57,16 +57,14 @@ public class BrokerRegistryImpl implements BrokerRegistry {
 
     private final BrokerLookupData brokerLookupData;
 
-    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+    private final MetadataCache<BrokerLookupData> 
brokerLookupDataMetadataCache;
 
-    private final String brokerId;
+    private final String brokerIdKeyPath;
 
     private final ScheduledExecutorService scheduler;
 
     private final List<BiConsumer<String, NotificationType>> listeners;
 
-    private volatile ResourceLock<BrokerLookupData> brokerLookupDataLock;
-
     protected enum State {
         Init,
         Started,
@@ -79,10 +77,10 @@ public class BrokerRegistryImpl implements BrokerRegistry {
     public BrokerRegistryImpl(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.conf = pulsar.getConfiguration();
-        this.brokerLookupDataLockManager = 
pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+        this.brokerLookupDataMetadataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
         this.scheduler = pulsar.getLoadManagerExecutor();
         this.listeners = new ArrayList<>();
-        this.brokerId = pulsar.getBrokerId();
+        this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
         this.brokerLookupData = new BrokerLookupData(
                 pulsar.getWebServiceAddress(),
                 pulsar.getWebServiceAddressTls(),
@@ -121,7 +119,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
     public synchronized void register() throws MetadataStoreException {
         if (this.state == State.Started) {
             try {
-                this.brokerLookupDataLock = 
brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData)
+                brokerLookupDataMetadataCache.put(brokerIdKeyPath, 
brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
                         .get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
                 this.state = State.Registered;
             } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
@@ -134,30 +132,37 @@ public class BrokerRegistryImpl implements BrokerRegistry 
{
     public synchronized void unregister() throws MetadataStoreException {
         if (this.state == State.Registered) {
             try {
-                this.brokerLookupDataLock.release()
+                brokerLookupDataMetadataCache.delete(brokerIdKeyPath)
                         .get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
-                this.state = State.Started;
-            } catch (CompletionException | InterruptedException | 
ExecutionException | TimeoutException e) {
+            } catch (ExecutionException e) {
+                if (e.getCause() instanceof 
MetadataStoreException.NotFoundException) {
+                    log.warn("{} has already been unregistered", 
brokerIdKeyPath);
+                } else {
+                    throw MetadataStoreException.unwrap(e);
+                }
+            } catch (InterruptedException | TimeoutException e) {
                 throw MetadataStoreException.unwrap(e);
+            } finally {
+                this.state = State.Started;
             }
         }
     }
 
     @Override
     public String getBrokerId() {
-        return this.brokerId;
+        return pulsar.getBrokerId();
     }
 
     @Override
     public CompletableFuture<List<String>> getAvailableBrokersAsync() {
         this.checkState();
-        return 
brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenApply(ArrayList::new);
+        return 
brokerLookupDataMetadataCache.getChildren(LOADBALANCE_BROKERS_ROOT);
     }
 
     @Override
     public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String 
broker) {
         this.checkState();
-        return brokerLookupDataLockManager.readLock(keyPath(broker));
+        return brokerLookupDataMetadataCache.get(keyPath(broker));
     }
 
     public CompletableFuture<Map<String, BrokerLookupData>> 
getAvailableBrokerLookupDataAsync() {
@@ -191,13 +196,8 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         try {
             this.listeners.clear();
             this.unregister();
-            this.brokerLookupDataLockManager.close();
         } catch (Exception ex) {
-            if (ex.getCause() instanceof 
MetadataStoreException.NotFoundException) {
-                throw new 
PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
-            } else {
-                throw new 
PulsarServerException(MetadataStoreException.unwrap(ex));
-            }
+            log.error("Unexpected error when unregistering the broker 
registry", ex);
         } finally {
             this.state = State.Closed;
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
index 42600a42035..91ada90dda6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
@@ -291,7 +291,7 @@ public class BrokerRegistryTest {
     }
 
     @Test
-    public void testRegisterFailWithSameBrokerId() throws Exception {
+    public void testRegisterWithSameBrokerId() throws Exception {
         PulsarService pulsar1 = createPulsarService();
         PulsarService pulsar2 = createPulsarService();
         pulsar1.start();
@@ -301,14 +301,10 @@ public class BrokerRegistryTest {
         BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
         BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
         brokerRegistry1.start();
-        try {
-            brokerRegistry2.start();
-            fail();
-        } catch (Exception ex) {
-            log.info("Broker registry start failed.", ex);
-            assertTrue(ex instanceof PulsarServerException);
-            assertTrue(ex.getMessage().contains("LockBusyException"));
-        }
+        brokerRegistry2.start();
+
+        pulsar1.close();
+        pulsar2.close();
     }
 
     @Test
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 6d558e70971..8e153b23d30 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.metadata.api;
 
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
 
 /**
  * Represent the caching layer access for a specific type of objects.
@@ -128,6 +130,24 @@ public interface MetadataCache<T> {
      */
     CompletableFuture<Void> create(String path, T value);
 
+    /**
+     * Create or update the value of the given path in the metadata store 
without version comparison.
+     * <p>
+     * This method is equivalent to
+     * {@link 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended#put(String, 
byte[], Optional, EnumSet)} or
+     * {@link MetadataStore#put(String, byte[], Optional)} if the metadata 
store does not support this extended API,
+     * with `Optional.empty()` as the 3rd argument. It means if the path does 
not exist, it will be created. If the path
+     * already exists, the new value will override the old value.
+     * </p>
+     * @param path the path of the object in the metadata store
+     * @param value the object to put in the metadata store
+     * @param options the create options if the path does not in the metadata 
store
+     * @return the future that indicates if this operation failed, it could 
fail with
+     *   {@link java.io.IOException} if the value failed to be serialized
+     *   {@link MetadataStoreException} if the metadata store operation failed
+     */
+    CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption> 
options);
+
     /**
      * Delete an object from the metadata store.
      * <p>
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index b9051a7dc7d..ee394b0267c 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -25,6 +25,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -47,12 +48,15 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 
 @Slf4j
 public class MetadataCacheImpl<T> implements MetadataCache<T>, 
Consumer<Notification> {
     @Getter
     private final MetadataStore store;
+    private final MetadataStoreExtended storeExtended;
     private final MetadataSerde<T> serde;
 
     private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> 
objCache;
@@ -67,6 +71,11 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
 
     public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde, 
MetadataCacheConfig cacheConfig) {
         this.store = store;
+        if (store instanceof MetadataStoreExtended) {
+            this.storeExtended = (MetadataStoreExtended) store;
+        } else {
+            this.storeExtended = null;
+        }
         this.serde = serde;
 
         Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
@@ -243,6 +252,21 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         return future;
     }
 
+    @Override
+    public CompletableFuture<Void> put(String path, T value, 
EnumSet<CreateOption> options) {
+        final byte[] bytes;
+        try {
+            bytes = serde.serialize(path, value);
+        } catch (IOException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+        if (storeExtended != null) {
+            return storeExtended.put(path, bytes, Optional.empty(), 
options).thenAccept(__ -> refresh(path));
+        } else {
+            return store.put(path, bytes, Optional.empty()).thenAccept(__ -> 
refresh(path));
+        }
+    }
+
     @Override
     public CompletableFuture<Void> delete(String path) {
         return store.delete(path, Optional.empty());
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index df59d25bdcc..bac58073604 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -55,6 +56,7 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.awaitility.Awaitility;
 import org.testng.annotations.DataProvider;
@@ -597,4 +599,27 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         assertEquals(res.getValue().b, 2);
         assertEquals(res.getValue().path, key1);
     }
+
+    @Test(dataProvider = "distributedImpl")
+    public void testPut(String provider, Supplier<String> urlSupplier) throws 
Exception {
+        @Cleanup final var store1 = 
MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
+                .build());
+        final var cache1 = store1.getMetadataCache(Integer.class);
+        @Cleanup final var store2 = 
MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
+                .build());
+        final var cache2 = store2.getMetadataCache(Integer.class);
+        final var key = "/testPut";
+
+        cache1.put(key, 1, EnumSet.of(CreateOption.Ephemeral)); // create
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cache1.get(key).get().orElse(-1), 1);
+            assertEquals(cache2.get(key).get().orElse(-1), 1);
+        });
+
+        cache2.put(key, 2, EnumSet.of(CreateOption.Ephemeral)); // update
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cache1.get(key).get().orElse(-1), 2);
+            assertEquals(cache2.get(key).get().orElse(-1), 2);
+        });
+    }
 }

Reply via email to