This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e6e6cebcdb [feat][broker]PIP-255 Part-1: Add listener interface for namespace service (#20406)
5e6e6cebcdb is described below

commit 5e6e6cebcdbeec32ed49729f658f2d5cd0d98347
Author: hleecs <[email protected]>
AuthorDate: Thu Jun 1 12:36:46 2023 +0800

    [feat][broker]PIP-255 Part-1: Add listener interface for namespace service (#20406)
---
 .../namespace/NamespaceBundleSplitListener.java    | 29 +++++++++++++++++++
 .../pulsar/broker/namespace/NamespaceService.java  | 27 ++++++++++++++++++
 .../namespace/NamespaceCreateBundlesTest.java      | 33 ++++++++++++++++++++++
 3 files changed, 89 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
new file mode 100644
index 00000000000..a3312f5689e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import java.util.function.Predicate;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+
+/**
+ * Listener for <code>NamespaceBundle</code> split.
+ */
+public interface NamespaceBundleSplitListener extends 
Predicate<NamespaceBundle> {
+    void onSplit(NamespaceBundle bundle);
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e2d4ef51537..cf969460c33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -139,6 +139,10 @@ public class NamespaceService implements AutoCloseable {
     private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> 
namespaceClients;
 
     private final List<NamespaceBundleOwnershipListener> 
bundleOwnershipListeners;
+
+    private final List<NamespaceBundleSplitListener> bundleSplitListeners;
+
+
     private final RedirectManager redirectManager;
 
 
@@ -167,6 +171,7 @@ public class NamespaceService implements AutoCloseable {
         this.namespaceClients =
                 ConcurrentOpenHashMap.<ClusterDataImpl, 
PulsarClientImpl>newBuilder().build();
         this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
+        this.bundleSplitListeners = new CopyOnWriteArrayList<>();
         this.localBrokerDataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
         this.redirectManager = new RedirectManager(pulsar);
     }
@@ -975,6 +980,7 @@ public class NamespaceService implements AutoCloseable {
                                 // affect the split operation which is already 
safely completed
                                 r.forEach(this::unloadNamespaceBundle);
                             }
+                            onNamespaceBundleSplit(bundle);
                         })
                         .exceptionally(e -> {
                             String msg1 = format(
@@ -1230,6 +1236,18 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    protected void onNamespaceBundleSplit(NamespaceBundle bundle) {
+        for (NamespaceBundleSplitListener bundleSplitListener : 
bundleSplitListeners) {
+            try {
+                if (bundleSplitListener.test(bundle)) {
+                    bundleSplitListener.onSplit(bundle);
+                }
+            } catch (Throwable t) {
+                LOG.error("Call bundle {} split listener {} error", bundle, 
bundleSplitListener, t);
+            }
+        }
+    }
+
     public void 
addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... 
listeners) {
         Objects.requireNonNull(listeners);
         for (NamespaceBundleOwnershipListener listener : listeners) {
@@ -1240,6 +1258,15 @@ public class NamespaceService implements AutoCloseable {
         getOwnedServiceUnits().forEach(bundle -> 
notifyNamespaceBundleOwnershipListener(bundle, listeners));
     }
 
+    public void 
addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) {
+        Objects.requireNonNull(listeners);
+        for (NamespaceBundleSplitListener listener : listeners) {
+            if (listener != null) {
+                bundleSplitListeners.add(listener);
+            }
+        }
+    }
+
     private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle,
                     NamespaceBundleOwnershipListener... listeners) {
         if (listeners != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
index 43d37466918..73cfaf1b0d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
@@ -20,15 +20,21 @@ package org.apache.pulsar.broker.namespace;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import lombok.Cleanup;
 
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -81,4 +87,31 @@ public class NamespaceCreateBundlesTest extends BrokerTestBase {
         
assertNotNull(admin.namespaces().getBookieAffinityGroup(namespaceName));
         producer.close();
     }
+
+    @Test
+    public void testBundleSplitListener() throws Exception {
+        String namespaceName = "prop/" + UUID.randomUUID().toString();
+        String topicName = "persistent://" + namespaceName + "/my-topic5";
+        admin.namespaces().createNamespace(namespaceName);
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).sendTimeout(1,
+            TimeUnit.SECONDS).create();
+        producer.send(new byte[1]);
+        String bundleRange = admin.lookups().getBundleRange(topicName);
+        AtomicBoolean isTriggered = new AtomicBoolean(false);
+        pulsar.getNamespaceService().addNamespaceBundleSplitListener(new 
NamespaceBundleSplitListener() {
+            @Override
+            public void onSplit(NamespaceBundle bundle) {
+                assertEquals(bundleRange, bundle.getBundleRange());
+                isTriggered.set(true);
+            }
+
+            @Override
+            public boolean test(NamespaceBundle namespaceBundle) {
+                return true;
+            }
+        });
+        admin.namespaces().splitNamespaceBundle(namespaceName, bundleRange, 
false, null);
+        Awaitility.await().untilAsserted(() -> assertTrue(isTriggered.get()));
+    }
 }

Reply via email to