AnonHxy commented on code in PR #17251:
URL: https://github.com/apache/pulsar/pull/17251#discussion_r954574661


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4357,39 +4357,50 @@ private PersistentReplicator 
getReplicatorReference(String replName, PersistentT
         }
     }
 
+    private CompletableFuture<Void> updatePartitionedTopicMetadataAsync(int 
numPartitions) {
+        CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(topicName, p -> new 
PartitionedTopicMetadata(numPartitions, p.properties));
+        future.exceptionally(ex -> {
+            // If the update operation fails, clean up the partitions that 
were created
+            getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
+                int oldPartition = metadata.partitions;
+                for (int i = oldPartition; i < numPartitions; i++) {
+                    
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
+                        log.warn("[{}] Failed to clean up managedLedger {}", 
clientAppId(), topicName,
+                                ex1.getCause());
+                        return null;
+                    });
+                }
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to clean up managedLedger", topicName, 
e);
+                return null;
+            });
+            return null;
+        });
+        return future;
+    }
 
     private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
-            CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p ->
-                        new PartitionedTopicMetadata(numPartitions, 
p.properties));
-            future.exceptionally(ex -> {
-                // If the update operation fails, clean up the partitions that 
were created
-                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-                    int oldPartition = metadata.partitions;
-                    for (int i = oldPartition; i < numPartitions; i++) {
-                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
-                            log.warn("[{}] Failed to clean up managedLedger 
{}", clientAppId(), topicName,
-                                    ex1.getCause());

Review Comment:
   It looks like there is no need to move this code block to a new method  
`updatePartitionedTopicMetadataAsync`, which will make more work fro review.  I 
prefer to keep it the way it is in this PR.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4357,39 +4357,50 @@ private PersistentReplicator 
getReplicatorReference(String replName, PersistentT
         }
     }
 
+    private CompletableFuture<Void> updatePartitionedTopicMetadataAsync(int 
numPartitions) {
+        CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(topicName, p -> new 
PartitionedTopicMetadata(numPartitions, p.properties));
+        future.exceptionally(ex -> {
+            // If the update operation fails, clean up the partitions that 
were created
+            getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
+                int oldPartition = metadata.partitions;
+                for (int i = oldPartition; i < numPartitions; i++) {
+                    
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
+                        log.warn("[{}] Failed to clean up managedLedger {}", 
clientAppId(), topicName,
+                                ex1.getCause());
+                        return null;
+                    });
+                }
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to clean up managedLedger", topicName, 
e);
+                return null;
+            });
+            return null;
+        });
+        return future;
+    }
 
     private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
-            CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p ->
-                        new PartitionedTopicMetadata(numPartitions, 
p.properties));
-            future.exceptionally(ex -> {
-                // If the update operation fails, clean up the partitions that 
were created
-                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-                    int oldPartition = metadata.partitions;
-                    for (int i = oldPartition; i < numPartitions; i++) {
-                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
-                            log.warn("[{}] Failed to clean up managedLedger 
{}", clientAppId(), topicName,
-                                    ex1.getCause());
+        createSubscriptions(topicName, numPartitions).thenCompose(
+                        __ -> 
updatePartitionedTopicMetadataAsync(numPartitions))
+                .thenAccept(__ -> result.complete(null))
+                .exceptionally(ex -> {
+                    if (force && ex.getCause() instanceof 
PulsarAdminException.ConflictException) {
+                        log.warn("[{}] update partitioned topic's partition 
num {} with ignoring conflict exception",

Review Comment:
   This exception is throw from `createSubscriptions`, I think it's better 
handle the exception in `createSubscriptions`. BTW,  could we make sure all the 
subscriptions have been created if `ConflictException` happens and we ignore it 
here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -480,10 +481,9 @@ protected CompletableFuture<Void> 
internalUpdatePartitionedTopicAsync(int numPar
                                 if (!updateLocalTopicOnly) {
                                     return 
updatePartitionInOtherCluster(numPartitions, clusters)
                                         .thenCompose(v -> 
namespaceResources().getPartitionedTopicResources()
-                                                        
.updatePartitionedTopicAsync(topicName, p ->

Review Comment:
   This modification is unrelation to this PR, we'd better do keep it the way 
it is



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to