eolivelli commented on code in PR #17254:
URL: https://github.com/apache/pulsar/pull/17254#discussion_r953810374
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -644,6 +646,25 @@ private CompletableFuture<Optional<CandidateBrokerResult>> getCandidateBrokerRes
     }
+    private CompletableFuture<Optional<LeaderBroker>> getOrWaitForLeader(AtomicInteger retries) {
+        return pulsar.getLeaderElectionService().readCurrentLeader()
+                .handle((leaderBroker, t) -> {
+                    CompletableFuture<Optional<LeaderBroker>> retval;
+                    if ((t != null || !leaderBroker.isPresent()) && retries.getAndIncrement() < 3) {
+                        // retry after a delay of 5 seconds
+                        retval =
+                                CompletableFuture.supplyAsync(() -> null,
+                                        CompletableFuture.delayedExecutor(5, SECONDS))
Review Comment:
Which thread pool will run this? (I guess it would be the JVM-wide common
pool, ForkJoinPool.commonPool().)
Should we use a well-known thread pool instead?
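
To make the question concrete, here is a minimal sketch of the three-argument `CompletableFuture.delayedExecutor(delay, unit, executor)` overload (Java 9+), which hands the task to a caller-supplied executor once the delay elapses. The class name, the helper methods, and the `lookup-retry` thread name are hypothetical and are not part of the PR; which broker executor would actually be appropriate is left open.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not code from the PR.
class DelayedExecutorSketch {

    // Builds a single-threaded pool whose thread carries a recognizable name,
    // so it is easy to spot in a jstack dump.
    static ExecutorService namedPool(String threadName) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
    }

    // Runs a task after the delay on the given base executor and returns
    // the name of the thread that executed it.
    static String runDelayed(Executor base, long delayMillis) {
        Executor delayed =
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS, base);
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), delayed)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = namedPool("lookup-retry");
        try {
            // The delayed task runs on the named pool, not on the common pool.
            System.out.println(runDelayed(pool, 100));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the two-argument overload used in the diff, the task goes to the default executor instead, which is normally the common pool.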
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -644,6 +646,25 @@ private CompletableFuture<Optional<CandidateBrokerResult>> getCandidateBrokerRes
     }
+    private CompletableFuture<Optional<LeaderBroker>> getOrWaitForLeader(AtomicInteger retries) {
+        return pulsar.getLeaderElectionService().readCurrentLeader()
+                .handle((leaderBroker, t) -> {
+                    CompletableFuture<Optional<LeaderBroker>> retval;
+                    if ((t != null || !leaderBroker.isPresent()) && retries.getAndIncrement() < 3) {
+                        // retry after a delay of 5 seconds
+                        retval =
+                                CompletableFuture.supplyAsync(() -> null,
+                                        CompletableFuture.delayedExecutor(5, SECONDS))
+                                        .thenCompose(__ -> getOrWaitForLeader(retries));
+                    } else {
+                        retval = CompletableFuture.completedFuture(
+                                leaderBroker != null ? leaderBroker : Optional.empty());
+                    }
+                    return retval;
+                })
+                .thenCompose(Function.identity());
Review Comment:
Why do we need this step?
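
For context, one likely reading: the lambda passed to `handle` itself returns a `CompletableFuture`, so `handle` yields a nested future, and `thenCompose(Function.identity())` flattens it back to a single level. The sketch below illustrates that with plain strings; the class, the method, and the `"fallback"` value are hypothetical stand-ins, not the PR's code.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch, not code from the PR.
class FlattenSketch {

    static CompletableFuture<Optional<String>> lookup(
            CompletableFuture<Optional<String>> source) {
        // handle() wraps whatever the lambda returns, so returning a future
        // from the lambda produces a future of a future.
        CompletableFuture<CompletableFuture<Optional<String>>> nested =
                source.handle((value, error) -> {
                    CompletableFuture<Optional<String>> next;
                    if (error != null || !value.isPresent()) {
                        next = CompletableFuture.completedFuture(Optional.of("fallback"));
                    } else {
                        next = CompletableFuture.completedFuture(value);
                    }
                    return next;
                });
        // thenCompose with the identity function unwraps the inner future.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Optional<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no leader"));
        System.out.println(lookup(failed).join());
    }
}
```

`handle` is used rather than `thenCompose` alone because it also receives the exception, and `CompletableFuture` has no built-in "handle and compose" method before Java 12's `exceptionallyCompose`, which covers only the failure path.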
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -644,6 +646,25 @@ private CompletableFuture<Optional<CandidateBrokerResult>> getCandidateBrokerRes
     }
+    private CompletableFuture<Optional<LeaderBroker>> getOrWaitForLeader(AtomicInteger retries) {
+        return pulsar.getLeaderElectionService().readCurrentLeader()
+                .handle((leaderBroker, t) -> {
+                    CompletableFuture<Optional<LeaderBroker>> retval;
+                    if ((t != null || !leaderBroker.isPresent()) && retries.getAndIncrement() < 3) {
+                        // retry after a delay of 5 seconds
Review Comment:
Tracking this delay with a debugger or a jstack dump is very hard.
Maybe we should log something at INFO level;
otherwise lookup operations will hang without any observable sign.
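
A rough sketch of what that could look like, assuming a retry loop shaped like the one in the diff. Everything here is a hypothetical stand-in: the leader is a plain `String`, the `readLeader` supplier replaces the broker's leader election service, and `java.util.logging` is used only to keep the example self-contained, not because it is the broker's logger.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Logger;

// Hypothetical sketch, not code from the PR.
class RetryLoggingSketch {
    private static final Logger LOG = Logger.getLogger("RetryLoggingSketch");
    static final int MAX_RETRIES = 3;

    static CompletableFuture<Optional<String>> getOrWait(
            Supplier<CompletableFuture<Optional<String>>> readLeader,
            AtomicInteger retries,
            long delayMillis) {
        return readLeader.get()
                .handle((leader, t) -> {
                    CompletableFuture<Optional<String>> retval;
                    boolean missing = t != null || !leader.isPresent();
                    if (missing && retries.getAndIncrement() < MAX_RETRIES) {
                        // Make the wait visible: without this line the lookup
                        // simply pauses with nothing in the logs.
                        LOG.info("Leader not available (attempt " + retries.get()
                                + " of " + MAX_RETRIES + "), retrying in "
                                + delayMillis + " ms");
                        retval = CompletableFuture
                                .supplyAsync(() -> null, CompletableFuture.delayedExecutor(
                                        delayMillis, TimeUnit.MILLISECONDS))
                                .thenCompose(ignored ->
                                        getOrWait(readLeader, retries, delayMillis));
                    } else {
                        Optional<String> result = leader != null ? leader : Optional.empty();
                        retval = CompletableFuture.completedFuture(result);
                    }
                    return retval;
                })
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated election: no leader for the first two reads.
        Supplier<CompletableFuture<Optional<String>>> reader = () ->
                CompletableFuture.completedFuture(calls.incrementAndGet() < 3
                        ? Optional.<String>empty() : Optional.of("broker-1"));
        System.out.println(getOrWait(reader, new AtomicInteger(), 100).join());
    }
}
```

Including the attempt number and the delay in the message lets an operator tell a slow lookup that is waiting on leader election apart from one that is genuinely stuck.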