This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new baddf9d  Allow to configure BookKeeper Opportunistic Striping Feature 
(and all BK client features using bookkeeper_ prefix) (#9232)
baddf9d is described below

commit baddf9dfd0cd65742f09d4a2af165794fd9e9cdf
Author: Enrico Olivelli <eolive...@gmail.com>
AuthorDate: Mon Feb 1 03:48:37 2021 +0100

    Allow to configure BookKeeper Opportunistic Striping Feature (and all BK 
client features using bookkeeper_ prefix) (#9232)
    
    BookKeeper 4.12 introduces the 'Opportunistic Striping' feature.
    In BK terms 'striping' happens when EnsembleSize is greater than 
WriteQuorumSize. In this mode the entries are distributed round robin over a 
set of bookies, in order to achieve better performance, as you can use the 
resources of more bookies.
    
    For instance in a small HA cluster, with only 3 bookies, you must run 
Pulsar with 2-2-2 replication parameters 
(EnsembleSize=2,WriteQuorumSize=2,AckQuorumSize=2).
    You cannot set EnsembleSize=3 (and thus use 'striping') because in case of 
temporary outage of a single bookie the BK client is not able to create an 
ensemble with 3 bookies.
    
    With Opportunistic Striping you can use 3-2-2 and when the system is fully 
up-and-running with 3 bookies then you go with striping, but during single 
bookie outages (like during reconfigurations/updates) you fall back to 2-2-2.
    
    This is not about consistency or durability, it is only about having the 
ability to get the most out of your bookkeeper cluster.
    
    - Add a generic way to configure internal BookKeeper client options: any 
entry that starts with `bookkeeper_` is passed to the BK client configuration 
after stripping the prefix
    - Add unit tests for `bookkeeper_opportunisticStriping` configuration option
    - Add configuration example in broker.conf
    
    - the change adds new tests
    
    (cherry picked from commit bc69ad29e01a005ef2dc3eae36aed429b42495aa)
---
 conf/broker.conf                                   |  7 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  1 +
 .../pulsar/broker/BookKeeperClientFactoryImpl.java | 20 ++++-
 .../broker/BookKeeperClientFactoryImplTest.java    | 13 +++
 .../pulsar/broker/service/BkEnsemblesTestBase.java | 15 ++--
 .../broker/service/OpportunisticStripingTest.java  | 95 ++++++++++++++++++++++
 6 files changed, 140 insertions(+), 11 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 45e5d8c..4e689c6 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -791,6 +791,13 @@ managedLedgerDefaultWriteQuorum=2
 # Number of guaranteed copies (acks to wait before write is complete)
 managedLedgerDefaultAckQuorum=2
 
+# with OpportunisticStriping=true the ensembleSize is adapted automatically to 
writeQuorum
+# in case of lack of enough bookies
+#bookkeeper_opportunisticStriping=false
+
+# you can add other configuration options for the BookKeeper client
+# by prefixing them with bookkeeper_
+
 # How frequently to flush the cursor positions that were accumulated due to 
rate limiting. (seconds).
 # Default is 60 seconds
 managedLedgerCursorPositionFlushSeconds = 60
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c6b64d7..c9001a2 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1044,6 +1044,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             + " and resolving its metadata service location"
     )
     private String bookkeeperMetadataServiceUri;
+
     @FieldContext(
         category = CATEGORY_STORAGE_BK,
         doc = "Authentication plugin to use when connecting to bookies"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 7709930..f00c905 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -27,10 +27,11 @@ import static 
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.RE
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
 import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
@@ -48,6 +49,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
 @SuppressWarnings("deprecation")
+@Slf4j
 public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
     private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new 
AtomicReference<>();
@@ -135,9 +137,19 @@ public class BookKeeperClientFactoryImpl implements 
BookKeeperClientFactory {
 
         
bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled());
         
bkConf.setExplictLacInterval(conf.getBookkeeperExplicitLacIntervalInMills());
-        
bkConf.setGetBookieInfoIntervalSeconds(conf.getBookkeeperClientGetBookieInfoIntervalSeconds(),
 TimeUnit.SECONDS);
-        
bkConf.setGetBookieInfoRetryIntervalSeconds(conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(),
 TimeUnit.SECONDS);
-
+        bkConf.setGetBookieInfoIntervalSeconds(
+                conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), 
TimeUnit.SECONDS);
+        bkConf.setGetBookieInfoRetryIntervalSeconds(
+                conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), 
TimeUnit.SECONDS);
+        Properties allProps = conf.getProperties();
+        allProps.forEach((key, value) -> {
+            String sKey = key.toString();
+            if (sKey.startsWith("bookkeeper_") && value != null) {
+                String bkExtraConfigKey = sKey.substring(11);
+                log.info("Extra BookKeeper client configuration {}, setting 
{}={}", sKey, bkExtraConfigKey, value);
+                bkConf.setProperty(bkExtraConfigKey, value);
+            }
+        });
         return bkConf;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
index 0f170717..13c352c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
@@ -205,4 +205,17 @@ public class BookKeeperClientFactoryImplTest {
         }
     }
 
+    @Test
+    public void testOpportunisticStripingConfiguration() {
+        BookKeeperClientFactoryImpl factory = new 
BookKeeperClientFactoryImpl();
+        ServiceConfiguration conf = new ServiceConfiguration();
+        // default value
+        
assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(),
 false);
+        conf.getProperties().setProperty("bookkeeper_opportunisticStriping", 
"true");
+        
assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(),
 true);
+        conf.getProperties().setProperty("bookkeeper_opportunisticStriping", 
"false");
+        
assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(),
 false);
+
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index ec4e24d..a44e6d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -57,6 +57,10 @@ public abstract class BkEnsemblesTestBase {
         this.numberOfBookies = numberOfBookies;
     }
 
+    protected void configurePulsar(ServiceConfiguration config) throws 
Exception {
+        //overridable by subclasses
+    }
+
     @BeforeMethod
     protected void setup() throws Exception {
         try {
@@ -78,6 +82,7 @@ public abstract class BkEnsemblesTestBase {
             config.setAdvertisedAddress("127.0.0.1");
             config.setAllowAutoTopicCreationType("non-partitioned");
             config.setZooKeeperOperationTimeoutSeconds(1);
+            configurePulsar(config);
 
             pulsar = new PulsarService(config);
             pulsar.start();
@@ -95,13 +100,9 @@ public abstract class BkEnsemblesTestBase {
 
     @AfterMethod(alwaysRun = true)
     protected void shutdown() throws Exception {
-        try {
-            admin.close();
-            pulsar.close();
-            bkEnsemble.stop();
-        } catch (Throwable t) {
-            log.warn("Error cleaning up broker test setup state", t);
-        }
+        admin.close();
+        pulsar.close();
+        bkEnsemble.stop();
     }
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java
new file mode 100644
index 0000000..221367c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.api.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ListLedgersResult;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import org.testng.annotations.Test;
+
+/**
+ * With BookKeeper Opportunistic Striping feature we can allow Pulsar to work
+ * with only WQ bookie during temporary outages of some bookie.
+ */
+public class OpportunisticStripingTest extends BkEnsemblesTestBase {
+
+    public OpportunisticStripingTest() {
+        // starting only two bookies
+        super(2);
+    }
+
+    @Override
+    protected void configurePulsar(ServiceConfiguration config) throws 
Exception {
+        // we would like to stripe over 5 bookies
+        config.setManagedLedgerDefaultEnsembleSize(5);
+        // we want 2 copies for each entry
+        config.setManagedLedgerDefaultWriteQuorum(2);
+        config.setManagedLedgerDefaultAckQuorum(2);
+
+        config.setBrokerDeleteInactiveTopicsEnabled(false);
+        config.getProperties().setProperty("bookkeeper_opportunisticStriping", 
"true");
+    }
+
+    @Test
+    public void testOpportunisticStriping() throws Exception {
+
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();) {
+
+            final String ns1 = "prop/usc/opportunistic1";
+            admin.namespaces().createNamespace(ns1);
+
+            final String topic1 = "persistent://" + ns1 + "/my-topic";
+            Producer<byte[]> producer = 
client.newProducer().topic(topic1).create();
+            for (int i = 0; i < 10; i++) {
+                String message = "my-message-" + i;
+                producer.send(message.getBytes());
+            }
+
+            // verify that all ledgers has the proper writequorumsize,
+            // equals to the number of available bookies (in this case 2)
+            ClientConfiguration clientConfiguration = new 
ClientConfiguration();
+            clientConfiguration.setZkServers("localhost:" + 
this.bkEnsemble.getZookeeperPort());
+
+            try (BookKeeper bkAdmin = 
BookKeeper.newBuilder(clientConfiguration).build()) {
+                try (ListLedgersResult list = 
bkAdmin.newListLedgersOp().execute().get();) {
+                    int count = 0;
+                    for (long ledgerId : list.toIterable()) {
+                        LedgerMetadata ledgerMetadata = 
bkAdmin.getLedgerMetadata(ledgerId).get();
+                        assertEquals(2, ledgerMetadata.getEnsembleSize());
+                        assertEquals(2, ledgerMetadata.getWriteQuorumSize());
+                        assertEquals(2, ledgerMetadata.getAckQuorumSize());
+                        count++;
+                    }
+                    assertTrue(count > 0);
+                }
+            }
+        }
+    }
+
+}

Reply via email to