This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1300f0a4b1c [fix][client] rollback TopicListWatcher retry behavior
(#24752)
1300f0a4b1c is described below
commit 1300f0a4b1c015e278ff321d58c12a7e85ba12e7
Author: Oneby <[email protected]>
AuthorDate: Wed Sep 17 21:06:48 2025 +0800
[fix][client] rollback TopicListWatcher retry behavior (#24752)
Co-authored-by: oneby-wang <[email protected]>
---
.../impl/PatternMultiTopicsConsumerImpl.java | 30 ++++++++++++++--------
.../pulsar/client/impl/TopicListWatcher.java | 17 +++++-------
2 files changed, 26 insertions(+), 21 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index ee949593c19..8a3798a670c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -52,6 +52,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
private final TopicsPattern topicsPattern;
final TopicsChangedListener topicsChangeListener;
private final Mode subscriptionMode;
+ private final TopicListWatcher topicListWatcher;
private final CompletableFuture<TopicListWatcher> watcherFuture = new
CompletableFuture<>();
protected NamespaceName namespaceName;
@@ -62,6 +63,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
private volatile String topicsHash;
private PatternConsumerUpdateQueue updateTaskQueue;
+ private volatile boolean closed = false;
/***
* @param topicsPattern The regexp for the topic name(not contains
partition suffix).
@@ -86,19 +88,26 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
if (subscriptionMode == Mode.PERSISTENT) {
long watcherId = client.newTopicListWatcherId();
- new TopicListWatcher(updateTaskQueue, client, topicsPattern,
watcherId,
+ topicListWatcher = new TopicListWatcher(updateTaskQueue, client,
topicsPattern, watcherId,
namespaceName, topicsHash, watcherFuture, () ->
recheckTopicsChangeAfterReconnect());
watcherFuture
.exceptionally(ex -> {
- log.warn("Pattern consumer [{}] unable to create topic list
watcher. Falling back to only polling"
- + " for new topics", conf.getSubscriptionName(),
ex);
- this.recheckPatternTimeout = client.timer().newTimeout(
- this, Math.max(1,
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+ if (closed) {
+ log.warn("Pattern consumer [{}] was closed while
creating topic list watcher",
+ conf.getSubscriptionName(), ex);
+ } else {
+ log.warn(
+ "Pattern consumer [{}] unable to create topic
list watcher. Falling back to only polling"
+ + " for new topics",
conf.getSubscriptionName(), ex);
+ this.recheckPatternTimeout = client.timer()
+ .newTimeout(this, Math.max(1,
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+ }
return null;
});
} else {
log.debug("Pattern consumer [{}] not creating topic list watcher
for subscription mode {}",
conf.getSubscriptionName(), subscriptionMode);
+ topicListWatcher = null;
watcherFuture.complete(null);
this.recheckPatternTimeout = client.timer().newTimeout(
this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()),
TimeUnit.SECONDS);
@@ -120,7 +129,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
// TimerTask to recheck topics change, and trigger subscribe/unsubscribe
based on the change.
@Override
public void run(Timeout timeout) throws Exception {
- if (timeout.isCancelled()) {
+ if (timeout.isCancelled() || closed) {
return;
}
updateTaskQueue.appendRecheckOp();
@@ -381,16 +390,17 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
@Override
@SuppressFBWarnings
public CompletableFuture<Void> closeAsync() {
+ closed = true;
Timeout timeout = recheckPatternTimeout;
if (timeout != null) {
timeout.cancel();
recheckPatternTimeout = null;
}
- CompletableFuture<Void> watcherCloseFuture = watcherFuture.thenCompose(
- topicListWatcher ->
Optional.ofNullable(topicListWatcher).map(TopicListWatcher::closeAsync)
-
.orElse(CompletableFuture.completedFuture(null))).exceptionally(t -> null);
+ CompletableFuture<Void> topicListWatcherCloseFuture =
+
Optional.ofNullable(topicListWatcher).map(TopicListWatcher::closeAsync)
+
.orElse(CompletableFuture.completedFuture(null)).exceptionally(t -> null);
CompletableFuture<Void> runningTaskCancelFuture =
updateTaskQueue.cancelAllAndWaitForTheRunningTask();
- return FutureUtil.waitForAll(Lists.newArrayList(watcherCloseFuture,
runningTaskCancelFuture))
+ return
FutureUtil.waitForAll(Lists.newArrayList(topicListWatcherCloseFuture,
runningTaskCancelFuture))
.exceptionally(t -> null).thenCompose(__ ->
super.closeAsync());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index d83ae3e7337..49cf64656fe 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -47,7 +47,6 @@ public class TopicListWatcher extends HandlerState implements
ConnectionHandler.
private final ConnectionHandler connectionHandler;
private final TopicsPattern topicsPattern;
private final long watcherId;
- private final long lookupDeadline;
private volatile long createWatcherDeadline = 0;
private final NamespaceName namespace;
// TODO maintain the value based on updates from broker and warn the user
if inconsistent with hash from polling
@@ -81,7 +80,6 @@ public class TopicListWatcher extends HandlerState implements
ConnectionHandler.
this);
this.topicsPattern = topicsPattern;
this.watcherId = watcherId;
- this.lookupDeadline = System.currentTimeMillis() +
client.getConfiguration().getLookupTimeoutMs();
this.namespace = namespace;
this.topicsHash = topicsHash;
this.watcherFuture = watcherFuture;
@@ -93,17 +91,12 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
@Override
public boolean connectionFailed(PulsarClientException exception) {
boolean nonRetriableError =
!PulsarClientException.isRetriableError(exception);
- boolean timeout = System.currentTimeMillis() > lookupDeadline;
- if (nonRetriableError || timeout) {
+ if (nonRetriableError) {
exception.setPreviousExceptionCount(previousExceptionCount);
if (watcherFuture.completeExceptionally(exception)) {
setState(State.Failed);
- if (nonRetriableError) {
- log.info("[{}] Watcher creation failed for {} with
non-retriable error {}", topic, name,
- exception.getMessage());
- } else {
- log.info("[{}] Watcher creation failed for {} after
timeout", topic, name);
- }
+ log.info("[{}] Watcher creation failed for {} with
non-retriable error {}",
+ topic, name, exception.getMessage());
deregisterFromClientCnx();
return false;
}
@@ -209,8 +202,10 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
}
public CompletableFuture<Void> closeAsync() {
-
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ // since we set closed flag in PatternMultiTopicsConsumerImpl, it is
ok to directly cancel watcherFuture whether
+ // it's completed or not to make sure watcherFuture is completed
+ watcherFuture.cancel(false);
if (getState() == State.Closing || getState() == State.Closed) {
closeFuture.complete(null);