This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5e3c4df7d0c [fix][broker] fix broker may lost rack information (#23331)
5e3c4df7d0c is described below
commit 5e3c4df7d0c25d2e25d14027e029383f13a885dd
Author: ken <[email protected]>
AuthorDate: Sat Feb 15 07:30:56 2025 +0800
[fix][broker] fix broker may lost rack information (#23331)
Co-authored-by: fanjianye <[email protected]>
---
.../metadata/bookkeeper/PulsarRegistrationClient.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index be945d988fb..89dbf2be990 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -181,8 +181,13 @@ public class PulsarRegistrationClient implements
RegistrationClient {
@Override
public CompletableFuture<Void> watchWritableBookies(RegistrationListener
registrationListener) {
writableBookiesWatchers.add(registrationListener);
+ // Trigger all listeners in writableBookiesWatchers one by one. This keeps the notification
+ // synchronous, making sure the previous listeners have finished when a new listener is registered.
+ // Although this can trigger a listener more than once, watchWritableBookies is only executed
+ // when the bookieClient is constructed, so the duplicate notifications are acceptable.
return getWritableBookies()
- .thenAcceptAsync(registrationListener::onBookiesChanged,
executor);
+ .thenAcceptAsync(bookies ->
+ writableBookiesWatchers.forEach(w ->
w.onBookiesChanged(bookies)), executor);
}
@Override
@@ -193,8 +198,13 @@ public class PulsarRegistrationClient implements
RegistrationClient {
@Override
public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener
registrationListener) {
readOnlyBookiesWatchers.add(registrationListener);
+ // Trigger all listeners in readOnlyBookiesWatchers one by one. This keeps the notification
+ // synchronous, making sure the previous listeners have finished when a new listener is registered.
+ // Although this can trigger a listener more than once, watchReadOnlyBookies is only executed
+ // when the bookieClient is constructed, so the duplicate notifications are acceptable.
return getReadOnlyBookies()
- .thenAcceptAsync(registrationListener::onBookiesChanged,
executor);
+ .thenAcceptAsync(bookies ->
+ readOnlyBookiesWatchers.forEach(w ->
w.onBookiesChanged(bookies)), executor);
}
@Override