This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 22bfe2704fd [improve][broker]Ensure namespace deletion doesn't fail (#22627)
22bfe2704fd is described below

commit 22bfe2704fdcf0ca93ff2697d60346701d914397
Author: Enrico Olivelli <eolive...@apache.org>
AuthorDate: Mon May 13 11:50:39 2024 +0200

    [improve][broker]Ensure namespace deletion doesn't fail (#22627)
    
    (cherry picked from commit 936afecede8374b14d13e9d48e9372fec1c27447)
---
 .../pulsar/broker/resources/BaseResources.java     | 27 ++++++++---------
 .../broker/resources/LocalPoliciesResources.java   |  2 +-
 .../broker/resources/NamespaceResources.java       | 17 +++++++++--
 .../pulsar/broker/resources/TopicResources.java    | 35 ++++------------------
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 16 ++++++++--
 .../SystemTopicBasedTopicPoliciesService.java      |  3 +-
 .../apache/pulsar/metadata/api/MetadataStore.java  | 22 ++++++++++++++
 .../metadata/impl/AbstractMetadataStore.java       | 13 ++++----
 8 files changed, 78 insertions(+), 57 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 42add4271f6..ebbc1ab8c68 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -161,22 +161,21 @@ public class BaseResources<T> {
     }
 
     protected CompletableFuture<Void> deleteIfExistsAsync(String path) {
-        return cache.exists(path).thenCompose(exists -> {
-            if (!exists) {
-                return CompletableFuture.completedFuture(null);
+        log.info("Deleting path: {}", path);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        cache.delete(path).whenComplete((ignore, ex) -> {
+            if (ex != null && ex.getCause() instanceof 
MetadataStoreException.NotFoundException) {
+                log.info("Path {} did not exist in metadata store", path);
+                future.complete(null);
+            } else if (ex != null) {
+                log.info("Failed to delete path from metadata store: {}", 
path, ex);
+                future.completeExceptionally(ex);
+            } else {
+                log.info("Deleted path from metadata store: {}", path);
+                future.complete(null);
             }
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            cache.delete(path).whenComplete((ignore, ex) -> {
-                if (ex != null && ex.getCause() instanceof 
MetadataStoreException.NotFoundException) {
-                    future.complete(null);
-                } else if (ex != null) {
-                    future.completeExceptionally(ex);
-                } else {
-                    future.complete(null);
-                }
-            });
-            return future;
         });
+        return future;
     }
 
     protected boolean exists(String path) throws MetadataStoreException {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
index c6b658c3bd0..ae3479fde59 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
@@ -79,7 +79,7 @@ public class LocalPoliciesResources extends 
BaseResources<LocalPolicies> {
     }
 
     public CompletableFuture<Void> deleteLocalPoliciesAsync(NamespaceName ns) {
-        return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
+        return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, 
ns.toString()));
     }
 
     public CompletableFuture<Void> deleteLocalPoliciesTenantAsync(String 
tenant) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index e35c208c208..e01e31b96dc 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -112,7 +112,7 @@ public class NamespaceResources extends 
BaseResources<Policies> {
     }
 
     public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
-        return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
+        return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, 
ns.toString()));
     }
 
     public Optional<Policies> getPolicies(NamespaceName ns) throws 
MetadataStoreException{
@@ -152,10 +152,18 @@ public class NamespaceResources extends 
BaseResources<Policies> {
                 && path.substring(LOCAL_POLICIES_ROOT.length() + 
1).contains("/");
     }
 
-    // clear resource of `/namespace/{namespaceName}` for zk-node
+    /**
+     * Clear resource of `/namespace/{namespaceName}` for zk-node.
+     * @param ns the namespace name
+     * @return a handle to the results of the operation
+     * */
+    //
     public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
         final String namespacePath = joinPath(NAMESPACE_BASE_PATH, 
ns.toString());
-        return deleteIfExistsAsync(namespacePath);
+        // please beware that this will delete all the children of the 
namespace
+        // including the ownership nodes (ephemeral nodes)
+        // see ServiceUnitUtils.path(ns) for the ownership node path
+        return getStore().deleteRecursive(namespacePath);
     }
 
     // clear resource of `/namespace/{tenant}` for zk-node
@@ -298,11 +306,14 @@ public class NamespaceResources extends 
BaseResources<Policies> {
 
         public CompletableFuture<Void> 
clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
             final String globalPartitionedPath = 
joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
+            log.info("Clearing partitioned topic metadata for namespace {}, 
path is {}",
+                    namespaceName, globalPartitionedPath);
             return getStore().deleteRecursive(globalPartitionedPath);
         }
 
         public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String 
tenant) {
             final String partitionedTopicPath = 
joinPath(PARTITIONED_TOPIC_PATH, tenant);
+            log.info("Clearing partitioned topic metadata for tenant {}, path 
is {}", tenant, partitionedTopicPath);
             return deleteIfExistsAsync(partitionedTopicPath);
         }
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index 413184764f5..f607da76b3c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -75,11 +75,6 @@ public class TopicResources {
         );
     }
 
-    public CompletableFuture<Void> deletePersistentTopicAsync(TopicName topic) 
{
-        String path = MANAGED_LEDGER_PATH + "/" + 
topic.getPersistenceNamingEncoding();
-        return store.delete(path, Optional.of(-1L));
-    }
-
     public CompletableFuture<Void> createPersistentTopicAsync(TopicName topic) 
{
         String path = MANAGED_LEDGER_PATH + "/" + 
topic.getPersistenceNamingEncoding();
         return store.put(path, new byte[0], Optional.of(-1L))
@@ -93,38 +88,20 @@ public class TopicResources {
 
     public CompletableFuture<Void> clearNamespacePersistence(NamespaceName ns) 
{
         String path = MANAGED_LEDGER_PATH + "/" + ns;
-        return store.exists(path)
-                .thenCompose(exists -> {
-                    if (exists) {
-                        return store.delete(path, Optional.empty());
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
+        log.info("Clearing namespace persistence for namespace: {}, path {}", 
ns, path);
+        return store.deleteIfExists(path, Optional.empty());
     }
 
     public CompletableFuture<Void> clearDomainPersistence(NamespaceName ns) {
         String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
-        return store.exists(path)
-                .thenCompose(exists -> {
-                    if (exists) {
-                        return store.delete(path, Optional.empty());
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
+        log.info("Clearing domain persistence for namespace: {}, path {}", ns, 
path);
+        return store.deleteIfExists(path, Optional.empty());
     }
 
     public CompletableFuture<Void> clearTenantPersistence(String tenant) {
         String path = MANAGED_LEDGER_PATH + "/" + tenant;
-        return store.exists(path)
-                .thenCompose(exists -> {
-                    if (exists) {
-                        return store.deleteRecursive(path);
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
+        log.info("Clearing tenant persistence for tenant: {}, path {}", 
tenant, path);
+        return store.deleteRecursive(path);
     }
 
     void handleNotification(Notification notification) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a73ef0b4400..0d58055f236 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -310,8 +310,14 @@ public abstract class NamespacesBase extends AdminResource 
{
                                                     clientAppId(), ex);
                                             return FutureUtil.failedFuture(ex);
                                         }
+                                        log.info("[{}] Deleting namespace 
bundle {}/{}", clientAppId(),
+                                                namespaceName, 
bundle.getBundleRange());
                                         return 
admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(),
                                                 bundle.getBundleRange(), 
force);
+                                    } else {
+                                        log.warn("[{}] Skipping deleting 
namespace bundle {}/{} "
+                                                        + "as it's not owned 
by any broker",
+                                                clientAppId(), namespaceName, 
bundle.getBundleRange());
                                     }
                                     return 
CompletableFuture.completedFuture(null);
                                 })
@@ -322,8 +328,11 @@ public abstract class NamespacesBase extends AdminResource 
{
                         final Throwable rc = 
FutureUtil.unwrapCompletionException(error);
                         if (rc instanceof MetadataStoreException) {
                             if (rc.getCause() != null && rc.getCause() 
instanceof KeeperException.NotEmptyException) {
+                                KeeperException.NotEmptyException ne =
+                                        (KeeperException.NotEmptyException) 
rc.getCause();
                                 log.info("[{}] There are in-flight topics 
created during the namespace deletion, "
-                                        + "retry to delete the namespace 
again.", namespaceName);
+                                        + "retry to delete the namespace 
again. (path {} is not empty on metadata)",
+                                        namespaceName, ne.getPath());
                                 final int next = retryTimes - 1;
                                 if (next > 0) {
                                     // async recursive
@@ -331,7 +340,8 @@ public abstract class NamespacesBase extends AdminResource {
                                 } else {
                                     callback.completeExceptionally(
                                             new RestException(Status.CONFLICT, 
"The broker still have in-flight topics"
-                                                    + " created during 
namespace deletion, please try again."));
+                                                    + " created during 
namespace deletion (path " + ne.getPath() + ") "
+                                                    + "is not empty on 
metadata store, please try again."));
                                     // drop out recursive
                                 }
                                 return;
@@ -476,6 +486,8 @@ public abstract class NamespacesBase extends AdminResource {
     @SuppressWarnings("deprecation")
     protected CompletableFuture<Void> 
internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
                                                                          
boolean force) {
+        log.info("[{}] Deleting namespace bundle {}/{} authoritative:{} 
force:{}",
+                clientAppId(), namespaceName, bundleRange, authoritative, 
force);
         return validateNamespaceOperationAsync(namespaceName, 
NamespaceOperation.DELETE_BUNDLE)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 0449e5c885c..e344d892b31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -522,7 +522,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     } else {
                         Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
                         if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
-                            log.warn("Read more topic policies exception, 
close the read now!", ex);
+                            log.info("Closing the topic policies reader for 
{}",
+                                    reader.getSystemTopic().getTopicName());
                             cleanCacheAndCloseReader(
                                     
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                         } else {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 33942c19520..89b0e7a6fe1 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -23,9 +23,12 @@ import com.google.common.annotations.Beta;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Metadata store client interface.
@@ -36,6 +39,8 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 @Beta
 public interface MetadataStore extends AutoCloseable {
 
+    Logger LOGGER = LoggerFactory.getLogger(MetadataStore.class);
+
     /**
      * Read the value of one key, identified by the path
      *
@@ -121,6 +126,23 @@ public interface MetadataStore extends AutoCloseable {
      */
     CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion);
 
+    default CompletableFuture<Void> deleteIfExists(String path, Optional<Long> 
expectedVersion) {
+        return delete(path, expectedVersion)
+                .exceptionally(e -> {
+                    if (e.getCause() instanceof NotFoundException) {
+                        LOGGER.info("Path {} not found while deleting (this is 
not a problem)", path);
+                        return null;
+                    } else {
+                        if (expectedVersion.isEmpty()) {
+                            LOGGER.info("Failed to delete path {}", path, e);
+                        } else {
+                            LOGGER.info("Failed to delete path {} with 
expected version {}", path, expectedVersion, e);
+                        }
+                        throw new CompletionException(e);
+                    }
+                });
+    }
+
     /**
      * Delete a key-value pair and all the children nodes.
      *
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index d3066661113..d099d79d05c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -349,6 +349,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     @Override
     public final CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion) {
+        log.info("Deleting path: {} (v. {})", path, expectedVersion);
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
@@ -393,11 +394,13 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
             }
 
             metadataCaches.forEach(c -> c.invalidate(path));
+            log.info("Deleted path: {} (v. {})", path, expectedVersion);
         });
     }
 
     @Override
     public CompletableFuture<Void> deleteRecursive(String path) {
+        log.info("Deleting recursively path: {}", path);
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
@@ -406,13 +409,9 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                         children.stream()
                                 .map(child -> deleteRecursive(path + "/" + 
child))
                                 .collect(Collectors.toList())))
-                .thenCompose(__ -> exists(path))
-                .thenCompose(exists -> {
-                    if (exists) {
-                        return delete(path, Optional.empty());
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
+                .thenCompose(__ -> {
+                    log.info("After deleting all children, now deleting path: 
{}", path);
+                    return deleteIfExists(path, Optional.empty());
                 });
     }
 

Reply via email to