315157973 commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644498422



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse 
asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse 
asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to 
{} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       required = true, it is impossible to be null here, can we use boolean?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse 
asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse 
asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to 
{} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, 
"Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on 
non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on 
non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, 
TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is 
not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, 
subName, authoritative,
+                    enabled);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                        try {
+                            
futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+                                    topicNamePartition.toString(), subName, 
enabled));
+                        } catch (Exception e) {
+                            log.warn("[{}] Failed to change replicated 
subscription status to {} - {} {}",
+                                    clientAppId(), enabled, 
topicNamePartition, subName, e);
+                            resumeAsyncResponseExceptionally(asyncResponse, e);
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse
+                                        .resume(new 
RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+                                return null;
+                            } else if (t instanceof 
PreconditionFailedException) {
+                                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
+                                        "Cannot enable/disable replicated 
subscriptions on non-global topics"));
+                                return null;
+                            } else {
+                                log.warn("[{}] Failed to change replicated 
subscription status to {} - {} {}",
+                                        clientAppId(), enabled, topicName, 
subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, 
subName, authoritative,
+                            enabled);
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to change replicated subscription status 
to {} - {} {}", clientAppId(), enabled,
+                        topicName, subName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void 
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse 
asyncResponse,
+            String subName, boolean authoritative, Boolean enabled) {

Review comment:
       Should we use boolean

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse 
asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse 
asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to 
{} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, 
"Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on 
non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on 
non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, 
TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is 
not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {

Review comment:
       Is there missing one `!` 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to