codelipenghui commented on code in PR #25443:
URL: https://github.com/apache/pulsar/pull/25443#discussion_r3192488352


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -4136,6 +4153,38 @@ public void 
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory
         this.pulsarChannelInitFactory = factory;
     }
 
+    /**
+     * @return Triple [namespace policies, global topic policies, topic 
policies].
+     */
+    public CompletableFuture<Boolean> isCurrentClusterAllowed(@NonNull 
TopicName topicName) {
+        final String cluster = getPulsar().getConfig().getClusterName();
+        return getCombinedTopicPolicies(topicName).thenApply(triple -> {
+            Optional<TopicPolicies> topicP = triple.getRight();
+            Optional<TopicPolicies> globalTopicP = triple.getMiddle();
+            Optional<Policies> nsPolicies = triple.getLeft();
+            // Disabled a cluster for a namespace manually.
+            if (nsPolicies.isPresent() && 
!isCurrentClusterAllowed(topicName.getNamespaceObject(), nsPolicies.get())) {

Review Comment:
   Possible regression: `isCurrentClusterAllowed(NamespaceName, Policies)` 
returns false when the namespace `replication_clusters` excludes the current 
cluster and `allowed_clusters` is empty. But topic-level `replicationClusters` 
is supposed to override the namespace-level default. For example, with namespace 
`replication_clusters=[c1]` and topic `replicationClusters=[c1,c2]`, the topic 
should be allowed on `c2`, but this short-circuit returns false.
   
   I suggest short-circuiting only on the `allowed_clusters` hard gate (PIP-321), 
then deferring to the topic-level checks before falling back to the namespace 
`replication_clusters`.
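
   A minimal sketch of the ordering I have in mind, written against plain 
sets so it compiles on its own. `ClusterGateSketch`, `isClusterAllowed`, and 
all parameter names are hypothetical stand-ins, not Pulsar's API, and the final 
fallback (plain membership in the namespace `replication_clusters`) is an 
assumption about the intended default:

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for the broker's policy objects; not Pulsar's API.
final class ClusterGateSketch {

    static boolean isClusterAllowed(String cluster,
                                    Set<String> nsAllowedClusters,
                                    Set<String> nsReplicationClusters,
                                    Optional<Set<String>> topicReplicationClusters,
                                    Optional<Set<String>> globalTopicReplicationClusters) {
        // 1. Hard gate: a non-empty allowed_clusters that excludes the cluster always rejects.
        if (!nsAllowedClusters.isEmpty() && !nsAllowedClusters.contains(cluster)) {
            return false;
        }
        // 2. A non-empty local topic-level setting overrides the namespace default.
        if (topicReplicationClusters.isPresent() && !topicReplicationClusters.get().isEmpty()) {
            return topicReplicationClusters.get().contains(cluster);
        }
        // 3. Otherwise a non-empty global topic-level setting decides.
        if (globalTopicReplicationClusters.isPresent()
                && !globalTopicReplicationClusters.get().isEmpty()) {
            return globalTopicReplicationClusters.get().contains(cluster);
        }
        // 4. Assumed fallback: the namespace-level replication_clusters.
        return nsReplicationClusters.contains(cluster);
    }
}
```

   With namespace `replication_clusters=[c1]` and topic 
`replicationClusters=[c1,c2]`, this returns true for `c2`, while a non-empty 
`allowed_clusters` that omits `c2` still rejects it.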



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java:
##########
@@ -149,6 +149,15 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws 
Exception {
         });
         waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);
 
+        try {
+            admin2.topics().createMissedPartitions(topicName);

Review Comment:
   Consider extracting these assertions into their own `@Test` method.
   
   Only the global-topic-policy branch is exercised — adding cases for the 
local-topic-level and namespace `allowed_clusters` branches plus a positive 
happy-path assertion would catch inverted-boolean regressions.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -4136,6 +4153,38 @@ public void 
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory
         this.pulsarChannelInitFactory = factory;
     }
 
+    /**
+     * @return Triple [namespace policies, global topic policies, topic 
policies].
+     */
+    public CompletableFuture<Boolean> isCurrentClusterAllowed(@NonNull 
TopicName topicName) {
+        final String cluster = getPulsar().getConfig().getClusterName();
+        return getCombinedTopicPolicies(topicName).thenApply(triple -> {
+            Optional<TopicPolicies> topicP = triple.getRight();
+            Optional<TopicPolicies> globalTopicP = triple.getMiddle();
+            Optional<Policies> nsPolicies = triple.getLeft();
+            // Disabled a cluster for a namespace manually.
+            if (nsPolicies.isPresent() && 
!isCurrentClusterAllowed(topicName.getNamespaceObject(), nsPolicies.get())) {
+                return false;
+            }
+            // Manually enabled topic-level replication, which can skip to set 
a namespace-level replication.
+            if (topicP.isPresent() && 
CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) {
+                if (topicP.get().getReplicationClusters().contains(cluster)) {
+                    return true;
+                } else {
+                    return false;
+                }

Review Comment:
   `if (x) return true; else return false;` → `return 
topicP.get().getReplicationClusters().contains(cluster);` (same simplification 
applies to the `globalTopicP` block below).
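
   To show the two forms are equivalent, here is a standalone sketch. 
`ReturnBooleanSketch` and its method names are hypothetical, and a plain 
`Set<String>` stands in for the value of `getReplicationClusters()`:

```java
import java.util.Set;

// Hypothetical helper class; only illustrates the refactoring, not the broker code.
final class ReturnBooleanSketch {

    // Verbose form, shaped like the branch in the diff.
    static boolean verbose(Set<String> replicationClusters, String cluster) {
        if (replicationClusters.contains(cluster)) {
            return true;
        } else {
            return false;
        }
    }

    // Direct form: return the boolean expression itself.
    static boolean direct(Set<String> replicationClusters, String cluster) {
        return replicationClusters.contains(cluster);
    }
}
```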



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java:
##########
@@ -149,6 +149,15 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws 
Exception {
         });
         waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);
 
+        try {
+            admin2.topics().createMissedPartitions(topicName);
+            fail("The action that creates mission partitions should have 
thrown exception");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("is not allowed to be loaded 
up"));

Review Comment:
   This couples the test to the English error string (and to the `polices` 
typo). Prefer catching `PulsarAdminException` and asserting `getStatusCode() == 
400`.
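
   Roughly the shape I mean, as a standalone sketch. `AdminException` is a 
hypothetical stand-in for `PulsarAdminException` so the snippet compiles 
without Pulsar on the classpath; `AdminCall` and `expectRejection` are also 
made up for illustration:

```java
// Hypothetical sketch: assert on the status code instead of the message text.
final class StatusCodeAssertionSketch {

    // Stand-in for PulsarAdminException; only carries a message and a status code.
    static final class AdminException extends Exception {
        private final int statusCode;

        AdminException(String message, int statusCode) {
            super(message);
            this.statusCode = statusCode;
        }

        int getStatusCode() {
            return statusCode;
        }
    }

    // An admin operation that may be rejected by the broker.
    interface AdminCall {
        void run() throws AdminException;
    }

    // Returns the status code of the rejection; fails if the call succeeds.
    static int expectRejection(AdminCall call) {
        try {
            call.run();
        } catch (AdminException e) {
            return e.getStatusCode();
        }
        throw new AssertionError("expected the call to be rejected");
    }
}
```

   In the real test this would amount to catching `PulsarAdminException` around 
`admin2.topics().createMissedPartitions(topicName)` and comparing 
`e.getStatusCode()` with 400, so rewording the message no longer breaks the test.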



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -4136,6 +4153,38 @@ public void 
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory
         this.pulsarChannelInitFactory = factory;
     }
 
+    /**
+     * @return Triple [namespace policies, global topic policies, topic 
policies].

Review Comment:
   Stale copy-paste from `getCombinedTopicPolicies` — this method returns 
`CompletableFuture<Boolean>`, not a `Triple`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -550,30 +551,45 @@ private CompletableFuture<Set<String>> 
getReplicationClusters() {
     }
 
     protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) 
{
-        getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-            if (metadata != null && metadata.partitions > 0) {
-                validateNamespaceOperationAsync(topicName.getNamespaceObject(),
-                        NamespaceOperation.CREATE_TOPIC)
-                .thenCompose(__ -> 
tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
-                    asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(e -> {
-                    log.error()
-                            .attr("topic", topicName)
-                            .log("Failed to create partitions for topic");
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                    return null;
-                });
-            } else {
-                throw new RestException(Status.NOT_FOUND, String.format("Topic 
%s does not exist", topicName));
-            }
-        }).exceptionally(ex -> {
+        Consumer<Throwable> errorHandler = ex -> {
             // If the exception is not redirect exception we need to log it.
             if (!isRedirectException(ex)) {
                 log.error()
                         .attr("topic", topicName)
                         .log("Failed to create partitions for topic");
             }
             resumeAsyncResponseExceptionally(asyncResponse, ex);
+        };
+        pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> {
+            if (!allowed) {
+                resumeAsyncResponseExceptionally(asyncResponse,
+                    new RestException(Status.BAD_REQUEST, String.format("Topic 
[%s] is not allowed to be loaded"

Review Comment:
   Two nits on the message: the typo `polices` should be `policies`, and 
"loaded up" misdescribes the action, since this is `createMissedPartitions`, 
not topic loading. It would also help to include the current cluster name in 
the message so operators can identify which cluster rejected the request.
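
   One possible wording, as a standalone sketch. `RejectionMessageSketch`, 
`rejectionMessage`, and the exact text are hypothetical suggestions, not the 
PR's actual message:

```java
// Hypothetical sketch of a rejection message that names the action and the cluster.
final class RejectionMessageSketch {

    static String rejectionMessage(String topic, String cluster) {
        return String.format(
                "Cannot create missed partitions for topic [%s]: cluster [%s] is not allowed"
                        + " by the namespace or topic policies",
                topic, cluster);
    }
}
```

   The cluster argument would come from the value already read via 
`getPulsar().getConfig().getClusterName()` in `isCurrentClusterAllowed`.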



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
