Denovo1998 commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2679211085
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -287,9 +422,70 @@ public void deleteTopicListWatcher(Long watcherId) {
      * @param newTopics topics names added(contains the partition suffix).
      */
     public void sendTopicListUpdate(long watcherId, String topicsHash,
                                     List<String> deletedTopics,
-                                    List<String> newTopics) {
-        connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
+                                    List<String> newTopics, Runnable completionCallback) {
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list update", permitAcquireErrorHandler ->
+                () -> connection.getCommandSender()
+                        .sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash,
+                                permitAcquireErrorHandler)
+                        .whenComplete((__, t) -> {
+                            if (t != null) {
+                                // this is an unexpected case
+                                log.warn("[{}] Failed to send topic list update for watcherId={}. Watcher will be in "
+                                        + "inconsistent state.", connection, watcherId, t);
+                            }
+                            completionCallback.run();
Review Comment:
Even when update A fails with "acquire timeout / queue full" and a retry has
been scheduled, completionCallback still triggers
TopicListWatcher.sendingCompleted(), so the queue moves on and executes the
next update, B.
Consider two updates on the same watcher, in the order the events occur: A,
then B.
- t0: Update A (create topic2) is triggered, enters the sending chain, and
starts trying to acquire direct memory permits.
- t1: There are not enough direct memory permits, so acquiring permits for A
fails:
  - acquireDirectMemoryPermitsAndWriteAndFlush calls
    permitAcquireErrorHandler.accept(failure) → schedules "retry A later"
  - At the same time, the future returned by sendWatchTopicListUpdate
    also completes exceptionally → the whenComplete callback in
    sendTopicListUpdate(...) runs
  - whenComplete still calls completionCallback.run() →
    sendingCompleted() is triggered → the queue releases the next item
- t2: Update B (delete topic2) starts executing. Suppose permits are
sufficient this time, so B is written to the socket first.
- t3: The "retry A" scheduled at t1 fires and tries to acquire permits again.
This time it succeeds, so A is written to the socket after B.
**Result: the client receives B (delete) before A (create), so it may
incorrectly "retain topic2".**
---
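The timeline above can be reproduced with a toy, single-threaded model of the current behavior. This is only a sketch under stated assumptions, not Pulsar code: ReorderingDemo, drainQueue, trySend and the permitsAvailable supplier are hypothetical stand-ins for the watcher queue, acquireDirectMemoryPermitsAndWriteAndFlush and the scheduled retry, and permits are assumed to be missing only on the very first attempt.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Toy, single-threaded model of the CURRENT behavior. All names are hypothetical.
class ReorderingDemo {
    private final Deque<String> queue = new ArrayDeque<>();              // pending updates, event order
    private final Deque<Runnable> scheduledRetries = new ArrayDeque<>(); // "retry A later"
    private final List<String> socket = new ArrayList<>();               // what the client receives
    private final BooleanSupplier permitsAvailable;

    ReorderingDemo(BooleanSupplier permitsAvailable) {
        this.permitsAvailable = permitsAvailable;
    }

    void enqueue(String update) {
        queue.add(update);
    }

    // Stand-in for the send loop: the completion callback advances the queue
    // whether or not the write actually happened.
    void drainQueue() {
        String update;
        while ((update = queue.poll()) != null) {
            trySend(update);
            // completionCallback.run() -> sendingCompleted(): move on to the next item
        }
    }

    // Stand-in for acquiring permits and writing; on failure a retry is
    // scheduled out of band, outside the per-watcher queue.
    private void trySend(String update) {
        if (permitsAvailable.getAsBoolean()) {
            socket.add(update);
        } else {
            scheduledRetries.add(() -> trySend(update));
        }
    }

    void runScheduledRetries() {
        Runnable retry;
        while ((retry = scheduledRetries.poll()) != null) {
            retry.run();
        }
    }

    static List<String> run() {
        AtomicInteger attempts = new AtomicInteger();
        // Permits are missing on the first attempt only (t1), available afterwards.
        ReorderingDemo demo = new ReorderingDemo(() -> attempts.getAndIncrement() > 0);
        demo.enqueue("A: create topic2");
        demo.enqueue("B: delete topic2");
        demo.drainQueue();          // t0..t2: A fails and is rescheduled, B is written
        demo.runScheduledRetries(); // t3: the retry of A succeeds
        return demo.socket;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [B: delete topic2, A: create topic2]
    }
}
```

The printed order matches t2/t3: the delete reaches the client before the create it was supposed to follow.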
For each watcher, perhaps we need a strict "send chain":
- The head task (the initial success response or an update) may only call
sendingCompleted() to advance to the next task after its write has succeeded
(or at least the write has been committed)?
- If the head task goes into retry because permit acquisition failed, it
should remain at the head of the queue, and subsequent updates must not skip
over it?
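A minimal sketch of that strict send chain, using the same toy assumptions as before (hypothetical names, a single thread, permits missing only on the first attempt). The head is peeked rather than polled, it is removed only after its write succeeds (the point where sendingCompleted() would fire), and a failed permit acquisition reschedules the same head instead of advancing:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Toy, single-threaded model of a per-watcher strict send chain. Hypothetical names.
class StrictSendChain {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Deque<Runnable> scheduledRetries = new ArrayDeque<>();
    private final List<String> socket = new ArrayList<>();
    private final BooleanSupplier permitsAvailable;
    private boolean sending; // true while the head is being written or waiting for a retry

    StrictSendChain(BooleanSupplier permitsAvailable) {
        this.permitsAvailable = permitsAvailable;
    }

    void enqueue(String update) {
        queue.add(update);
        if (!sending) {
            sendHead();
        }
    }

    private void sendHead() {
        String head = queue.peek(); // peek, not poll: the head keeps its place on failure
        if (head == null) {
            sending = false;
            return;
        }
        sending = true;
        if (permitsAvailable.getAsBoolean()) {
            socket.add(head);
            queue.poll(); // only a successful write advances the chain (sendingCompleted())
            sendHead();
        } else {
            scheduledRetries.add(this::sendHead); // retry the SAME head; later updates wait
        }
    }

    void runScheduledRetries() {
        Runnable retry;
        while ((retry = scheduledRetries.poll()) != null) {
            retry.run();
        }
    }

    static List<String> run() {
        AtomicInteger attempts = new AtomicInteger();
        StrictSendChain chain = new StrictSendChain(() -> attempts.getAndIncrement() > 0);
        chain.enqueue("A: create topic2"); // t0/t1: no permits, retry of A scheduled
        chain.enqueue("B: delete topic2"); // t2: B queues behind A instead of overtaking it
        chain.runScheduledRetries();       // t3: A is written, then B
        return chain.socket;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [A: create topic2, B: delete topic2]
    }
}
```

The trade-off is head-of-line blocking: while A is waiting for permits, no later update for that watcher can go out, but the client always sees updates in event order.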
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.