congbobo184 commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654312778



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));

Review comment:
       If the topic exists locally but a remote cluster doesn't have it, do we
need to handle that case here?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
-            } else {
-
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    namespaceResources().getPartitionedTopicResources()
-                            .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
-                                log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-                                tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                    log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    asyncResponse.resume(Response.noContent().build());
-                                }).exceptionally(e -> {
-                                    log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    // The partitioned topic is created but there are some partitions create failed
-                                    asyncResponse.resume(new RestException(e));
-                                    return null;
-                                });
-                            }).exceptionally(ex -> {
-                                if (ex.getCause() instanceof AlreadyExistsException) {
-                                    log.warn("[{}] Failed to create already existing partitioned topic {}",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(
-                                            new RestException(Status.CONFLICT, "Partitioned topic already exists"));
-                                } else if (ex.getCause() instanceof BadVersionException) {
-                                    log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
-                                } else {
-                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
-                                            ex.getCause());
-                                    asyncResponse.resume(new RestException(ex.getCause()));
-                                }
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
+                return;
             }
+
+            provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
+                    .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        createLocalFuture.complete(null);

Review comment:
       `createFutureList.add(tryCreatePartitionsAsync(numPartitions))`
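
A minimal sketch of what this suggestion could look like, assuming the intent is to add the composed future to `createFutureList` directly instead of completing `createLocalFuture` by hand in `whenComplete`. `provisionPath` and `createPartitions` are hypothetical stand-ins for `provisionPartitionedTopicPath` and `tryCreatePartitionsAsync`; the class and the remaining method names are illustrative only and are not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CreateFutureListSketch {

    // Hypothetical stand-in for provisionPartitionedTopicPath(...).
    static CompletableFuture<Void> provisionPath(boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("provision failed"));
        } else {
            future.complete(null);
        }
        return future;
    }

    // Hypothetical stand-in for tryCreatePartitionsAsync(numPartitions).
    static CompletableFuture<Void> createPartitions(int numPartitions) {
        return CompletableFuture.completedFuture(null);
    }

    // Shape used in the diff: a separate future is completed manually.
    static CompletableFuture<Void> bridged(boolean fail, int numPartitions) {
        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
        provisionPath(fail)
                .thenCompose(ignored -> createPartitions(numPartitions))
                .whenComplete((ignored, ex) -> {
                    if (ex != null) {
                        createLocalFuture.completeExceptionally(ex);
                        return;
                    }
                    createLocalFuture.complete(null);
                });
        return createLocalFuture;
    }

    // Shape hinted at by the comment: the composed future is used as-is.
    static CompletableFuture<Void> direct(boolean fail, int numPartitions) {
        return provisionPath(fail)
                .thenCompose(ignored -> createPartitions(numPartitions));
    }

    // Waits for every future in the list and reports whether all succeeded.
    static boolean allSucceeded(List<CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .handle((ignored, ex) -> ex == null)
                .join();
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
        createFutureList.add(direct(false, 3));
        System.out.println("all succeeded: " + allSucceeded(createFutureList));
    }
}
```

Both shapes propagate success and failure to the caller in the same way; the direct one avoids the extra future. In the actual method the chain starts inside the `checkTopicExistsAsync` callback, so the existence check would probably have to become part of the composed chain for this to apply.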



