lhotari commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2737646582
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java:
##########
@@ -173,12 +231,31 @@ synchronized void triggerNextTask() {
});
break;
}
- case TOPICS_ADDED: {
- newTaskFuture = topicsChangeListener.onTopicsAdded(task.getRight());
- break;
- }
- case TOPICS_REMOVED: {
- newTaskFuture = topicsChangeListener.onTopicsRemoved(task.getRight());
+ case TOPICS_CHANGED: {
+ TopicsAddedOrRemovedTask topicsAddedOrRemovedTask = (TopicsAddedOrRemovedTask) task;
+ newTaskFuture = topicsChangeListener.onTopicsRemoved(topicsAddedOrRemovedTask.removedTopics)
+ .thenCompose(__ ->
+ topicsChangeListener.onTopicsAdded(topicsAddedOrRemovedTask.addedTopics))
+ .thenRun(() -> {
+ if (!patternConsumer.supportsTopicListWatcherReconcile()) {
+ // ignore topics hash unless topic list watcher reconcile is supported since
+ // the broker side state might be out of sync and could cause unnecessary
+ // reconciliation.
+ // reconciliation will happen later when the client requests the topic listing
+ // after the next patternAutoDiscoveryPeriod interval
+ // Broker versions that support topic list watcher reconcile will also update the
+ // broker side state when reconciliation is requested.
+ return;
+ }
+ String localHash = patternConsumer.getLocalStateTopicsHash();
+ String brokerHash = topicsAddedOrRemovedTask.topicsHash;
+ if (brokerHash != null && brokerHash.length() > 0 && !brokerHash.equals(localHash)) {
+ log.info("[{}][{}] Hash mismatch detected (local: {}, broker: {}). Triggering "
+ + "reconciliation.", patternConsumer.getPattern().inputPattern(),
+ patternConsumer.getSubscription(), localHash, brokerHash);
+ appendRecheckOp();
Review Comment:
It's unnecessary, since older brokers contain another bug that makes the hash
useless. I'll improve the comment.
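
For readers following the thread, the guard in the hunk above boils down to
the decision sketched below. This is only an illustration of how I read the
diff, not code from the PR: `HashGuardSketch` and `shouldReconcile` are
hypothetical names, and the boolean parameter stands in for
`patternConsumer.supportsTopicListWatcherReconcile()`.

```java
// Hypothetical sketch: the class and method names are illustrative and do not exist in the PR.
final class HashGuardSketch {

    /**
     * Decides whether a recheck should be queued after a topics-changed update.
     *
     * @param supportsReconcile stand-in for supportsTopicListWatcherReconcile()
     * @param localHash         hash of the topic list held by the client
     * @param brokerHash        hash sent by the broker; may be null or empty
     */
    static boolean shouldReconcile(boolean supportsReconcile, String localHash, String brokerHash) {
        if (!supportsReconcile) {
            // Without reconcile support the broker hash is ignored entirely;
            // the periodic topic listing catches up later.
            return false;
        }
        // Only a non-empty broker hash that differs from the local one triggers a recheck.
        return brokerHash != null && !brokerHash.isEmpty() && !brokerHash.equals(localHash);
    }

    public static void main(String[] args) {
        System.out.println(shouldReconcile(false, "abc", "def")); // false: hash ignored
        System.out.println(shouldReconcile(true, "abc", "def"));  // true: mismatch
        System.out.println(shouldReconcile(true, "abc", "abc"));  // false: hashes agree
        System.out.println(shouldReconcile(true, "abc", ""));     // false: no broker hash
    }
}
```

Keeping the capability check first means a stale hash from an older broker can
never queue a recheck, which matches the intent described in the code comment.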