eolivelli commented on code in PR #19086:
URL: https://github.com/apache/pulsar/pull/19086#discussion_r1064338938


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java:
##########
@@ -2676,15 +2676,12 @@ public void testFailedUpdatePartitionedTopic() throws 
Exception {
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 startPartitions);
 
         // create a subscription for few new partition which can fail
-        admin.topics().createSubscription(partitionedTopicName + "-partition-" 
+ startPartitions, subName1,
-                MessageId.earliest);
-
         try {
-            admin.topics().updatePartitionedTopic(partitionedTopicName, 
newPartitions, false, false);
-        } catch (PulsarAdminException.PreconditionFailedException e) {
-            // Ok
+            admin.topics().createSubscription(partitionedTopicName + 
"-partition-" + startPartitions, subName1,
+                    MessageId.earliest);

Review Comment:
   If we expect an error here, we have to call fail() if we reach this point.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4363,124 +4380,100 @@ private PersistentReplicator 
getReplicatorReference(String replName, PersistentT
         }
     }
 
-    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> 
{
-            CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p ->
-                        new PartitionedTopicMetadata(numPartitions, 
p.properties));
-            future.exceptionally(ex -> {
-                // If the update operation fails, clean up the partitions that 
were created
-                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-                    int oldPartition = metadata.partitions;
-                    for (int i = oldPartition; i < numPartitions; i++) {
-                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
-                            log.warn("[{}] Failed to clean up managedLedger 
{}", clientAppId(), topicName,
-                                    ex1.getCause());
-                            return null;
-                        });
-                    }
-                }).exceptionally(e -> {
-                    log.warn("[{}] Failed to clean up managedLedger", 
topicName, e);
-                    return null;
-                });
+    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int expectPartitions) {
+        CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(topicName, p ->
+                        new PartitionedTopicMetadata(expectPartitions, 
p.properties));
+        future.exceptionally(ex -> {
+            // If the update operation fails, clean up the partitions that 
were created
+            getPartitionedTopicMetadataAsync(topicName, false, false)
+                    .thenAccept(metadata -> {
+                int oldPartition = metadata.partitions;
+                for (int i = oldPartition; i < expectPartitions; i++) {
+                    
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {

Review Comment:
   For follow-up work:
   here we are not waiting for the operation to complete. We should try to wait; 
otherwise the topic is in an inconsistent state when the command returns.
   
   
   cc @dlg99 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4363,124 +4380,100 @@ private PersistentReplicator 
getReplicatorReference(String replName, PersistentT
         }
     }
 
-    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> 
{
-            CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p ->
-                        new PartitionedTopicMetadata(numPartitions, 
p.properties));
-            future.exceptionally(ex -> {
-                // If the update operation fails, clean up the partitions that 
were created
-                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
-                    int oldPartition = metadata.partitions;
-                    for (int i = oldPartition; i < numPartitions; i++) {
-                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
-                            log.warn("[{}] Failed to clean up managedLedger 
{}", clientAppId(), topicName,
-                                    ex1.getCause());
-                            return null;
-                        });
-                    }
-                }).exceptionally(e -> {
-                    log.warn("[{}] Failed to clean up managedLedger", 
topicName, e);
-                    return null;
-                });
+    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int expectPartitions) {
+        CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(topicName, p ->
+                        new PartitionedTopicMetadata(expectPartitions, 
p.properties));
+        future.exceptionally(ex -> {
+            // If the update operation fails, clean up the partitions that 
were created
+            getPartitionedTopicMetadataAsync(topicName, false, false)
+                    .thenAccept(metadata -> {
+                int oldPartition = metadata.partitions;
+                for (int i = oldPartition; i < expectPartitions; i++) {
+                    
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
+                        log.warn("[{}] Failed to clean up managedLedger {}", 
clientAppId(), topicName,
+                                ex1.getCause());
+                        return null;
+                    });
+                }
+            }).exceptionally(e -> {
+                log.warn("[{}] Failed to clean up managedLedger", topicName, 
e);
                 return null;
             });
-            return future;
-        }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
-            result.completeExceptionally(ex);
             return null;
         });
-        return result;
+        return future.thenCompose(__ -> createSubscriptions(topicName, 
expectPartitions));
     }
 
     /**
      * It creates subscriptions for new partitions of existing 
partitioned-topics.
      *
      * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
-     * @param numPartitions : number partitions for the topics
-     * @param ignoreConflictException : If true, ignore ConflictException: 
subscription already exists for topic
+     * @param expectPartitions : number of expected partitions
      *
      */
-    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int numPartitions,
-                                                       boolean 
ignoreConflictException) {
+    private CompletableFuture<Void> createSubscriptions(TopicName topicName, 
int expectPartitions) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata
 -> {
-            if (partitionMetadata.partitions < 1) {
-                result.completeExceptionally(new 
RestException(Status.CONFLICT, "Topic is not partitioned topic"));
-                return;
-            }
-
-            if (partitionMetadata.partitions >= numPartitions) {
-                result.completeExceptionally(new RestException(Status.CONFLICT,
-                        "number of partitions must be more than existing " + 
partitionMetadata.partitions));
-                return;
-            }
-
-            PulsarAdmin admin;
-            try {
-                admin = pulsar().getAdminClient();
-            } catch (PulsarServerException e1) {
-                result.completeExceptionally(e1);
-                return;
-            }
+        if (expectPartitions < 1) {
+            return FutureUtil.failedFuture(new RestException(Status.CONFLICT, 
"Topic is not partitioned topic"));
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e1) {
+            return FutureUtil.failedFuture(e1);
+        }
 
-            
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
 -> {
-                List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
+        
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
 -> {
+            List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
 
-                stats.getSubscriptions().entrySet().forEach(e -> {
-                    String subscription = e.getKey();
-                    SubscriptionStats ss = e.getValue();
-                    if (!ss.isDurable()) {
-                        // We must not re-create non-durable subscriptions on 
the new partitions
-                        return;
-                    }
-                    boolean replicated = ss.isReplicated();
-
-                    for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
-                        final String topicNamePartition = 
topicName.getPartition(i).toString();
-                        CompletableFuture<Void> future = new 
CompletableFuture<>();
-                        
admin.topics().createSubscriptionAsync(topicNamePartition,
-                                        subscription, MessageId.latest, 
replicated).whenComplete((__, ex) -> {
-                            if (ex == null) {
+            stats.getSubscriptions().entrySet().forEach(e -> {
+                String subscription = e.getKey();
+                SubscriptionStats ss = e.getValue();
+                if (!ss.isDurable()) {
+                    // We must not re-create non-durable subscriptions on the 
new partitions
+                    return;
+                }
+                boolean replicated = ss.isReplicated();
+
+                for (int i = 0; i < expectPartitions; i++) {
+                    final String topicNamePartition = 
topicName.getPartition(i).toString();
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    admin.topics().createSubscriptionAsync(topicNamePartition,
+                                    subscription, MessageId.latest, 
replicated).whenComplete((__, ex) -> {

Review Comment:
   Here we are not passing the "subscriptionProperties".
   This is a problem, but probably one for a separate PR.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -411,67 +411,84 @@ protected CompletableFuture<Void> 
internalCreateNonPartitionedTopicAsync(boolean
      * recreate them at application so, newly created producers and consumers 
can connect to newly added partitions as
      * well. Therefore, it can violate partition ordering at producers until 
all producers are restarted at application.
      *
-     * @param numPartitions
+     * @param expectPartitions
      * @param updateLocalTopicOnly
      * @param authoritative
      * @param force
      */
-    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int 
numPartitions,
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int 
expectPartitions,
                                                                           
boolean updateLocalTopicOnly,
                                                                           
boolean authoritative, boolean force) {
-        if (numPartitions <= 0) {
-            return FutureUtil.failedFuture(new 
RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be more than 0"));
+        if (expectPartitions <= 0) {
+            return FutureUtil.failedFuture(
+                    new RestException(Status.NOT_ACCEPTABLE, "Number of 
partitions should be more than 0"));
         }
         return validateTopicOwnershipAsync(topicName, authoritative)
-            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, 
PolicyName.PARTITION,
-                    PolicyOperation.WRITE))
+            .thenCompose(__ ->
+                    validateTopicPolicyOperationAsync(topicName, 
PolicyName.PARTITION, PolicyOperation.WRITE))
             .thenCompose(__ -> {
                 if (!updateLocalTopicOnly && !force) {
-                    return 
validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+                    return 
validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions);
                 }  else {
                     return CompletableFuture.completedFuture(null);
                 }
             }).thenCompose(__ -> 
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
             .thenCompose(topicMetadata -> {
                 final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-                if (maxPartitions > 0 && numPartitions > maxPartitions) {
+                if (maxPartitions > 0 && expectPartitions > maxPartitions) {
                     throw new RestException(Status.NOT_ACCEPTABLE,
                             "Number of partitions should be less than or equal 
to " + maxPartitions);
                 }
-                // Only do the validation if it's the first hop.
-                if (topicName.isGlobal() && 
isNamespaceReplicated(topicName.getNamespaceObject())) {
-                    return 
getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject())
-                            .thenApply(clusters -> {
-                                if 
(!clusters.contains(pulsar().getConfig().getClusterName())) {
-                                    log.error("[{}] local cluster is not part 
of replicated cluster for namespace {}",
-                                    clientAppId(), topicName);
-                                    throw new RestException(Status.FORBIDDEN, 
"Local cluster is not part of replicate"
-                                            + " cluster list");
-                                }
-                                return clusters;
-                            })
-                            .thenCompose(clusters ->
-                                    
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
-                                            .thenApply(ignore -> clusters))
-                            .thenCompose(clusters -> 
createSubscriptions(topicName, numPartitions, force).thenApply(
-                                    ignore -> clusters))
-                            .thenCompose(clusters -> {
-                                if (!updateLocalTopicOnly) {
-                                    return 
updatePartitionInOtherCluster(numPartitions, clusters)
-                                        .thenCompose(v -> 
namespaceResources().getPartitionedTopicResources()
-                                                        
.updatePartitionedTopicAsync(topicName, p ->
-                                                                new 
PartitionedTopicMetadata(numPartitions,
-                                                                    
p.properties)
-                                                        ));
-                                } else {
-                                    return 
CompletableFuture.completedFuture(null);
-                                }
-                            });
-                } else {
-                    return 
tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions)
-                            .thenCompose(ignore -> 
updatePartitionedTopic(topicName, numPartitions, force));
+                final PulsarAdmin adminClient;
+                try {
+                    adminClient = pulsar().getAdminClient();
+                } catch (PulsarServerException e) {
+                    throw new RuntimeException(e);
                 }
+                return 
adminClient.topics().getListAsync(topicName.getNamespace())
+                        .thenCompose(topics -> {
+                            long existPartitions = topics.stream()
+                                    .filter(t -> 
t.startsWith(topicName.getPartitionedTopicName()))

Review Comment:
   This check is not accurate.
   
   If you are dealing with the partitioned topic "tenant/namespace/f",
   this check also matches:
   tenant/namespace/foo-partition-0
   tenant/namespace/fo-partition-0
   tenant/namespace/f-partition-0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to