mattisonchao commented on a change in pull request #13935:
URL: https://github.com/apache/pulsar/pull/13935#discussion_r791344954
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -358,6 +375,25 @@ protected void validateClusterForTenant(String tenant,
String cluster) {
log.info("Successfully validated clusters on tenant [{}]", tenant);
}
+ protected CompletableFuture<Void> validateClusterOwnershipAsync(String cluster){
Review comment:
```suggestion
protected CompletableFuture<Void> validateClusterOwnershipAsync(String cluster) {
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -407,36 +432,28 @@ private URI getRedirectionUrl(ClusterData
differentClusterData) throws Malformed
protected static CompletableFuture<ClusterData>
getClusterDataIfDifferentCluster(PulsarService pulsar,
String cluster,
String clientAppId) {
-
CompletableFuture<ClusterData> clusterDataFuture = new
CompletableFuture<>();
-
- if (!isValidCluster(pulsar, cluster)) {
- try {
+ if (isValidCluster(pulsar, cluster) ||
// this code should only happen with a v1 namespace format prop/cluster/namespaces
- if
(!pulsar.getConfiguration().getClusterName().equals(cluster)) {
- // redirect to the cluster requested
-
pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster)
- .thenAccept(clusterDataResult -> {
- if (clusterDataResult.isPresent()) {
-
clusterDataFuture.complete(clusterDataResult.get());
- } else {
- log.warn("[{}] Cluster does not exist:
requested={}", clientAppId, cluster);
-
clusterDataFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
- "Cluster does not exist: cluster="
+ cluster));
- }
- }).exceptionally(ex -> {
- clusterDataFuture.completeExceptionally(ex);
- return null;
- });
- } else {
- clusterDataFuture.complete(null);
- }
- } catch (Exception e) {
- clusterDataFuture.completeExceptionally(e);
- }
- } else {
+ pulsar.getConfiguration().getClusterName().equals(cluster)) {
clusterDataFuture.complete(null);
+ return clusterDataFuture;
}
+ // redirect to the cluster requested
+
pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster)
+ .whenComplete((clusterDataResult, ex) -> {
+ if (ex != null){
Review comment:
```suggestion
if (ex != null) {
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -358,6 +375,25 @@ protected void validateClusterForTenant(String tenant,
String cluster) {
log.info("Successfully validated clusters on tenant [{}]", tenant);
}
+ protected CompletableFuture<Void> validateClusterOwnershipAsync(String cluster){
+ return getClusterDataIfDifferentCluster(pulsar(), cluster,
clientAppId())
+ .thenAccept(differentClusterData -> {
+ if (differentClusterData != null){
Review comment:
```suggestion
if (differentClusterData != null) {
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -85,16 +85,21 @@
@ApiResponse(code = 401, message = "Authentication required"),
@ApiResponse(code = 403, message = "This operation requires
super-user access"),
@ApiResponse(code = 404, message = "Cluster does not exist:
cluster={clustername}") })
- public Set<String> getActiveBrokers(@PathParam("cluster") String cluster)
throws Exception {
- validateSuperUserAccess();
- validateClusterOwnership(cluster);
-
- try {
- return pulsar().getLoadManager().get().getAvailableBrokers();
- } catch (Exception e) {
- LOG.error("[{}] Failed to get active broker list: cluster={}",
clientAppId(), cluster, e);
- throw new RestException(e);
- }
+ public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("cluster") String cluster) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterOwnershipAsync(cluster))
+ .thenCompose(__ ->
pulsar().getLoadManager().get().getAvailableBrokersAsync())
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex ->{
Review comment:
```suggestion
.exceptionally(ex -> {
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -85,16 +85,21 @@
@ApiResponse(code = 401, message = "Authentication required"),
@ApiResponse(code = 403, message = "This operation requires
super-user access"),
@ApiResponse(code = 404, message = "Cluster does not exist:
cluster={clustername}") })
- public Set<String> getActiveBrokers(@PathParam("cluster") String cluster)
throws Exception {
- validateSuperUserAccess();
- validateClusterOwnership(cluster);
-
- try {
- return pulsar().getLoadManager().get().getAvailableBrokers();
- } catch (Exception e) {
- LOG.error("[{}] Failed to get active broker list: cluster={}",
clientAppId(), cluster, e);
- throw new RestException(e);
- }
+ public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("cluster") String cluster) {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterOwnershipAsync(cluster))
+ .thenCompose(__ ->
pulsar().getLoadManager().get().getAvailableBrokersAsync())
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex ->{
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException){
Review comment:
```suggestion
if (realCause instanceof WebApplicationException) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]