This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3e27cc60eaf [fix][broker] PIP-468: Fix listener leak in DagWatchSession (#25650)
3e27cc60eaf is described below
commit 3e27cc60eafd01549474fb89d3a81018cd26958e
Author: Matteo Merli <[email protected]>
AuthorDate: Sat May 2 07:43:26 2026 -0700
[fix][broker] PIP-468: Fix listener leak in DagWatchSession (#25650)
---
.../broker/resources/ScalableTopicResources.java | 132 +++++++++++-----
.../broker/service/scalable/DagWatchSession.java | 28 +++-
.../ScalableTopicListenerRegistryTest.java | 167 +++++++++++++++++++++
.../service/scalable/DagWatchSessionTest.java | 25 ++-
4 files changed, 305 insertions(+), 47 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 550212a4a19..7867fb90509 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -75,13 +75,26 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
private final MetadataCache<ConsumerRegistration>
consumerRegistrationCache;
/**
- * Per-namespace listeners for scalable topic create / modify / delete
events.
- * Keyed by listener so each subscriber can deregister cleanly on close.
The map
- * is consulted from the single store-level listener registered at
construction
- * time, eliminating the listener leak that would otherwise occur every
time a
- * watcher session ends (the metadata store API has no
- * {@code unregisterListener}). Mirrors the {@link TopicResources}
- * pattern for {@code TopicListener}.
+ * Per-path listeners for scalable-topic metadata events. Each listener
watches a
+ * single exact path (typically a topic record); the resources-level
fan-out
+ * dispatches notifications whose path equals the listener's registered
path.
+ * Used by {@link DagWatchSession}-style subscribers that want events for
one
+ * specific topic.
+ *
+ * <p>Hosted here — rather than letting each subscriber call
+ * {@code store.registerListener} directly — because {@code MetadataStore}
has no
+ * {@code unregisterListener}: per-subscriber direct registration would
leak a
+ * listener for the broker's lifetime every time a session ends, and every
+ * metadata notification would fan out to all stale listeners. Mirrors
+ * {@link TopicResources} for {@code TopicListener}.
+ */
+ private final Map<MetadataPathListener, String> pathListeners = new
ConcurrentHashMap<>();
+
+ /**
+ * Per-namespace listeners for scalable-topic create / modify / delete
events.
+ * Used by namespace-wide watchers (e.g. multi-topic consumer wrappers);
the
+ * fan-out matches direct children of the listener's namespace base path.
Same
+ * leak-avoidance rationale as {@link #pathListeners}.
*/
private final Map<NamespaceListener, NamespaceName> namespaceListeners =
new ConcurrentHashMap<>();
@@ -90,9 +103,9 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
super(store, ScalableTopicMetadata.class, operationTimeoutSec);
this.subscriptionCache =
store.getMetadataCache(SubscriptionMetadata.class);
this.consumerRegistrationCache =
store.getMetadataCache(ConsumerRegistration.class);
- // Single shared metadata-store listener fans out to every registered
watcher.
- // Per-watcher registration happens via registerNamespaceListener;
close() calls
- // deregisterNamespaceListener so closed watchers are not on the
dispatch list.
+ // Single shared metadata-store listener fans out to both per-path and
+ // per-namespace subscribers. Per-subscriber lifecycle goes through the
+ // register / deregister methods below.
if (store instanceof MetadataStoreExtended ext) {
ext.registerListener(this::handleNotification);
} else {
@@ -100,6 +113,37 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
}
}
+ // --- Per-path metadata listeners ---
+
+ /**
+ * Listener for metadata events on a specific scalable-topic-related path.
The
+ * fan-out in {@link ScalableTopicResources} compares each notification's
path
+ * against {@link #getMetadataPath()} and dispatches on exact match.
+ */
+ public interface MetadataPathListener {
+ /** Exact path this listener is interested in (no wildcard / prefix).
*/
+ String getMetadataPath();
+
+ /** Called for every metadata event on the listener's path. */
+ void onNotification(Notification notification);
+ }
+
+ /**
+ * Register a per-path metadata listener. Idempotent — re-registering the
same
+ * listener just refreshes its path mapping (e.g. if the listener moved
its path).
+ */
+ public void registerPathListener(MetadataPathListener listener) {
+ pathListeners.put(listener, listener.getMetadataPath());
+ }
+
+ /**
+ * Deregister a previously-registered listener. Safe to call multiple
times or for
+ * listeners that were never registered.
+ */
+ public void deregisterPathListener(MetadataPathListener listener) {
+ pathListeners.remove(listener);
+ }
+
// --- Namespace-level scalable-topics listeners ---
/**
@@ -135,35 +179,53 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
}
/**
- * Single fan-out path: for each registered listener, emit the
notification iff
- * its path is a direct child of the listener's namespace base path.
Filters out
- * subtree events (subscriptions, controller lock) up front.
+ * Single fan-out path. For each registered subscriber:
+ * <ul>
+ * <li>Path listener: dispatch when the notification's path equals the
listener's
+ * registered path.</li>
+ * <li>Namespace listener: dispatch when the notification's path is a
direct
+ * child of {@code /topics/<tenant>/<ns>} (skips subtree events like
+ * subscriptions / controller lock).</li>
+ * </ul>
*/
void handleNotification(Notification notification) {
- if (namespaceListeners.isEmpty()) {
- return;
- }
String path = notification.getPath();
- if (!path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
- return;
- }
- for (Map.Entry<NamespaceListener, NamespaceName> entry :
namespaceListeners.entrySet()) {
- String basePath = namespacePath(entry.getValue());
- if (!path.startsWith(basePath + "/")) {
- continue;
- }
- // Direct child only — strip the prefix and check there's no
further '/'.
- String rest = path.substring(basePath.length() + 1);
- if (rest.indexOf('/') >= 0) {
- continue;
+
+ // Path listeners — exact match.
+ if (!pathListeners.isEmpty()) {
+ for (Map.Entry<MetadataPathListener, String> entry :
pathListeners.entrySet()) {
+ if (entry.getValue().equals(path)) {
+ try {
+ entry.getKey().onNotification(notification);
+ } catch (Exception e) {
+ log.warn().attr("listener", entry.getKey())
+ .attr("path", path)
+ .exceptionMessage(e)
+ .log("Failed to dispatch scalable-topic path notification");
+ }
+ }
}
- try {
- entry.getKey().onNotification(notification);
- } catch (Exception e) {
- log.warn().attr("listener", entry.getKey())
- .attr("path", path)
- .exceptionMessage(e)
- .log("Failed to dispatch scalable-topic notification");
+ }
+
+ // Namespace listeners — direct child of /topics/<tenant>/<ns>.
+ if (!namespaceListeners.isEmpty() &&
path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
+ for (Map.Entry<NamespaceListener, NamespaceName> entry :
namespaceListeners.entrySet()) {
+ String basePath = namespacePath(entry.getValue());
+ if (!path.startsWith(basePath + "/")) {
+ continue;
+ }
+ String rest = path.substring(basePath.length() + 1);
+ if (rest.indexOf('/') >= 0) {
+ continue;
+ }
+ try {
+ entry.getKey().onNotification(notification);
+ } catch (Exception e) {
+ log.warn().attr("listener", entry.getKey())
+ .attr("path", path)
+ .exceptionMessage(e)
+ .log("Failed to dispatch scalable-topic namespace notification");
+ }
}
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index ef0f8947b50..1e7e87865a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -48,7 +48,7 @@ import org.apache.pulsar.metadata.api.NotificationType;
* <p>The session is tied to a connection. When the connection breaks, the
session dies.
* The client must reinitiate a new session (possibly with another broker).
*/
-public class DagWatchSession {
+public class DagWatchSession implements ScalableTopicResources.MetadataPathListener {
private static final Logger LOG = Logger.get(DagWatchSession.class);
private final Logger log;
@@ -61,7 +61,6 @@ public class DagWatchSession {
private final BrokerService brokerService;
private final String metadataPath;
- private final java.util.function.Consumer<Notification>
notificationListener;
private volatile boolean closed = false;
public DagWatchSession(long sessionId,
@@ -75,17 +74,22 @@ public class DagWatchSession {
this.resources = resources;
this.brokerService = brokerService;
this.metadataPath = resources.topicPath(topicName);
- this.notificationListener = this::onNotification;
this.log = LOG.with().attr("topic", topicName).attr("sessionId",
sessionId).build();
}
+ @Override
+ public String getMetadataPath() {
+ return metadataPath;
+ }
+
/**
* Start the session: load current metadata, set up watch, and return
* the initial layout response.
*/
public CompletableFuture<ScalableTopicLayoutResponse> start() {
- // Register metadata store listener for changes to this topic's
metadata
- resources.getStore().registerListener(notificationListener);
+ // Register through the resources-level fan-out so close() can
deregister us
+ // and we don't accumulate stale store-level listeners over time.
+ resources.registerPathListener(this);
return resources.getScalableTopicMetadataAsync(topicName, true)
.thenCompose(optMd -> {
@@ -98,8 +102,13 @@ public class DagWatchSession {
});
}
- // Visible for testing — invoked by the metadata-store listener registered
in start().
- void onNotification(Notification notification) {
+ /**
+ * Invoked by the {@link ScalableTopicResources} fan-out for every
metadata event
+ * matching this session's topic path. The registry already path-filtered
for us;
+ * we re-check defensively so a registry-level bug can't cause a reload
storm.
+ */
+ @Override
+ public void onNotification(Notification notification) {
if (closed) {
return;
}
@@ -184,7 +193,10 @@ public class DagWatchSession {
public void close() {
closed = true;
- // Listener is guarded by the closed flag; MetadataStore does not
support unregister.
+ // Drop ourselves from the resources' fan-out so the per-event
dispatch skips
+ // us — no listener leak, no per-notification dispatch tax across the
broker's
+ // lifetime.
+ resources.deregisterPathListener(this);
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicListenerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicListenerRegistryTest.java
new file mode 100644
index 00000000000..9f04b7ad8c4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicListenerRegistryTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the {@code MetadataPathListener} registry on
+ * {@link ScalableTopicResources}: register / deregister behaviour, exact-path
+ * dispatch, and the no-leak property the registry exists to enforce.
+ *
+ * <p>The bug it prevents: pre-registry, every {@code DagWatchSession} called
+ * {@code MetadataStore.registerListener} directly. The store has no
+ * {@code unregisterListener}, so each closed session left a stale listener
+ * registered for the broker's lifetime — both a memory leak and a
+ * per-notification dispatch tax that grew linearly with total sessions ever
+ * opened.
+ */
+public class ScalableTopicListenerRegistryTest {
+
+ private MetadataStoreExtended store;
+ private ScalableTopicResources resources;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ store = new LocalMemoryMetadataStore("memory:local",
+ MetadataStoreConfig.builder().build());
+ resources = new ScalableTopicResources(store, 30);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (store != null) {
+ store.close();
+ }
+ }
+
+ @Test
+ public void deregisterStopsDispatchToListener() {
+ // The whole point of the registry: a deregistered listener must NOT
receive
+ // events even if the path still matches. This is the leak fix proper —
+ // pre-registry, a closed session would keep ticking on every
namespace event
+ // for the broker's lifetime.
+ var listener = new RecordingListener("/topics/tenant/ns/some-topic");
+ resources.registerPathListener(listener);
+ resources.handleNotification(new
Notification(NotificationType.Modified,
+ "/topics/tenant/ns/some-topic"));
+ assertEquals(listener.received.size(), 1, "registered listener should see its event");
+
+ resources.deregisterPathListener(listener);
+ resources.handleNotification(new
Notification(NotificationType.Modified,
+ "/topics/tenant/ns/some-topic"));
+ assertEquals(listener.received.size(), 1,
+ "deregistered listener must not see further events");
+ }
+
+ @Test
+ public void dispatchesOnlyOnExactPathMatch() {
+ // Path filter is exact-equal — a notification for a sibling topic
must not
+ // wake up listeners on a different topic.
+ var watching = new RecordingListener("/topics/tenant/ns/a");
+ var bystander = new RecordingListener("/topics/tenant/ns/b");
+ resources.registerPathListener(watching);
+ resources.registerPathListener(bystander);
+
+ resources.handleNotification(new
Notification(NotificationType.Modified,
+ "/topics/tenant/ns/a"));
+
+ assertEquals(watching.received.size(), 1);
+ assertTrue(bystander.received.isEmpty(),
+ "sibling topic listener must not receive events on /a");
+ }
+
+ @Test
+ public void multipleListenersOnSamePathAllReceive() {
+ // Two sessions can watch the same topic (rare but legal) — both must
fire.
+ String path = "/topics/tenant/ns/shared";
+ var l1 = new RecordingListener(path);
+ var l2 = new RecordingListener(path);
+ resources.registerPathListener(l1);
+ resources.registerPathListener(l2);
+
+ resources.handleNotification(new
Notification(NotificationType.Modified, path));
+
+ assertEquals(l1.received.size(), 1);
+ assertEquals(l2.received.size(), 1);
+ }
+
+ @Test
+ public void listenerExceptionDoesNotInterruptOtherListeners() {
+ // One bad listener throwing must not poison the dispatch loop — the
next
+ // listener on the same path must still see the event.
+ String path = "/topics/tenant/ns/x";
+ var throwing = new ScalableTopicResources.MetadataPathListener() {
+ @Override public String getMetadataPath() {
+ return path;
+ }
+ @Override public void onNotification(Notification notification) {
+ throw new RuntimeException("boom");
+ }
+ };
+ var ok = new RecordingListener(path);
+ resources.registerPathListener(throwing);
+ resources.registerPathListener(ok);
+
+ resources.handleNotification(new
Notification(NotificationType.Modified, path));
+
+ assertEquals(ok.received.size(), 1, "well-behaved listener should still see the event");
+ }
+
+ @Test
+ public void deregisterIsIdempotent() {
+ var listener = new RecordingListener("/topics/tenant/ns/a");
+ resources.registerPathListener(listener);
+ resources.deregisterPathListener(listener);
+ resources.deregisterPathListener(listener); // must not throw
+ // Deregistering one we never registered must also be silent.
+ resources.deregisterPathListener(new
RecordingListener("/topics/tenant/ns/never"));
+ }
+
+ private static final class RecordingListener
+ implements ScalableTopicResources.MetadataPathListener {
+ final String path;
+ final List<Notification> received = new ArrayList<>();
+
+ RecordingListener(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public String getMetadataPath() {
+ return path;
+ }
+
+ @Override
+ public void onNotification(Notification notification) {
+ received.add(notification);
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
index 9f423413003..cf14eb6b00e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
@@ -125,15 +125,32 @@ public class DagWatchSessionTest {
}
@Test
- public void testStartRegistersMetadataStoreListener() {
- // Regardless of outcome, start() should wire up a notification
listener so that
- // subsequent metadata changes flow into the session.
+ public void testStartRegistersWithResources() {
+ // start() routes through the resources-level fan-out instead of
registering
+ // directly on the metadata store — that way close() can drop the
+ // registration cleanly via deregisterPathListener.
when(resources.getScalableTopicMetadataAsync(TOPIC, true))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
session.start();
- verify(resources.getStore()).registerListener(any());
+ verify(resources).registerPathListener(session);
+ }
+
+ @Test
+ public void testCloseDeregistersPathListener() {
+ // The whole point of the registry pattern: close() must remove the
listener so
+ // the per-event fan-out skips us. Otherwise we leak a stale entry per
session
+ // for the broker's lifetime.
+ session.close();
+ verify(resources).deregisterPathListener(session);
+ }
+
+ @Test
+ public void testGetMetadataPathExposesTopicPath() {
+ // The registry uses this for its dispatch filter — must exactly match
the
+ // path that the resources layer would generate for the topic.
+ assertEquals(session.getMetadataPath(), TOPIC_PATH);
}
// --- onNotification filtering ---