This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e6e6cebcdb [feat][broker]PIP-255 Part-1: Add listener interface for
namespace service (#20406)
5e6e6cebcdb is described below
commit 5e6e6cebcdbeec32ed49729f658f2d5cd0d98347
Author: hleecs <[email protected]>
AuthorDate: Thu Jun 1 12:36:46 2023 +0800
[feat][broker]PIP-255 Part-1: Add listener interface for namespace service
(#20406)
---
.../namespace/NamespaceBundleSplitListener.java | 29 +++++++++++++++++++
.../pulsar/broker/namespace/NamespaceService.java | 27 ++++++++++++++++++
.../namespace/NamespaceCreateBundlesTest.java | 33 ++++++++++++++++++++++
3 files changed, 89 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
new file mode 100644
index 00000000000..a3312f5689e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import java.util.function.Predicate;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+
+/**
+ * Listener for <code>NamespaceBundle</code> split.
+ */
+public interface NamespaceBundleSplitListener extends
Predicate<NamespaceBundle> {
+ void onSplit(NamespaceBundle bundle);
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e2d4ef51537..cf969460c33 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -139,6 +139,10 @@ public class NamespaceService implements AutoCloseable {
private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl>
namespaceClients;
private final List<NamespaceBundleOwnershipListener>
bundleOwnershipListeners;
+
+ private final List<NamespaceBundleSplitListener> bundleSplitListeners;
+
+
private final RedirectManager redirectManager;
@@ -167,6 +171,7 @@ public class NamespaceService implements AutoCloseable {
this.namespaceClients =
ConcurrentOpenHashMap.<ClusterDataImpl,
PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
+ this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache =
pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);
}
@@ -975,6 +980,7 @@ public class NamespaceService implements AutoCloseable {
// affect the split operation which is already
safely completed
r.forEach(this::unloadNamespaceBundle);
}
+ onNamespaceBundleSplit(bundle);
})
.exceptionally(e -> {
String msg1 = format(
@@ -1230,6 +1236,18 @@ public class NamespaceService implements AutoCloseable {
}
}
+ protected void onNamespaceBundleSplit(NamespaceBundle bundle) {
+ for (NamespaceBundleSplitListener bundleSplitListener :
bundleSplitListeners) {
+ try {
+ if (bundleSplitListener.test(bundle)) {
+ bundleSplitListener.onSplit(bundle);
+ }
+ } catch (Throwable t) {
+ LOG.error("Call bundle {} split listener {} error", bundle,
bundleSplitListener, t);
+ }
+ }
+ }
+
public void
addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener...
listeners) {
Objects.requireNonNull(listeners);
for (NamespaceBundleOwnershipListener listener : listeners) {
@@ -1240,6 +1258,15 @@ public class NamespaceService implements AutoCloseable {
getOwnedServiceUnits().forEach(bundle ->
notifyNamespaceBundleOwnershipListener(bundle, listeners));
}
+ public void
addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) {
+ Objects.requireNonNull(listeners);
+ for (NamespaceBundleSplitListener listener : listeners) {
+ if (listener != null) {
+ bundleSplitListeners.add(listener);
+ }
+ }
+ }
+
private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle,
NamespaceBundleOwnershipListener... listeners) {
if (listeners != null) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
index 43d37466918..73cfaf1b0d9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
@@ -20,15 +20,21 @@ package org.apache.pulsar.broker.namespace;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import lombok.Cleanup;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.Policies;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -81,4 +87,31 @@ public class NamespaceCreateBundlesTest extends
BrokerTestBase {
assertNotNull(admin.namespaces().getBookieAffinityGroup(namespaceName));
producer.close();
}
+
+ @Test
+ public void testBundleSplitListener() throws Exception {
+ String namespaceName = "prop/" + UUID.randomUUID().toString();
+ String topicName = "persistent://" + namespaceName + "/my-topic5";
+ admin.namespaces().createNamespace(namespaceName);
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).sendTimeout(1,
+ TimeUnit.SECONDS).create();
+ producer.send(new byte[1]);
+ String bundleRange = admin.lookups().getBundleRange(topicName);
+ AtomicBoolean isTriggered = new AtomicBoolean(false);
+ pulsar.getNamespaceService().addNamespaceBundleSplitListener(new
NamespaceBundleSplitListener() {
+ @Override
+ public void onSplit(NamespaceBundle bundle) {
+ assertEquals(bundleRange, bundle.getBundleRange());
+ isTriggered.set(true);
+ }
+
+ @Override
+ public boolean test(NamespaceBundle namespaceBundle) {
+ return true;
+ }
+ });
+ admin.namespaces().splitNamespaceBundle(namespaceName, bundleRange,
false, null);
+ Awaitility.await().untilAsserted(() -> assertTrue(isTriggered.get()));
+ }
}