Copilot commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2737340824
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -266,9 +321,17 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
         CompletableFuture<TopicListWatcher> existingWatcherFuture =
                 watchers.putIfAbsent(watcherId, watcherFuture);
         if (existingWatcherFuture != null) {
-            log.info("[{}] Watcher with the same watcherId={} is already created.", connection, watcherId);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Watcher with the same watcherId={} is already created. Refreshing.", connection,
+                        watcherId);
+            }
             // use the existing watcher if it's already created
-            watcherFuture = existingWatcherFuture;
+            watcherFuture = existingWatcherFuture.thenCompose(watcher -> {
+                watcher.prepareUpdateTopics();
+                CompletableFuture<TopicListWatcher> future = new CompletableFuture<>();
+                updateTopicListWatcher(watcher, () -> future.complete(watcher));
+                return future;
+            });
Review Comment:
When handling an existing watcher (lines 329-334), the code calls
prepareUpdateTopics, which clears the matchingTopics collection and sets various
flags. However, if the watcher is in the middle of processing other events
(e.g., topic add/delete events from the metadata store), this could lead to
lost events or an inconsistent state. Consider adding a check that the watcher
is in a stable state before initiating a refresh, or document that this is
acceptable behavior for reconciliation.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -80,37 +78,39 @@ public PatternMultiTopicsConsumerImpl(TopicsPattern topicsPattern,
         super(client, conf, executorProvider, subscribeFuture, schema, interceptors,
                 false /* createTopicIfDoesNotExist */);
         this.topicsPattern = topicsPattern;
-        this.topicsHash = topicsHash;
         this.subscriptionMode = subscriptionMode;
         this.namespaceName = topicsPattern.namespace();
-
         this.topicsChangeListener = new PatternTopicsChangedListener();
         this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
         if (subscriptionMode == Mode.PERSISTENT) {
-            long watcherId = client.newTopicListWatcherId();
-            topicListWatcher = new TopicListWatcher(updateTaskQueue, client, topicsPattern, watcherId,
-                    namespaceName, topicsHash, watcherFuture, () -> recheckTopicsChangeAfterReconnect());
-            watcherFuture
-                    .exceptionally(ex -> {
-                        if (closed) {
-                            log.warn("Pattern consumer [{}] was closed while creating topic list watcher",
-                                    conf.getSubscriptionName(), ex);
-                        } else {
-                            log.warn(
-                                    "Pattern consumer [{}] unable to create topic list watcher. Falling back to only polling"
-                                            + " for new topics", conf.getSubscriptionName(), ex);
-                            this.recheckPatternTimeout = client.timer()
-                                    .newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
-                        }
-                        return null;
-                    });
+            subscribeFuture.whenComplete((__, exception) -> {
+                if (!closed && exception == null) {
+                    long watcherId = client.newTopicListWatcherId();
+                    topicListWatcher = new TopicListWatcher(updateTaskQueue, client, topicsPattern, watcherId,
+                            namespaceName, this::getLocalStateTopicsHash, watcherFuture,
+                            this::getNextRecheckPatternEpoch);
Review Comment:
The topicListWatcher field is declared volatile and assigned inside a
callback (line 89). This leaves a potential race: a thread that reads
topicListWatcher before the subscribeFuture callback runs will observe null,
even though watcherFuture is still pending. While this might be acceptable
given the usage pattern, consider documenting that topicListWatcher should only
be accessed after watcherFuture completes, or add null checks where it's used.
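A minimal sketch of the null-safe access pattern suggested above; the surrounding call site is illustrative, though `isConnected()`, `reconcile()`, and `getLocalStateTopicsHash()` do appear in this PR's diffs:
```java
// Take a single volatile read so the null check and the use see the same value.
TopicListWatcher watcher = topicListWatcher;
if (watcher != null && watcher.isConnected()) {
    // Watcher has been created by the subscribeFuture callback and is usable.
    watcher.reconcile(getLocalStateTopicsHash());
} else {
    // Not assigned yet (subscribeFuture still pending) or disconnected:
    // fall back to the lookup-based recheck path.
}
```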
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java:
##########
@@ -173,12 +231,31 @@ synchronized void triggerNextTask() {
                     });
                     break;
                 }
-                case TOPICS_ADDED: {
-                    newTaskFuture = topicsChangeListener.onTopicsAdded(task.getRight());
-                    break;
-                }
-                case TOPICS_REMOVED: {
-                    newTaskFuture = topicsChangeListener.onTopicsRemoved(task.getRight());
+                case TOPICS_CHANGED: {
+                    TopicsAddedOrRemovedTask topicsAddedOrRemovedTask = (TopicsAddedOrRemovedTask) task;
+                    newTaskFuture = topicsChangeListener.onTopicsRemoved(topicsAddedOrRemovedTask.removedTopics)
+                            .thenCompose(__ ->
+                                    topicsChangeListener.onTopicsAdded(topicsAddedOrRemovedTask.addedTopics))
+                            .thenRun(() -> {
+                                if (!patternConsumer.supportsTopicListWatcherReconcile()) {
+                                    // ignore topics hash unless topic list watcher reconcile is supported since
+                                    // the broker side state might be out of sync and could cause unnecessary
+                                    // reconciliation.
+                                    // reconciliation will happen later when the client requests the topic listing
+                                    // after the next patternAutoDiscoveryPeriod interval
+                                    // Broker versions that support topic list watcher reconcile will also update the
+                                    // broker side state when reconciliation is requested.
+                                    return;
+                                }
+                                String localHash = patternConsumer.getLocalStateTopicsHash();
+                                String brokerHash = topicsAddedOrRemovedTask.topicsHash;
+                                if (brokerHash != null && brokerHash.length() > 0 && !brokerHash.equals(localHash)) {
+                                    log.info("[{}][{}] Hash mismatch detected (local: {}, broker: {}). Triggering "
+                                                    + "reconciliation.", patternConsumer.getPattern().inputPattern(),
+                                            patternConsumer.getSubscription(), localHash, brokerHash);
+                                    appendRecheckOp();
Review Comment:
In the TOPICS_CHANGED case, when hash mismatch is detected and
appendRecheckOp is called, this adds a new recheck operation to the queue.
However, there's no check to prevent multiple recheck operations from being
added if multiple hash mismatches are detected in quick succession. While the
PatternConsumerUpdateQueue has a recheckTaskInQueue flag to handle this, the
logic could be clearer. Consider checking the recheckTaskInQueue flag before
calling appendRecheckOp to avoid unnecessary queue operations.
```suggestion
                                    synchronized (PatternConsumerUpdateQueue.this) {
                                        if (!recheckTaskInQueue) {
                                            appendRecheckOp();
                                        }
                                    }
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java:
##########
@@ -280,7 +282,33 @@ private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable excep
     }
 
     public void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate update) {
-        patternConsumerUpdateQueue.appendTopicsRemovedOp(update.getDeletedTopicsList());
-        patternConsumerUpdateQueue.appendTopicsAddedOp(update.getNewTopicsList());
+        if (update == null) {
+            return;
+        }
+        patternConsumerUpdateQueue.appendTopicsChangedOp(update.getNewTopicsList(), update.getDeletedTopicsList(),
+                update.hasTopicsHash() ? update.getTopicsHash() : "");
+    }
+
+    /**
+     * Perform a single reconciliation request using the existing watcher id and the watcher's last-known topics hash.
+     * This will send a WatchTopicList request including the topics-hash to the broker. If the watcher is not connected,
+     * the returned future will be completed exceptionally.
+     */
+    public CompletableFuture<CommandWatchTopicListSuccess> reconcile(String localStateTopicsHash) {
+        ClientCnx c = cnx();
+        if (c == null || !isConnected()) {
+            CompletableFuture<CommandWatchTopicListSuccess> f = new CompletableFuture<>();
+            f.completeExceptionally(new IllegalStateException("Watcher is not connected"));
+            return f;
Review Comment:
When the watcher is not connected (lines 299-302), the reconcile method
returns a failed future with an IllegalStateException. However, a temporarily
lost connection is expected in normal operation, not an illegal state. Consider
a more specific exception type (e.g., WatcherNotConnectedException), or one
that signals a transient, retryable condition. This would make error handling
clearer for callers.
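A minimal sketch of one possible change, reusing the client's existing `PulsarClientException.NotConnectedException` as the transient-failure signal; `sendWatchTopicListRequest` is a hypothetical stand-in for the PR's request-sending logic:
```java
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.FutureUtil;

public CompletableFuture<CommandWatchTopicListSuccess> reconcile(String localStateTopicsHash) {
    ClientCnx c = cnx();
    if (c == null || !isConnected()) {
        // Signal a transient connection problem rather than an illegal state,
        // so callers can retry or fall back to the lookup-based recheck path.
        return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
    }
    // Hypothetical helper standing in for the rest of the PR's method body.
    return sendWatchTopicListRequest(c, localStateTopicsHash);
}
```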
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java:
##########
@@ -173,12 +231,31 @@ synchronized void triggerNextTask() {
                     });
                     break;
                 }
-                case TOPICS_ADDED: {
-                    newTaskFuture = topicsChangeListener.onTopicsAdded(task.getRight());
-                    break;
-                }
-                case TOPICS_REMOVED: {
-                    newTaskFuture = topicsChangeListener.onTopicsRemoved(task.getRight());
+                case TOPICS_CHANGED: {
+                    TopicsAddedOrRemovedTask topicsAddedOrRemovedTask = (TopicsAddedOrRemovedTask) task;
+                    newTaskFuture = topicsChangeListener.onTopicsRemoved(topicsAddedOrRemovedTask.removedTopics)
+                            .thenCompose(__ ->
+                                    topicsChangeListener.onTopicsAdded(topicsAddedOrRemovedTask.addedTopics))
+                            .thenRun(() -> {
+                                if (!patternConsumer.supportsTopicListWatcherReconcile()) {
+                                    // ignore topics hash unless topic list watcher reconcile is supported since
+                                    // the broker side state might be out of sync and could cause unnecessary
+                                    // reconciliation.
+                                    // reconciliation will happen later when the client requests the topic listing
+                                    // after the next patternAutoDiscoveryPeriod interval
+                                    // Broker versions that support topic list watcher reconcile will also update the
+                                    // broker side state when reconciliation is requested.
+                                    return;
+                                }
+                                String localHash = patternConsumer.getLocalStateTopicsHash();
+                                String brokerHash = topicsAddedOrRemovedTask.topicsHash;
+                                if (brokerHash != null && brokerHash.length() > 0 && !brokerHash.equals(localHash)) {
+                                    log.info("[{}][{}] Hash mismatch detected (local: {}, broker: {}). Triggering "
+                                                    + "reconciliation.", patternConsumer.getPattern().inputPattern(),
+                                            patternConsumer.getSubscription(), localHash, brokerHash);
+                                    appendRecheckOp();
Review Comment:
The check for broker support (line 240) will always return false for older
brokers that don't support topic list watcher reconcile. This means that hash
verification will be skipped entirely for those brokers, even though they could
still benefit from detecting hash mismatches (though they couldn't fix them via
reconciliation). Consider logging a warning when hash mismatches are detected
but reconciliation is not supported, to help diagnose issues with older brokers.
```suggestion
                                String localHash = patternConsumer.getLocalStateTopicsHash();
                                String brokerHash = topicsAddedOrRemovedTask.topicsHash;
                                if (brokerHash != null && brokerHash.length() > 0 && !brokerHash.equals(localHash)) {
                                    if (patternConsumer.supportsTopicListWatcherReconcile()) {
                                        log.info("[{}][{}] Hash mismatch detected (local: {}, broker: {}). Triggering "
                                                        + "reconciliation.", patternConsumer.getPattern().inputPattern(),
                                                patternConsumer.getSubscription(), localHash, brokerHash);
                                        appendRecheckOp();
                                    } else {
                                        log.warn("[{}][{}] Hash mismatch detected (local: {}, broker: {}), but topic "
                                                        + "list watcher reconcile is not supported by the broker. "
                                                        + "This may indicate that the client and broker topic lists "
                                                        + "are out of sync and cannot be automatically reconciled.",
                                                patternConsumer.getPattern().inputPattern(),
                                                patternConsumer.getSubscription(), localHash, brokerHash);
                                    }
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -136,46 +136,113 @@ public void run(Timeout timeout) throws Exception {
     }
 
     CompletableFuture<Void> recheckTopicsChange() {
-        String pattern = topicsPattern.inputPattern();
-        final int epoch = recheckPatternEpoch.incrementAndGet();
-        return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash)
-                .thenCompose(getTopicsResult -> {
-                    // If "recheckTopicsChange" has been called more than one times, only make the last one take affects.
-                    // Use "synchronized (recheckPatternTaskBackoff)" instead of
-                    // `synchronized(PatternMultiTopicsConsumerImpl.this)` to avoid locking in a wider range.
-                    synchronized (PatternMultiTopicsConsumerImpl.this) {
-                        if (recheckPatternEpoch.get() > epoch) {
-                            return CompletableFuture.completedFuture(null);
-                        }
-                        if (log.isDebugEnabled()) {
-                            log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {},"
-                                            + " topicsHash: {}, filtered: {}",
-                                    PatternMultiTopicsConsumerImpl.this.getSubscription(),
-                                    namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
-                                    getTopicsResult.isFiltered());
-                            getTopicsResult.getTopics().forEach(topicName ->
-                                    log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
-                        }
+        final int epoch = getNextRecheckPatternEpoch();
-                        final List<String> oldTopics = new ArrayList<>(getPartitions());
-                        return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
-                                topicsChangeListener, oldTopics, subscription);
+        CompletableFuture<Void> recheckFuture;
+        // Prefer watcher-based reconcile when a watcher exists and is connected. Fallback to lookup if watcher
+        // is not available or the watcher-based request fails.
+        if (supportsTopicListWatcherReconcile()) {
+            String localStateTopicsHash = getLocalStateTopicsHash();
+            recheckFuture = topicListWatcher.reconcile(localStateTopicsHash).thenCompose(response -> {
+                return handleWatchTopicListSuccess(response, localStateTopicsHash, epoch);
+            }).handle((res, ex) -> {
+                if (ex != null) {
+                    // watcher-based reconcile failed -> fall back to lookup-based recheck
+                    return doLookupBasedRecheck(epoch);
+                } else {
+                    // watcher-based reconcile completed successfully
+                    return CompletableFuture.<Void>completedFuture(null);
+                }
-            }).thenAccept(__ -> {
-                if (recheckPatternTimeout != null) {
-                    this.recheckPatternTimeout = client.timer().newTimeout(
-                            this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
-                }
-            });
+            }).thenCompose(Function.identity());
+        } else {
+            // Fallback: perform the existing lookup-based recheck
+            recheckFuture = doLookupBasedRecheck(epoch);
+        }
+
+        return recheckFuture.handle((__, ex) -> {
Review Comment:
The handle method here swallows exceptions by returning null. This means
that if recheckTopicsChange fails due to an exception, the error is not
propagated or logged at this level. While scheduleRecheckTopics will still be
called, the lack of error handling or logging could make it difficult to
diagnose issues. Consider logging the exception before returning null, or at
minimum document that errors are expected to be logged at a lower level.
```suggestion
        return recheckFuture.handle((__, ex) -> {
            if (ex != null) {
                log.warn("Failed to recheck topics change, will reschedule next check", ex);
            }
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -173,10 +175,63 @@ public synchronized void close() {
         sendTopicListUpdateTasks.clear();
     }
 
+    synchronized void prepareUpdateTopics() {
+        updatingTopics = true;
+        sendingInProgress = true;
+        sendTopicListUpdateTasks.clear();
+        matchingTopics.clear();
+    }
+
     synchronized void updateTopics(List<String> topics) {
+        if (closed) {
+            return;
+        }
         matchingTopics.clear();
         TopicList.filterTopicsToStream(topics, topicsPattern).forEach(matchingTopics::add);
         updatingTopics = false;
+        if (disconnected) {
+            handleNewAndDeletedTopicsWhileDisconnected();
+            matchingTopicsBeforeDisconnected = null;
+            disconnected = false;
+        }
+        sendingCompleted();
+    }
+
+    private synchronized void handleNewAndDeletedTopicsWhileDisconnected() {
+        List<String> newTopics = new ArrayList<>();
+        List<String> deletedTopics = new ArrayList<>();
+        Set<String> remainingTopics = new HashSet<>(matchingTopics);
+        for (String topic : matchingTopicsBeforeDisconnected) {
+            if (!remainingTopics.remove(topic)) {
+                deletedTopics.add(topic);
+            }
+        }
+        newTopics.addAll(remainingTopics);
+        if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+            String hash = TopicList.calculateHash(matchingTopics);
+            sendTopicListUpdate(hash, deletedTopics, newTopics);
+        }
+    }
+
+    @Override
+    public NamespaceName getNamespaceName() {
+        return namespace;
+    }
+
+    @Override
+    public synchronized void onSessionEvent(SessionEvent event) {
+        switch (event) {
+            case SessionReestablished:
+            case Reconnected:
+                executor.execute(() -> topicListService.updateTopicListWatcher(this, null));
+                break;
Review Comment:
The onSessionEvent method schedules updateTopicListWatcher on the executor
for Reconnected and SessionReestablished events. However, if multiple session
events occur in quick succession (e.g., ConnectionLost followed immediately by
Reconnected), multiple updateTopicListWatcher calls could be scheduled. While
the updatingTopics flag provides some protection, consider adding logic to
cancel or skip pending updates if a new session event arrives before the
previous update completes.
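A minimal sketch of one way to collapse bursts of session events, using an assumed `sessionEventGeneration` counter (illustrative, not part of the PR):
```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical field: each session event bumps the generation so that only
// the most recently scheduled update actually runs.
private final AtomicLong sessionEventGeneration = new AtomicLong();

@Override
public synchronized void onSessionEvent(SessionEvent event) {
    switch (event) {
        case SessionReestablished:
        case Reconnected:
            long scheduledGeneration = sessionEventGeneration.incrementAndGet();
            executor.execute(() -> {
                // Skip stale updates: a newer session event superseded this one.
                if (sessionEventGeneration.get() == scheduledGeneration) {
                    topicListService.updateTopicListWatcher(this, null);
                }
            });
            break;
        default:
            break;
    }
}
```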
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]