AnonHxy commented on code in PR #17251:
URL: https://github.com/apache/pulsar/pull/17251#discussion_r954574661
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4357,39 +4357,50 @@ private PersistentReplicator
getReplicatorReference(String replName, PersistentT
}
}
+ private CompletableFuture<Void> updatePartitionedTopicMetadataAsync(int
numPartitions) {
+ CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
+ .updatePartitionedTopicAsync(topicName, p -> new
PartitionedTopicMetadata(numPartitions, p.properties));
+ future.exceptionally(ex -> {
+ // If the update operation fails, clean up the partitions that
were created
+ getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
+ int oldPartition = metadata.partitions;
+ for (int i = oldPartition; i < numPartitions; i++) {
+
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
-> {
+ log.warn("[{}] Failed to clean up managedLedger {}",
clientAppId(), topicName,
+ ex1.getCause());
+ return null;
+ });
+ }
+ }).exceptionally(e -> {
+ log.warn("[{}] Failed to clean up managedLedger", topicName,
e);
+ return null;
+ });
+ return null;
+ });
+ return future;
+ }
private CompletableFuture<Void> updatePartitionedTopic(TopicName
topicName, int numPartitions, boolean force) {
CompletableFuture<Void> result = new CompletableFuture<>();
- createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
- CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
- .updatePartitionedTopicAsync(topicName, p ->
- new PartitionedTopicMetadata(numPartitions,
p.properties));
- future.exceptionally(ex -> {
- // If the update operation fails, clean up the partitions that
were created
- getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
- int oldPartition = metadata.partitions;
- for (int i = oldPartition; i < numPartitions; i++) {
-
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
-> {
- log.warn("[{}] Failed to clean up managedLedger
{}", clientAppId(), topicName,
- ex1.getCause());
Review Comment:
It looks like there is no need to move this code block to a new method
`updatePartitionedTopicMetadataAsync`; doing so makes more work for review. I
prefer to keep it the way it is in this PR.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4357,39 +4357,50 @@ private PersistentReplicator
getReplicatorReference(String replName, PersistentT
}
}
+ private CompletableFuture<Void> updatePartitionedTopicMetadataAsync(int
numPartitions) {
+ CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
+ .updatePartitionedTopicAsync(topicName, p -> new
PartitionedTopicMetadata(numPartitions, p.properties));
+ future.exceptionally(ex -> {
+ // If the update operation fails, clean up the partitions that
were created
+ getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
+ int oldPartition = metadata.partitions;
+ for (int i = oldPartition; i < numPartitions; i++) {
+
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
-> {
+ log.warn("[{}] Failed to clean up managedLedger {}",
clientAppId(), topicName,
+ ex1.getCause());
+ return null;
+ });
+ }
+ }).exceptionally(e -> {
+ log.warn("[{}] Failed to clean up managedLedger", topicName,
e);
+ return null;
+ });
+ return null;
+ });
+ return future;
+ }
private CompletableFuture<Void> updatePartitionedTopic(TopicName
topicName, int numPartitions, boolean force) {
CompletableFuture<Void> result = new CompletableFuture<>();
- createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
- CompletableFuture<Void> future =
namespaceResources().getPartitionedTopicResources()
- .updatePartitionedTopicAsync(topicName, p ->
- new PartitionedTopicMetadata(numPartitions,
p.properties));
- future.exceptionally(ex -> {
- // If the update operation fails, clean up the partitions that
were created
- getPartitionedTopicMetadataAsync(topicName, false,
false).thenAccept(metadata -> {
- int oldPartition = metadata.partitions;
- for (int i = oldPartition; i < numPartitions; i++) {
-
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
-> {
- log.warn("[{}] Failed to clean up managedLedger
{}", clientAppId(), topicName,
- ex1.getCause());
+ createSubscriptions(topicName, numPartitions).thenCompose(
+ __ ->
updatePartitionedTopicMetadataAsync(numPartitions))
+ .thenAccept(__ -> result.complete(null))
+ .exceptionally(ex -> {
+ if (force && ex.getCause() instanceof
PulsarAdminException.ConflictException) {
+ log.warn("[{}] update partitioned topic's partition
num {} with ignoring conflict exception",
Review Comment:
This exception is thrown from `createSubscriptions`, so I think it's better to
handle the exception in `createSubscriptions`. BTW, could we make sure all the
subscriptions have been created if a `ConflictException` happens and we ignore it
here?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -480,10 +481,9 @@ protected CompletableFuture<Void>
internalUpdatePartitionedTopicAsync(int numPar
if (!updateLocalTopicOnly) {
return
updatePartitionInOtherCluster(numPartitions, clusters)
.thenCompose(v ->
namespaceResources().getPartitionedTopicResources()
-
.updatePartitionedTopicAsync(topicName, p ->
Review Comment:
This modification is unrelated to this PR; we'd better keep it the way
it is.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]