This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new baddf9d Allow to configure BookKeeper Opportunistic Striping Feature (and all BK client features using bookkeeper_ prefix) (#9232) baddf9d is described below commit baddf9dfd0cd65742f09d4a2af165794fd9e9cdf Author: Enrico Olivelli <eolive...@gmail.com> AuthorDate: Mon Feb 1 03:48:37 2021 +0100 Allow to configure BookKeeper Opportunistic Striping Feature (and all BK client features using bookkeeper_ prefix) (#9232) BookKeeper 4.12 introduces the 'Opportunistic Striping' feature. In BK terms, 'striping' happens when EnsembleSize is greater than WriteQuorumSize; in this mode the entries are distributed round-robin over a set of bookies in order to achieve better performance, as you can use the resources of more bookies. For instance, in a small HA cluster with only 3 bookies, you must run Pulsar with 2-2-2 replication parameters (EnsembleSize=2,WriteQuorumSize=2,AckQuorumSize=2). You cannot set EnsembleSize=3 (and thus use 'striping') because, in case of a temporary outage of a single bookie, the BK client is not able to create an ensemble with 3 bookies. With Opportunistic Striping you can use 3-2-2: when the system is fully up-and-running with 3 bookies you go with striping, but during single-bookie outages (like during reconfigurations/updates) you fall back to 2-2-2. This is not about consistency or durability; it is only about having the ability to get the most out of your BookKeeper cluster. 
- Add a generic way to configure internal BookKeeper client options: any entry that starts with `bookkeeper_` is passed to the BK client configuration after stripping the prefix - Add unit tests for `bookkeeper_opportunisticStriping` configuration option - Add configuration example in broker.conf - The change adds new tests (cherry picked from commit bc69ad29e01a005ef2dc3eae36aed429b42495aa) --- conf/broker.conf | 7 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 1 + .../pulsar/broker/BookKeeperClientFactoryImpl.java | 20 ++++- .../broker/BookKeeperClientFactoryImplTest.java | 13 +++ .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 ++-- .../broker/service/OpportunisticStripingTest.java | 95 ++++++++++++++++++++++ 6 files changed, 140 insertions(+), 11 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 45e5d8c..4e689c6 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -791,6 +791,13 @@ managedLedgerDefaultWriteQuorum=2 # Number of guaranteed copies (acks to wait before write is complete) managedLedgerDefaultAckQuorum=2 +# with OpportunisticStriping=true the ensembleSize is adapted automatically to writeQuorum +# in case of lack of enough bookies +#bookkeeper_opportunisticStriping=false + +# you can add other configuration options for the BookKeeper client +# by prefixing them with bookkeeper_ + # How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). 
# Default is 60 seconds managedLedgerCursorPositionFlushSeconds = 60 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c6b64d7..c9001a2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1044,6 +1044,7 @@ public class ServiceConfiguration implements PulsarConfiguration { + " and resolving its metadata service location" ) private String bookkeeperMetadataServiceUri; + @FieldContext( category = CATEGORY_STORAGE_BK, doc = "Authentication plugin to use when connecting to bookies" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 7709930..f00c905 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -27,10 +27,11 @@ import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.RE import java.io.IOException; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - import com.google.common.annotations.VisibleForTesting; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; @@ -48,6 +49,7 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @SuppressWarnings("deprecation") +@Slf4j public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { private final 
AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>(); @@ -135,9 +137,19 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled()); bkConf.setExplictLacInterval(conf.getBookkeeperExplicitLacIntervalInMills()); - bkConf.setGetBookieInfoIntervalSeconds(conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS); - bkConf.setGetBookieInfoRetryIntervalSeconds(conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS); - + bkConf.setGetBookieInfoIntervalSeconds( + conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS); + bkConf.setGetBookieInfoRetryIntervalSeconds( + conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS); + Properties allProps = conf.getProperties(); + allProps.forEach((key, value) -> { + String sKey = key.toString(); + if (sKey.startsWith("bookkeeper_") && value != null) { + String bkExtraConfigKey = sKey.substring(11); + log.info("Extra BookKeeper client configuration {}, setting {}={}", sKey, bkExtraConfigKey, value); + bkConf.setProperty(bkExtraConfigKey, value); + } + }); return bkConf; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java index 0f170717..13c352c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java @@ -205,4 +205,17 @@ public class BookKeeperClientFactoryImplTest { } } + @Test + public void testOpportunisticStripingConfiguration() { + BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl(); + ServiceConfiguration conf = new ServiceConfiguration(); + // default value + 
assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(), false); + conf.getProperties().setProperty("bookkeeper_opportunisticStriping", "true"); + assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(), true); + conf.getProperties().setProperty("bookkeeper_opportunisticStriping", "false"); + assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(), false); + + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java index ec4e24d..a44e6d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java @@ -57,6 +57,10 @@ public abstract class BkEnsemblesTestBase { this.numberOfBookies = numberOfBookies; } + protected void configurePulsar(ServiceConfiguration config) throws Exception { + //overridable by subclasses + } + @BeforeMethod protected void setup() throws Exception { try { @@ -78,6 +82,7 @@ public abstract class BkEnsemblesTestBase { config.setAdvertisedAddress("127.0.0.1"); config.setAllowAutoTopicCreationType("non-partitioned"); config.setZooKeeperOperationTimeoutSeconds(1); + configurePulsar(config); pulsar = new PulsarService(config); pulsar.start(); @@ -95,13 +100,9 @@ public abstract class BkEnsemblesTestBase { @AfterMethod(alwaysRun = true) protected void shutdown() throws Exception { - try { - admin.close(); - pulsar.close(); - bkEnsemble.stop(); - } catch (Throwable t) { - log.warn("Error cleaning up broker test setup state", t); - } + admin.close(); + pulsar.close(); + bkEnsemble.stop(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java new file mode 100644 index 
0000000..221367c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.BookKeeper; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ListLedgersResult; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import org.testng.annotations.Test; + +/** + * With BookKeeper Opportunistic Striping feature we can allow Pulsar to work + * with only WQ bookie during temporary outages of some bookie. 
+ */ +public class OpportunisticStripingTest extends BkEnsemblesTestBase { + + public OpportunisticStripingTest() { + // starting only two bookies + super(2); + } + + @Override + protected void configurePulsar(ServiceConfiguration config) throws Exception { + // we would like to stripe over 5 bookies + config.setManagedLedgerDefaultEnsembleSize(5); + // we want 2 copies for each entry + config.setManagedLedgerDefaultWriteQuorum(2); + config.setManagedLedgerDefaultAckQuorum(2); + + config.setBrokerDeleteInactiveTopicsEnabled(false); + config.getProperties().setProperty("bookkeeper_opportunisticStriping", "true"); + } + + @Test + public void testOpportunisticStriping() throws Exception { + + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .statsInterval(0, TimeUnit.SECONDS) + .build();) { + + final String ns1 = "prop/usc/opportunistic1"; + admin.namespaces().createNamespace(ns1); + + final String topic1 = "persistent://" + ns1 + "/my-topic"; + Producer<byte[]> producer = client.newProducer().topic(topic1).create(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + // verify that all ledgers has the proper writequorumsize, + // equals to the number of available bookies (in this case 2) + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setZkServers("localhost:" + this.bkEnsemble.getZookeeperPort()); + + try (BookKeeper bkAdmin = BookKeeper.newBuilder(clientConfiguration).build()) { + try (ListLedgersResult list = bkAdmin.newListLedgersOp().execute().get();) { + int count = 0; + for (long ledgerId : list.toIterable()) { + LedgerMetadata ledgerMetadata = bkAdmin.getLedgerMetadata(ledgerId).get(); + assertEquals(2, ledgerMetadata.getEnsembleSize()); + assertEquals(2, ledgerMetadata.getWriteQuorumSize()); + assertEquals(2, ledgerMetadata.getAckQuorumSize()); + count++; + } + assertTrue(count > 0); + } + } + } + } + +}