poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574733176
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java: ########## @@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) - .thenAccept(clusters -> { - for (String cluster : clusters) { - if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { - // this call happens in the background without async composition. completion is logged. - pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(cluster) - .thenCompose(clusterDataOp -> - ((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), - numPartitions, - true, null)) - .whenComplete((__, ex) -> { - if (ex != null) { - log.error( - "[{}] Failed to create partitioned topic {} in cluster {}.", - clientAppId(), topicName, cluster, ex); - } else { - log.info( - "[{}] Successfully created partitioned topic {} in " - + "cluster {}", - clientAppId(), topicName, cluster); - } - }); - } + .thenAccept(clusters -> { + // this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); + }); + } + + protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground( + Set<String> clusters, int numPartitions) { + final String shortTopicName = topicName.getPartitionedTopicName(); + Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>(); + for (String cluster : clusters) { + if (cluster.equals(pulsar().getConfiguration().getClusterName())) { + continue; + } + ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); + CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>(); + tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { + if (ex1 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. + log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" + + " {}.", clientAppId(), topicName, cluster, ex1); + createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); + return; + } + // Get cluster data success. + TopicsImpl topics = + (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); + topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) + .whenComplete((ignore, ex2) -> { + if (ex2 == null) { + // Create success. + log.info("[{}] Successfully created partitioned topic {} in cluster {}", + clientAppId(), topicName, cluster); + createRemoteTopicFuture.complete(null); + return; + } + // Create topic on the remote cluster error. + Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); + // The topic has been created before, check the partitions count is expected. + if (unwrapEx2 instanceof PulsarAdminException.ConflictException) { + topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta, ex3) -> { + if (ex3 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the + // "createRemoteTopicFuture" stuck. + log.error("[{}] Failed to check remote-cluster's topic metadata when creating" + + " partitioned topic {} in cluster {}.", + clientAppId(), topicName, cluster, ex3); + createRemoteTopicFuture.completeExceptionally(new RestException(ex3)); + } + // Call get partitioned metadata of remote cluster success. + if (topicMeta.partitions == numPartitions) { Review Comment: Good suggestion, since only the latest releases(or latest patch releases) will contain the new error code we added, these compatibility codes are still needed. We can separate a PR to add the new error code and remove these compatibility codes after a long time. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java: ########## @@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) - .thenAccept(clusters -> { - for (String cluster : clusters) { - if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { - // this call happens in the background without async composition. completion is logged. - pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(cluster) - .thenCompose(clusterDataOp -> - ((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), - numPartitions, - true, null)) - .whenComplete((__, ex) -> { - if (ex != null) { - log.error( - "[{}] Failed to create partitioned topic {} in cluster {}.", - clientAppId(), topicName, cluster, ex); - } else { - log.info( - "[{}] Successfully created partitioned topic {} in " - + "cluster {}", - clientAppId(), topicName, cluster); - } - }); - } + .thenAccept(clusters -> { + // this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); + }); + } + + protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground( + Set<String> clusters, int numPartitions) { + final String shortTopicName = topicName.getPartitionedTopicName(); + Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>(); + for (String cluster : clusters) { + if (cluster.equals(pulsar().getConfiguration().getClusterName())) { + continue; + } + ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); + CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>(); + tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { + if (ex1 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. + log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" + + " {}.", clientAppId(), topicName, cluster, ex1); + createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); + return; + } + // Get cluster data success. + TopicsImpl topics = + (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); + topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) + .whenComplete((ignore, ex2) -> { + if (ex2 == null) { + // Create success. + log.info("[{}] Successfully created partitioned topic {} in cluster {}", + clientAppId(), topicName, cluster); + createRemoteTopicFuture.complete(null); + return; + } + // Create topic on the remote cluster error. + Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); + // The topic has been created before, check the partitions count is expected. + if (unwrapEx2 instanceof PulsarAdminException.ConflictException) { + topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta, ex3) -> { + if (ex3 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the + // "createRemoteTopicFuture" stuck. + log.error("[{}] Failed to check remote-cluster's topic metadata when creating" + + " partitioned topic {} in cluster {}.", + clientAppId(), topicName, cluster, ex3); + createRemoteTopicFuture.completeExceptionally(new RestException(ex3)); + } + // Call get partitioned metadata of remote cluster success. + if (topicMeta.partitions == numPartitions) { Review Comment: Good suggestion, since only the latest releases(or latest patch releases) will contain the new error code we added, these compatibility codes are still needed. So we can separate a PR to add the new error code and remove these compatibility codes after a long time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org