This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new c00a4a13554 [fix][auto-recovery] [branch-2.11] Fix metadata store 
deadlock due to BookkeeperInternalCallbacks.Processor  (#21315)
c00a4a13554 is described below

commit c00a4a1355404354c35c9d97a0897981286663af
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Oct 9 10:31:48 2023 +0800

    [fix][auto-recovery] [branch-2.11] Fix metadata store deadlock due to 
BookkeeperInternalCallbacks.Processor  (#21315)
    
    ### Motivation
    The Auditor starts an AuditorCheckAllLedgersTask to run checkAllLedgers. It
    iterates over every ledger using `ledgerManager.asyncProcessLedgers`, which
    invokes the checkLedgersProcessor callback on the metadata store thread.
    
    
https://github.com/apache/bookkeeper/blob/e8da8eb6cb7c8ef52336a51d4f7348ae07986c26/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java#L158-L248
    
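    The checkLedgersProcessor calls
    `ledgerUnderreplicationManager.isLedgerReplicationEnabled()`, which is a
    synchronous metadata store operation. Because the processor is already
    running on the metadata store thread, that blocking call waits on the very
    thread that would have to complete it, and the Auditor deadlocks.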
    
    ### Modifications
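    Submit the per-ledger processor loop to the scheduler instead of running it
    directly on the metadata store callback thread, so that blocking metadata
    store calls made by the processor can no longer deadlock.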
---
 pom.xml                                            |  14 ++
 pulsar-metadata/pom.xml                            |  21 ++
 .../AbstractHierarchicalLedgerManager.java         |   9 +-
 .../replication/AuditorPeriodicCheckTest.java      | 219 +++++++++++++++++++++
 4 files changed, 259 insertions(+), 4 deletions(-)

diff --git a/pom.xml b/pom.xml
index cb9a735b456..0faf53ecc85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1444,6 +1444,20 @@ flexible messaging model and an intuitive client 
API.</description>
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <!-- We use TestStatsProvider in many unit tests -->
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${bookkeeper.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 2acbc1e8344..5181f01a55a 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -64,6 +64,27 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-tcnative</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <!-- zookeeper server -->
     <dependency>
       <groupId>org.xerial.snappy</groupId>
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
index cbfa68692ae..1588fe9fee0 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
@@ -206,10 +206,11 @@ abstract class AbstractHierarchicalLedgerManager {
                             mcb = new 
BookkeeperInternalCallbacks.MultiCallback(activeLedgers.size(), finalCb, ctx,
                             successRc, failureRc);
                     // start loop over all ledgers
-                    for (Long ledger : activeLedgers) {
-                        processor.process(ledger, mcb);
-                    }
-
+                    scheduler.submit(() -> {
+                        for (Long ledger : activeLedgers) {
+                            processor.process(ledger, mcb);
+                        }
+                    });
                 }).exceptionally(ex -> {
                     finalCb.processResult(failureRc, null, ctx);
                     return null;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
new file mode 100644
index 00000000000..5e7b4915c3c
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertNotSame;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.TestBookieImpl;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+/**
+ * This test verifies that the period check on the auditor
+ * will pick up on missing data in the client.
+ */
+public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(AuditorPeriodicCheckTest.class);
+
+    private MetadataBookieDriver driver;
+    private HashMap<String, AuditorElector> auditorElectors = new 
HashMap<String, AuditorElector>();
+
+    private static final int CHECK_INTERVAL = 1; // run every second
+
+    public AuditorPeriodicCheckTest() throws Exception {
+        super(3);
+        baseConf.setPageLimit(1); // to make it easy to push ledger out of 
cache
+        
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver");
+        
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
+    }
+
+    @BeforeTest
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        for (int i = 0; i < numBookies; i++) {
+            ServerConfiguration conf = new ServerConfiguration(confByIndex(i));
+            conf.setAuditorPeriodicCheckInterval(CHECK_INTERVAL);
+            conf.setMetadataServiceUri(
+                    zkUtil.getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:").replaceAll("/ledgers", ""));
+
+            String addr = addressByIndex(i).toString();
+
+            AuditorElector auditorElector = new AuditorElector(addr, conf);
+            auditorElectors.put(addr, auditorElector);
+            auditorElector.start();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Starting Auditor Elector");
+            }
+        }
+
+        URI uri = 
URI.create(confByIndex(0).getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:")
+                .replaceAll("/ledgers", ""));
+        driver = MetadataDrivers.getBookieDriver(uri);
+        ServerConfiguration serverConfiguration = new 
ServerConfiguration(confByIndex(0));
+        serverConfiguration.setMetadataServiceUri(
+                
serverConfiguration.getMetadataServiceUri().replaceAll("zk://", 
"metadata-store:")
+                        .replaceAll("/ledgers", ""));
+        driver.initialize(serverConfiguration, NullStatsLogger.INSTANCE);
+    }
+
+    @AfterTest
+    @Override
+    public void tearDown() throws Exception {
+        if (null != driver) {
+            driver.close();
+        }
+
+        for (AuditorElector e : auditorElectors.values()) {
+            e.shutdown();
+        }
+        super.tearDown();
+    }
+
+    private BookieId replaceBookieWithWriteFailingBookie(LedgerHandle lh) 
throws Exception {
+        int bookieIdx = -1;
+        Long entryId = lh.getLedgerMetadata().getAllEnsembles().firstKey();
+        List<BookieId> curEnsemble = 
lh.getLedgerMetadata().getAllEnsembles().get(entryId);
+
+        // Identify a bookie in the current ledger ensemble to be replaced
+        BookieId replacedBookie = null;
+        for (int i = 0; i < numBookies; i++) {
+            if (curEnsemble.contains(addressByIndex(i))) {
+                bookieIdx = i;
+                replacedBookie = addressByIndex(i);
+                break;
+            }
+        }
+        assertNotSame("Couldn't find ensemble bookie in bookie list", -1, 
bookieIdx);
+
+        LOG.info("Killing bookie " + addressByIndex(bookieIdx));
+        ServerConfiguration conf = killBookie(bookieIdx);
+        Bookie writeFailingBookie = new TestBookieImpl(conf) {
+            @Override
+            public void addEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb,
+                                 Object ctx, byte[] masterKey)
+                    throws IOException, BookieException {
+                try {
+                    LOG.info("Failing write to entry ");
+                    // sleep a bit so that writes to other bookies succeed 
before
+                    // the client hears about the failure on this bookie. If 
the
+                    // client gets ack-quorum number of acks first, it won't 
care
+                    // about any failures and won't reform the ensemble.
+                    Thread.sleep(100);
+                    throw new IOException();
+                } catch (InterruptedException ie) {
+                    // ignore, only interrupted if shutting down,
+                    // and an exception would spam the logs
+                    Thread.currentThread().interrupt();
+                }
+            }
+        };
+        startAndAddBookie(conf, writeFailingBookie);
+        return replacedBookie;
+    }
+
+    /*
+     * Validates that the periodic ledger check will fix entries with a failed 
write.
+     */
+    @Test
+    public void testFailedWriteRecovery() throws Exception {
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager underReplicationManager = 
mFactory.newLedgerUnderreplicationManager();
+        underReplicationManager.disableLedgerReplication();
+
+        LedgerHandle lh = bkc.createLedger(2, 2, 1, DigestType.CRC32, 
"passwd".getBytes());
+
+        // kill one of the bookies and replace it with one that rejects write;
+        // This way we get into the under replication state
+        BookieId replacedBookie = replaceBookieWithWriteFailingBookie(lh);
+
+        // Write a few entries; this should cause under replication
+        byte[] data = "foobar".getBytes();
+        data = "foobar".getBytes();
+        lh.addEntry(data);
+        lh.addEntry(data);
+        lh.addEntry(data);
+
+        lh.close();
+
+        // enable under replication detection and wait for it to report
+        // under replicated ledger
+        underReplicationManager.enableLedgerReplication();
+        long underReplicatedLedger = -1;
+        for (int i = 0; i < 5; i++) {
+            underReplicatedLedger = 
underReplicationManager.pollLedgerToRereplicate();
+            if (underReplicatedLedger != -1) {
+                break;
+            }
+            Thread.sleep(CHECK_INTERVAL * 1000);
+        }
+        assertEquals("Ledger should be under replicated", lh.getId(), 
underReplicatedLedger);
+
+        // now start the replication workers
+        List<ReplicationWorker> l = new ArrayList<ReplicationWorker>();
+        for (int i = 0; i < numBookies; i++) {
+            ReplicationWorker rw = new ReplicationWorker(confByIndex(i), 
NullStatsLogger.INSTANCE);
+            rw.start();
+            l.add(rw);
+        }
+        underReplicationManager.close();
+
+        // Wait for ensemble to change after replication
+        Thread.sleep(3000);
+        for (ReplicationWorker rw : l) {
+            rw.shutdown();
+        }
+
+        // check that ensemble has changed and the bookie that rejected writes 
has
+        // been replaced in the ensemble
+        LedgerHandle newLh = bkc.openLedger(lh.getId(), DigestType.CRC32, 
"passwd".getBytes());
+        for (Map.Entry<Long, ? extends List<BookieId>> e :
+                newLh.getLedgerMetadata().getAllEnsembles().entrySet()) {
+            List<BookieId> ensemble = e.getValue();
+            assertFalse("Ensemble hasn't been updated", 
ensemble.contains(replacedBookie));
+        }
+        newLh.close();
+    }
+}

Reply via email to