Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800391437
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -416,87 +432,60 @@ protected void internalCreateNonPartitionedTopic(boolean
authoritative, Map<Stri
*
* @param numPartitions
*/
- protected void internalUpdatePartitionedTopic(int numPartitions,
- boolean
updateLocalTopicOnly, boolean authoritative,
- boolean force) {
+ protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int
numPartitions,
+
boolean updateLocalTopicOnly,
+
boolean authoritative, boolean force) {
if (numPartitions <= 0) {
- throw new RestException(Status.NOT_ACCEPTABLE, "Number of
partitions should be more than 0");
+ return FutureUtil.failedFuture(new
RestException(Status.NOT_ACCEPTABLE,
+ "Number of partitions should be more than 0"));
}
-
- validateTopicOwnership(topicName, authoritative);
- validateTopicPolicyOperation(topicName, PolicyName.PARTITION,
PolicyOperation.WRITE);
- // Only do the validation if it's the first hop.
- if (!updateLocalTopicOnly && !force) {
- validatePartitionTopicUpdate(topicName.getLocalName(),
numPartitions);
- }
- final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
- if (maxPartitions > 0 && numPartitions > maxPartitions) {
- throw new RestException(Status.NOT_ACCEPTABLE,
- "Number of partitions should be less than or equal to " +
maxPartitions);
- }
-
- if (topicName.isGlobal() &&
isNamespaceReplicated(topicName.getNamespaceObject())) {
- Set<String> clusters =
getNamespaceReplicatedClusters(topicName.getNamespaceObject());
- if (!clusters.contains(pulsar().getConfig().getClusterName())) {
- log.error("[{}] local cluster is not part of replicated
cluster for namespace {}", clientAppId(),
- topicName);
- throw new RestException(Status.FORBIDDEN, "Local cluster is
not part of replicate cluster list");
- }
- try {
-
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC,
TimeUnit.SECONDS);
- createSubscriptions(topicName,
numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
- } catch (Exception e) {
- if (e.getCause() instanceof RestException) {
- throw (RestException) e.getCause();
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName,
PolicyName.PARTITION,
+ PolicyOperation.WRITE))
+ .thenCompose(__ -> {
+ CompletableFuture<Void> ret;
+ if (!updateLocalTopicOnly && !force) {
+ ret =
validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
Review comment:
yes, right.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]