This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ae09fa7643192cb4c4f9353e3bbb37aa37319860
Author: Yan Zhao <[email protected]>
AuthorDate: Thu Aug 31 15:12:03 2023 +0800

    [fix][auto-recovery] Improve the ReplicationWorker performance by 
deleting invalid underreplication nodes (#21059)
    
    (cherry picked from commit ba0f2ba38bb88869b8bf664cb20ef41d47a73026)
---
 .../gradle-enterprise-workspace-id                 |  1 +
 .../bookkeeper/AbstractMetadataDriver.java         |  2 +
 .../PulsarLedgerUnderreplicationManager.java       | 38 ++++++++++++-
 .../LedgerUnderreplicationManagerTest.java         | 66 ++++++++++++++++++++++
 4 files changed, 106 insertions(+), 1 deletion(-)

diff --git a/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id 
b/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id
new file mode 100644
index 00000000000..663b50b8c2e
--- /dev/null
+++ b/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id
@@ -0,0 +1 @@
+s3kqc43mf5g5vgx62q3q6vo3hm
\ No newline at end of file
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
index 76a14300d0b..8af9bb91f5b 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.metadata.bookkeeper;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
@@ -40,6 +41,7 @@ public abstract class AbstractMetadataDriver implements 
Closeable {
     public static final String METADATA_STORE_SCHEME = "metadata-store";
 
     public static final String METADATA_STORE_INSTANCE = 
"metadata-store-instance";
+    public static final long BLOCKING_CALL_TIMEOUT = 
TimeUnit.SECONDS.toMillis(30);
 
     protected MetadataStoreExtended store;
     private boolean storeInstanceIsOwned;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
index f58a2752180..eeedf54f3bb 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
@@ -19,16 +19,20 @@
 package org.apache.pulsar.metadata.bookkeeper;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat;
 import static 
org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
 import static org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
 import static 
org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
 import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat;
 import static 
org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
+import static 
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
+import com.google.common.base.Joiner;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.TextFormat;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -41,6 +45,7 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -61,6 +66,8 @@ import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.KeeperException;
 
 @Slf4j
 public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicationManager {
@@ -392,7 +399,34 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
         try {
             Lock l = heldLocks.get(ledgerId);
             if (l != null) {
-                store.delete(getUrLedgerPath(ledgerId), 
Optional.of(l.getLedgerNodeVersion())).get();
+                store.delete(getUrLedgerPath(ledgerId), 
Optional.of(l.getLedgerNodeVersion()))
+                        .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+                if (store instanceof ZKMetadataStore) {
+                    try {
+                        // clean up the hierarchy
+                        String[] parts = getUrLedgerPath(ledgerId).split("/");
+                        for (int i = 1; i <= 4; i++) {
+                            String[] p = Arrays.copyOf(parts, parts.length - 
i);
+                            String path = Joiner.on("/").join(p);
+                            Optional<GetResult> getResult = 
store.get(path).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+                            if (getResult.isPresent()) {
+                                store.delete(path, 
Optional.of(getResult.get().getStat().getVersion()))
+                                        .get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS);
+                            }
+                        }
+                    } catch (ExecutionException ee) {
+                        // This can happen when cleaning up the hierarchy.
+                        // It's safe to ignore, it simply means another
+                        // ledger in the same hierarchy has been marked as
+                        // underreplicated.
+                        if (ee.getCause() instanceof MetadataStoreException && 
ee.getCause().getCause()
+                                instanceof KeeperException.NotEmptyException) {
+                            //do nothing.
+                        } else {
+                            log.warn("Error deleting underrepcalited ledger 
parent node", ee);
+                        }
+                    }
+                }
             }
         } catch (ExecutionException ee) {
             if (ee.getCause() instanceof 
MetadataStoreException.NotFoundException) {
@@ -405,6 +439,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
                 log.error("Error deleting underreplicated ledger node", ee);
                 throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ee);
             }
+        } catch (TimeoutException ex) {
+            throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ex);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted 
while contacting metadata store", ie);
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
index 661eb13ac28..1c8b62642da 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
@@ -23,12 +23,14 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.protobuf.TextFormat;
+import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -54,6 +56,7 @@ import 
org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.metadata.BaseMetadataStoreTest;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -295,6 +298,69 @@ public class LedgerUnderreplicationManagerTest extends 
BaseMetadataStoreTest {
         assertEquals(l, lB.get(), "Should be the ledger I marked");
     }
 
+
+    @Test(timeOut = 10000)
+    public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes() throws 
Exception {
+        methodSetup(stringSupplier(() -> zks.getConnectionString()));
+
+        String missingReplica = "localhost:3181";
+
+        @Cleanup
+        LedgerUnderreplicationManager m1 = 
lmf.newLedgerUnderreplicationManager();
+
+        Long ledgerA = 0xfeadeefdacL;
+        m1.markLedgerUnderreplicated(ledgerA, missingReplica);
+
+        Field storeField = m1.getClass().getDeclaredField("store");
+        storeField.setAccessible(true);
+        MetadataStoreExtended metadataStore = (MetadataStoreExtended) 
storeField.get(m1);
+
+        String fiveLevelPath = 
PulsarLedgerUnderreplicationManager.getUrLedgerPath(urLedgerPath, ledgerA);
+        Optional<GetResult> getResult = 
metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        String fourLevelPath = fiveLevelPath.substring(0, 
fiveLevelPath.lastIndexOf("/"));
+        getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        String threeLevelPath = fourLevelPath.substring(0, 
fourLevelPath.lastIndexOf("/"));
+        getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        String twoLevelPath = fourLevelPath.substring(0, 
threeLevelPath.lastIndexOf("/"));
+        getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        String oneLevelPath = fourLevelPath.substring(0, 
twoLevelPath.lastIndexOf("/"));
+        getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+
+        long ledgerToRereplicate = m1.getLedgerToRereplicate();
+        assertEquals(Long.valueOf(ledgerToRereplicate), ledgerA);
+        m1.markLedgerReplicated(ledgerA);
+
+        getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS);
+        assertFalse(getResult.isPresent());
+
+        getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS);
+        assertTrue(getResult.isPresent());
+    }
+
     /**
      * Test releasing of a ledger
      * A ledger is released when a client decides it does not want

Reply via email to