This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new c00a4a13554 [fix][auto-recovery] [branch-2.11] Fix metadata store
deadlock due to BookkeeperInternalCallbacks.Processor (#21315)
c00a4a13554 is described below
commit c00a4a1355404354c35c9d97a0897981286663af
Author: Yan Zhao <[email protected]>
AuthorDate: Mon Oct 9 10:31:48 2023 +0800
[fix][auto-recovery] [branch-2.11] Fix metadata store deadlock due to
BookkeeperInternalCallbacks.Processor (#21315)
### Motivation
The Auditor starts AuditorCheckAllLedgersTask to check all ledgers. The task
iterates over every ledger using `ledgerManager.asyncProcessLedgers`, which
invokes the checkLedgersProcessor callback on the metadata store thread.
https://github.com/apache/bookkeeper/blob/e8da8eb6cb7c8ef52336a51d4f7348ae07986c26/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java#L158-L248
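checkLedgersProcessor invokes
`ledgerUnderreplicationManager.isLedgerReplicationEnabled()`, which is a
synchronous metadata store operation. Because the callback itself runs on the
metadata store thread, this blocking call waits on the very thread it is
occupying, which results in a deadlock.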
### Modifications
Submit the processor loop to the scheduler so that it no longer runs on the
metadata store thread.
---
pom.xml | 14 ++
pulsar-metadata/pom.xml | 21 ++
.../AbstractHierarchicalLedgerManager.java | 9 +-
.../replication/AuditorPeriodicCheckTest.java | 219 +++++++++++++++++++++
4 files changed, 259 insertions(+), 4 deletions(-)
diff --git a/pom.xml b/pom.xml
index cb9a735b456..0faf53ecc85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1444,6 +1444,20 @@ flexible messaging model and an intuitive client
API.</description>
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <!-- We use TestStatsProvider in many unit tests -->
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-common</artifactId>
+ <version>${bookkeeper.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 2acbc1e8344..5181f01a55a 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -64,6 +64,27 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- zookeeper server -->
<dependency>
<groupId>org.xerial.snappy</groupId>
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
index cbfa68692ae..1588fe9fee0 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
@@ -206,10 +206,11 @@ abstract class AbstractHierarchicalLedgerManager {
mcb = new
BookkeeperInternalCallbacks.MultiCallback(activeLedgers.size(), finalCb, ctx,
successRc, failureRc);
// start loop over all ledgers
- for (Long ledger : activeLedgers) {
- processor.process(ledger, mcb);
- }
-
+ scheduler.submit(() -> {
+ for (Long ledger : activeLedgers) {
+ processor.process(ledger, mcb);
+ }
+ });
}).exceptionally(ex -> {
finalCb.processResult(failureRc, null, ctx);
return null;
diff --git
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
new file mode 100644
index 00000000000..5e7b4915c3c
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertNotSame;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.TestBookieImpl;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+/**
+ * This test verifies that the period check on the auditor
+ * will pick up on missing data in the client.
+ */
+public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AuditorPeriodicCheckTest.class);
+
+ private MetadataBookieDriver driver;
+ private HashMap<String, AuditorElector> auditorElectors = new
HashMap<String, AuditorElector>();
+
+ private static final int CHECK_INTERVAL = 1; // run every second
+
+ public AuditorPeriodicCheckTest() throws Exception {
+ super(3);
+ baseConf.setPageLimit(1); // to make it easy to push ledger out of
cache
+
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver");
+
Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver");
+ }
+
+ @BeforeTest
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ for (int i = 0; i < numBookies; i++) {
+ ServerConfiguration conf = new ServerConfiguration(confByIndex(i));
+ conf.setAuditorPeriodicCheckInterval(CHECK_INTERVAL);
+ conf.setMetadataServiceUri(
+ zkUtil.getMetadataServiceUri().replaceAll("zk://",
"metadata-store:").replaceAll("/ledgers", ""));
+
+ String addr = addressByIndex(i).toString();
+
+ AuditorElector auditorElector = new AuditorElector(addr, conf);
+ auditorElectors.put(addr, auditorElector);
+ auditorElector.start();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting Auditor Elector");
+ }
+ }
+
+ URI uri =
URI.create(confByIndex(0).getMetadataServiceUri().replaceAll("zk://",
"metadata-store:")
+ .replaceAll("/ledgers", ""));
+ driver = MetadataDrivers.getBookieDriver(uri);
+ ServerConfiguration serverConfiguration = new
ServerConfiguration(confByIndex(0));
+ serverConfiguration.setMetadataServiceUri(
+
serverConfiguration.getMetadataServiceUri().replaceAll("zk://",
"metadata-store:")
+ .replaceAll("/ledgers", ""));
+ driver.initialize(serverConfiguration, NullStatsLogger.INSTANCE);
+ }
+
+ @AfterTest
+ @Override
+ public void tearDown() throws Exception {
+ if (null != driver) {
+ driver.close();
+ }
+
+ for (AuditorElector e : auditorElectors.values()) {
+ e.shutdown();
+ }
+ super.tearDown();
+ }
+
+ private BookieId replaceBookieWithWriteFailingBookie(LedgerHandle lh)
throws Exception {
+ int bookieIdx = -1;
+ Long entryId = lh.getLedgerMetadata().getAllEnsembles().firstKey();
+ List<BookieId> curEnsemble =
lh.getLedgerMetadata().getAllEnsembles().get(entryId);
+
+ // Identify a bookie in the current ledger ensemble to be replaced
+ BookieId replacedBookie = null;
+ for (int i = 0; i < numBookies; i++) {
+ if (curEnsemble.contains(addressByIndex(i))) {
+ bookieIdx = i;
+ replacedBookie = addressByIndex(i);
+ break;
+ }
+ }
+ assertNotSame("Couldn't find ensemble bookie in bookie list", -1,
bookieIdx);
+
+ LOG.info("Killing bookie " + addressByIndex(bookieIdx));
+ ServerConfiguration conf = killBookie(bookieIdx);
+ Bookie writeFailingBookie = new TestBookieImpl(conf) {
+ @Override
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb,
+ Object ctx, byte[] masterKey)
+ throws IOException, BookieException {
+ try {
+ LOG.info("Failing write to entry ");
+ // sleep a bit so that writes to other bookies succeed
before
+ // the client hears about the failure on this bookie. If
the
+ // client gets ack-quorum number of acks first, it won't
care
+ // about any failures and won't reform the ensemble.
+ Thread.sleep(100);
+ throw new IOException();
+ } catch (InterruptedException ie) {
+ // ignore, only interrupted if shutting down,
+ // and an exception would spam the logs
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+ startAndAddBookie(conf, writeFailingBookie);
+ return replacedBookie;
+ }
+
+ /*
+ * Validates that the periodic ledger check will fix entries with a failed
write.
+ */
+ @Test
+ public void testFailedWriteRecovery() throws Exception {
+ LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+ LedgerUnderreplicationManager underReplicationManager =
mFactory.newLedgerUnderreplicationManager();
+ underReplicationManager.disableLedgerReplication();
+
+ LedgerHandle lh = bkc.createLedger(2, 2, 1, DigestType.CRC32,
"passwd".getBytes());
+
+ // kill one of the bookies and replace it with one that rejects write;
+ // This way we get into the under replication state
+ BookieId replacedBookie = replaceBookieWithWriteFailingBookie(lh);
+
+ // Write a few entries; this should cause under replication
+ byte[] data = "foobar".getBytes();
+ data = "foobar".getBytes();
+ lh.addEntry(data);
+ lh.addEntry(data);
+ lh.addEntry(data);
+
+ lh.close();
+
+ // enable under replication detection and wait for it to report
+ // under replicated ledger
+ underReplicationManager.enableLedgerReplication();
+ long underReplicatedLedger = -1;
+ for (int i = 0; i < 5; i++) {
+ underReplicatedLedger =
underReplicationManager.pollLedgerToRereplicate();
+ if (underReplicatedLedger != -1) {
+ break;
+ }
+ Thread.sleep(CHECK_INTERVAL * 1000);
+ }
+ assertEquals("Ledger should be under replicated", lh.getId(),
underReplicatedLedger);
+
+ // now start the replication workers
+ List<ReplicationWorker> l = new ArrayList<ReplicationWorker>();
+ for (int i = 0; i < numBookies; i++) {
+ ReplicationWorker rw = new ReplicationWorker(confByIndex(i),
NullStatsLogger.INSTANCE);
+ rw.start();
+ l.add(rw);
+ }
+ underReplicationManager.close();
+
+ // Wait for ensemble to change after replication
+ Thread.sleep(3000);
+ for (ReplicationWorker rw : l) {
+ rw.shutdown();
+ }
+
+ // check that ensemble has changed and the bookie that rejected writes
has
+ // been replaced in the ensemble
+ LedgerHandle newLh = bkc.openLedger(lh.getId(), DigestType.CRC32,
"passwd".getBytes());
+ for (Map.Entry<Long, ? extends List<BookieId>> e :
+ newLh.getLedgerMetadata().getAllEnsembles().entrySet()) {
+ List<BookieId> ensemble = e.getValue();
+ assertFalse("Ensemble hasn't been updated",
ensemble.contains(replacedBookie));
+ }
+ newLh.close();
+ }
+}