BewareMyPower commented on code in PR #22797:
URL: https://github.com/apache/pulsar/pull/22797#discussion_r1620199371


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -449,20 +449,52 @@ protected CompletableFuture<Void> 
internalCreateNonPartitionedTopicAsync(boolean
                     }
                     // update remote cluster
                     return namespaceResources().getPoliciesAsync(namespaceName)
+                            .thenCompose(CompletableFuture::completedFuture)
                             .thenCompose(policies -> {
-                                if (!policies.isPresent()) {
+                                if (policies.isEmpty()) {
                                     return 
CompletableFuture.completedFuture(null);
                                 }
                                 // Combine namespace level policies and topic 
level policies.
                                 Set<String> replicationClusters = 
policies.get().replication_clusters;
                                 TopicPolicies topicPolicies =
                                         
pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName);
-                                if (topicPolicies != null) {
-                                    replicationClusters = new 
HashSet<>(topicPolicies.getReplicationClusters());
+                                if (topicPolicies != null && 
topicPolicies.getReplicationClusters() != null) {
+                                    replicationClusters = new HashSet<>();
                                 }
+
+                                CompletableFuture<Void> checkShadowTopics = 
Optional.ofNullable(topicPolicies)
+                                        .map(TopicPolicies::getShadowTopics)
+                                        .filter(shadowTopics -> 
!shadowTopics.isEmpty())
+                                        .map(shadowTopics -> {
+                                            List<CompletableFuture<Void>> 
futures = shadowTopics.stream()
+                                                    .map(shadowTopic -> 
getPartitionedTopicMetadataAsync(
+                                                            
TopicName.get(shadowTopic), false, false)
+                                                            
.thenCompose(metadata -> {
+                                                                if 
(metadata.partitions < expectPartitions) {
+                                                                    return 
CompletableFuture.<Void>failedFuture(
+                                                                            
new RestException(400, String.format(
+                                                                               
     "The shadow topic %s has fewer "
+                                                                               
     + "partitions than the current "
+                                                                               
     + "topic. Currently, it only "
+                                                                               
     + "contains %d partitions. Please"
+                                                                               
     + " expand the partitions of the "
+                                                                               
     + "shadow topic %s first.",
+                                                                               
     shadowTopic, metadata.partitions,
+                                                                               
     shadowTopic
+                                                                            ))
+                                                                    );
+                                                                }
+                                                                return 
CompletableFuture.completedFuture(null);
+                                                            }))
+                                                    
.collect(Collectors.toList());
+                                            return 
FutureUtil.waitForAll(futures);
+                                        })
+                                        
.orElse(CompletableFuture.completedFuture(null));

Review Comment:
   Could you add a new method for it? It's a deep recursion code that harms the 
readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to