Denovo1998 commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2741338643
##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -315,6 +315,7 @@ message FeatureFlags {
     optional bool supports_topic_watchers = 4 [default = false];
     optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false];
     optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false];
+    optional bool supports_topic_watcher_reconcile = 7 [default = false];
Review Comment:
The existing comment suggests that new feature flags should also be added to
`PulsarClientException.FailedFeatureCheck` so the two stay in sync. Should this flag be added there as well?
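For reference, a rough sketch of what keeping the enum in sync might look like (the new constant name is an assumption, not taken from this PR):
```java
// Sketch only: in PulsarClientException.FailedFeatureCheck, alongside the existing entries,
// add a constant mirroring the new proto flag supports_topic_watcher_reconcile = 7.
public enum FailedFeatureCheck {
    SupportsGetPartitionedMetadataWithoutAutoCreation,
    SupportsTopicWatcherReconcile  // hypothetical name for the new flag
}
```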
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -136,46 +146,117 @@ public void run(Timeout timeout) throws Exception {
     }
     CompletableFuture<Void> recheckTopicsChange() {
-        String pattern = topicsPattern.inputPattern();
-        final int epoch = recheckPatternEpoch.incrementAndGet();
-        return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash)
-                .thenCompose(getTopicsResult -> {
-                    // If "recheckTopicsChange" has been called more than one times, only make the last one take affects.
-                    // Use "synchronized (recheckPatternTaskBackoff)" instead of
-                    // `synchronized(PatternMultiTopicsConsumerImpl.this)` to avoid locking in a wider range.
-                    synchronized (PatternMultiTopicsConsumerImpl.this) {
-                        if (recheckPatternEpoch.get() > epoch) {
-                            return CompletableFuture.completedFuture(null);
-                        }
-                        if (log.isDebugEnabled()) {
-                            log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {},"
-                                            + " topicsHash: {}, filtered: {}",
-                                    PatternMultiTopicsConsumerImpl.this.getSubscription(),
-                                    namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
-                                    getTopicsResult.isFiltered());
-                            getTopicsResult.getTopics().forEach(topicName ->
-                                    log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
-                        }
+        final int epoch = getNextRecheckPatternEpoch();
-                        final List<String> oldTopics = new ArrayList<>(getPartitions());
-                        return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
-                                topicsChangeListener, oldTopics, subscription);
-                    }
-                }).thenAccept(__ -> {
-                    if (recheckPatternTimeout != null) {
-                        this.recheckPatternTimeout = client.timer().newTimeout(
-                                this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+        CompletableFuture<Void> recheckFuture;
+        // Prefer watcher-based reconcile when a watcher exists and is connected. Fallback to lookup if watcher
+        // is not available or the watcher-based request fails.
+        if (supportsTopicListWatcherReconcile()) {
+            String localStateTopicsHash = getLocalStateTopicsHash();
+            recheckFuture = topicListWatcher.reconcile(localStateTopicsHash).thenCompose(response -> {
+                return handleWatchTopicListSuccess(response, localStateTopicsHash, epoch);
+            }).handle((res, ex) -> {
+                if (ex != null) {
+                    // watcher-based reconcile failed -> fall back to lookup-based recheck
+                    return doLookupBasedRecheck(epoch);
+                } else {
+                    // watcher-based reconcile completed successfully
+                    return CompletableFuture.<Void>completedFuture(null);
                 }
-                });
+            }).thenCompose(Function.identity());
+        } else {
+            // Fallback: perform the existing lookup-based recheck
+            recheckFuture = doLookupBasedRecheck(epoch);
+        }
+
+        return recheckFuture.handle((__, ex) -> {
+            if (ex != null) {
+                log.info("[{}][{}] Pattern consumer failed to recheck topics changes: {}",
+                        getPattern().inputPattern(), getSubscription(), ex.getMessage());
+            }
+            scheduleRecheckTopics();
+            return null;
+        });
+    }
+
+    int getNextRecheckPatternEpoch() {
+        return recheckPatternEpoch.incrementAndGet();
+    }
+
+    CompletableFuture<Void> handleWatchTopicListSuccess(CommandWatchTopicListSuccess response,
+                                                        String localStateTopicsHash, int epoch) {
+        synchronized (PatternMultiTopicsConsumerImpl.this) {
+            if (recheckPatternEpoch.get() > epoch) {
+                return CompletableFuture.completedFuture(null);
+            }
+            // Build a GetTopicsResult-like object from the watch response
+            // so we can reuse updateSubscriptions
+            final List<String> topics = (response != null)
+                    ? response.getTopicsList()
+                    : Collections.emptyList();
+            final String hash = (response != null && response.hasTopicsHash())
+                    ? response.getTopicsHash()
+                    : null;
+            final boolean changed = !localStateTopicsHash.equals(hash);
+            final GetTopicsResult getTopicsResult =
+                    new GetTopicsResult(topics, hash, true, changed);
+
+            final List<String> oldTopics = new ArrayList<>(getPartitions());
+            return updateSubscriptions(topicsPattern, getTopicsResult, topicsChangeListener, oldTopics,
+                    subscription);
+        }
+    }
+
+    boolean supportsTopicListWatcherReconcile() {
+        return topicListWatcher != null && topicListWatcher.supportsReconcile() && watcherFuture.isDone()
+                && !watcherFuture.isCompletedExceptionally() && topicListWatcher.isConnected();
+    }
+
+    private void scheduleRecheckTopics() {
+        if (!closed) {
+            this.recheckPatternTimeout = client.timer().newTimeout(this,
+                    Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+        }
Review Comment:
Could this leave duplicate scheduled tasks behind? Should the previous `recheckPatternTimeout`
be cancelled before a new timeout is assigned? A possible guard is sketched below.
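If the old reference should indeed be cancelled, a guarded variant could look roughly like this (a sketch using the names from the diff; `cancel()` and `isExpired()` are the standard Netty `Timeout` methods):
```java
// Sketch only: cancel any previously scheduled recheck before replacing the reference,
// so at most one recheck timeout is pending per consumer.
private synchronized void scheduleRecheckTopics() {
    if (closed) {
        return;
    }
    Timeout previous = this.recheckPatternTimeout;
    if (previous != null && !previous.isExpired()) {
        previous.cancel();
    }
    this.recheckPatternTimeout = client.timer().newTimeout(this,
            Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
}
```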
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -266,9 +370,27 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
        CompletableFuture<TopicListWatcher> existingWatcherFuture = watchers.putIfAbsent(watcherId, watcherFuture);
        if (existingWatcherFuture != null) {
-            log.info("[{}] Watcher with the same watcherId={} is already created.", connection, watcherId);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Watcher with the same watcherId={} is already created. Refreshing.", connection,
+                        watcherId);
+            }
             // use the existing watcher if it's already created
-            watcherFuture = existingWatcherFuture;
+            watcherFuture = existingWatcherFuture.thenCompose(watcher -> {
+                CompletableFuture<TopicListWatcher> future = new CompletableFuture<>();
+                Runnable callback = () -> future.complete(watcher);
+                // trigger a new update unless an update is already ongoing. Register the callback to complete
+                // when the update completes.
+                if (watcher.prepareUpdateTopics(callback)) {
+                    updateTopicListWatcher(watcher)
+                            // run the callback also in failure cases
+                            // prepareUpdateTopics handles it for success cases
+                            .exceptionally(ex -> {
+                                callback.run();
+                                return null;
+                            });
+                }
+                return future;
Review Comment:
Possible hang when refreshing an existing watcher: if CommandWatchTopicList hits an
already-existing watcher and prepareUpdateTopics(callback) returns false (an update is in
progress), we return a future that is only completed when the callbacks are drained in
TopicListWatcher.updateTopics(), i.e. on the success path.
If the in-flight update fails terminally (the non-permit / non-retry case), the callbacks are
never drained, so this watch request can hang indefinitely and potentially block the lookup
semaphore release / response.
Maybe we should either drain/complete the callbacks on update failure as well (and reset the
update state), or track the in-flight update CompletableFuture and attach the request
completion to it (success + failure), as sketched below?
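As a rough sketch of the second option (the `getOrTriggerUpdateFuture()` helper is hypothetical, not part of this PR), the watcher could expose the in-flight update as a future and the duplicate request could be attached to it, so both success and failure complete the response:
```java
// Sketch only: complete the duplicate watch request on both success and failure of the refresh,
// so a terminally failed update cannot leave the request (and the lookup semaphore) hanging.
watcherFuture = existingWatcherFuture.thenCompose(watcher ->
        watcher.getOrTriggerUpdateFuture()  // hypothetical: returns the in-flight or a newly started update
                .handle((ignored, updateEx) -> {
                    if (updateEx != null) {
                        log.warn("[{}] Refresh for existing watcherId={} failed, completing watch request anyway",
                                connection, watcherId, updateEx);
                    }
                    return watcher;
                }));
```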