This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cbeff98d513564418f4d800d04208d4206335c4e
Author: Qiang Zhao <[email protected]>
AuthorDate: Mon Aug 21 09:42:05 2023 +0800

    [improve][meta] Improve fault tolerance of blocking calls by supporting timeout (#21028)
    
    (cherry picked from commit 976a58061fd87d577b7903622ed2e61f4bec7d22)
---
 .../bookkeeper/AbstractMetadataDriver.java         |   2 +
 .../LegacyHierarchicalLedgerRangeIterator.java     |  18 ++-
 .../LongHierarchicalLedgerRangeIterator.java       |   7 +-
 .../metadata/bookkeeper/PulsarLayoutManager.java   |  17 ++-
 .../bookkeeper/PulsarLedgerManagerFactory.java     |  38 ++++++-
 .../PulsarLedgerUnderreplicationManager.java       | 122 +++++++++++++--------
 .../bookkeeper/PulsarRegistrationManager.java      | 112 +++++++++++--------
 7 files changed, 208 insertions(+), 108 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
index cc5f759c73f..435f94b05dc 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.metadata.bookkeeper;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
@@ -40,6 +41,7 @@ public abstract class AbstractMetadataDriver implements 
Closeable {
     public static final String METADATA_STORE_SCHEME = "metadata-store";
 
     public static final String METADATA_STORE_INSTANCE = 
"metadata-store-instance";
+    public static final long BLOCKING_CALL_TIMEOUT = 
TimeUnit.SECONDS.toMillis(30);
 
     protected MetadataStoreExtended store;
     private boolean storeInstanceIsOwned;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
index 15b1d561f90..37e6dc836f2 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pulsar.metadata.bookkeeper;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.pulsar.metadata.api.MetadataStore;
 
+
 /**
  * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 
2-level hierarchical znodes.
  *
@@ -67,7 +71,7 @@ public class LegacyHierarchicalLedgerRangeIterator implements 
LedgerManager.Ledg
      * @return false if have visited all level1 nodes
      * @throws InterruptedException/KeeperException if error occurs reading 
zookeeper children
      */
-    private boolean nextL1Node() throws ExecutionException, 
InterruptedException {
+    private boolean nextL1Node() throws ExecutionException, 
InterruptedException, TimeoutException {
         l2NodesIter = null;
         while (l2NodesIter == null) {
             if (l1NodesIter.hasNext()) {
@@ -79,7 +83,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements 
LedgerManager.Ledg
             if (!isLedgerParentNode(curL1Nodes)) {
                 continue;
             }
-            List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + 
curL1Nodes).get();
+            List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + 
curL1Nodes)
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             l2NodesIter = l2Nodes.iterator();
             if (!l2NodesIter.hasNext()) {
                 l2NodesIter = null;
@@ -94,7 +99,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements 
LedgerManager.Ledg
             boolean hasMoreElements = false;
             try {
                 if (l1NodesIter == null) {
-                    List<String> l1Nodes = 
store.getChildren(ledgersRoot).get();
+                    List<String> l1Nodes = store.getChildren(ledgersRoot)
+                            .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
                     l1NodesIter = l1Nodes.iterator();
                     hasMoreElements = nextL1Node();
                 } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
@@ -102,7 +108,7 @@ public class LegacyHierarchicalLedgerRangeIterator 
implements LedgerManager.Ledg
                 } else {
                     hasMoreElements = true;
                 }
-            } catch (ExecutionException ke) {
+            } catch (ExecutionException | TimeoutException ke) {
                 throw new IOException("Error preloading next range", ke);
             } catch (InterruptedException ie) {
                 Thread.currentThread().interrupt();
@@ -156,8 +162,8 @@ public class LegacyHierarchicalLedgerRangeIterator 
implements LedgerManager.Ledg
         String nodePath = nodeBuilder.toString();
         List<String> ledgerNodes = null;
         try {
-            ledgerNodes = store.getChildren(nodePath).get();
-        } catch (ExecutionException e) {
+            ledgerNodes = 
store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+        } catch (ExecutionException | TimeoutException e) {
             throw new IOException("Error when get child nodes from zk", e);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
index 9a36ac53b89..3b32916e6e7 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.util.StringUtils;
@@ -57,8 +59,9 @@ class LongHierarchicalLedgerRangeIterator implements 
LedgerManager.LedgerRangeIt
      */
     List<String> getChildrenAt(String path) throws IOException {
         try {
-            return store.getChildren(path).get();
-        } catch (ExecutionException e) {
+            return store.getChildren(path)
+                    .get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException | TimeoutException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Failed to get children at {}", path);
             }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java
index ee06930b3c8..99dc474f5cd 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLayoutManager.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.metadata.bookkeeper;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -49,14 +52,14 @@ public class PulsarLayoutManager implements LayoutManager {
     @Override
     public LedgerLayout readLedgerLayout() throws IOException {
         try {
-            byte[] layoutData = store.get(layoutPath).get()
+            byte[] layoutData = 
store.get(layoutPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)
                     .orElseThrow(() -> new 
BookieException.MetadataStoreException("Layout node not found"))
                     .getValue();
             return LedgerLayout.parseLayout(layoutData);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new IOException(e);
-        } catch (BookieException | ExecutionException e) {
+        } catch (BookieException | ExecutionException | TimeoutException e) {
             throw new IOException(e);
         }
     }
@@ -66,10 +69,13 @@ public class PulsarLayoutManager implements LayoutManager {
         try {
             byte[] layoutData = ledgerLayout.serialize();
 
-            store.put(layoutPath, layoutData, Optional.of(-1L)).get();
+            store.put(layoutPath, layoutData, Optional.of(-1L))
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new IOException(e);
+        } catch (TimeoutException e) {
+            throw new IOException(e);
         } catch (ExecutionException e) {
             if (e.getCause() instanceof 
MetadataStoreException.BadVersionException) {
                 throw new LedgerLayoutExistsException(e);
@@ -82,11 +88,12 @@ public class PulsarLayoutManager implements LayoutManager {
     @Override
     public void deleteLedgerLayout() throws IOException {
         try {
-            store.delete(layoutPath, Optional.empty()).get();
+            store.delete(layoutPath, Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new IOException(e);
-        } catch (ExecutionException e) {
+        } catch (ExecutionException | TimeoutException e) {
             throw new IOException(e);
         }
     }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java
index 1b229757c9c..bfcbf0b22d9 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManagerFactory.java
@@ -19,8 +19,12 @@
 package org.apache.pulsar.metadata.bookkeeper;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -110,7 +114,13 @@ public class PulsarLedgerManagerFactory implements 
LedgerManagerFactory {
          * before proceeding with nuking existing cluster, make sure there
          * are no unexpected nodes under ledgersRootPath
          */
-        List<String> ledgersRootPathChildrenList = 
store.getChildren(ledgerRootPath).join();
+        final List<String> ledgersRootPathChildrenList;
+        try {
+            ledgersRootPathChildrenList = store.getChildren(ledgerRootPath)
+                    .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new IOException(e);
+        }
         for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
             if 
((!AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren))
                     && 
(!ledgerManager.isLedgerParentNode(ledgersRootPathChildren))) {
@@ -124,18 +134,34 @@ public class PulsarLedgerManagerFactory implements 
LedgerManagerFactory {
         format(conf, layoutManager);
 
         // now delete all the special nodes recursively
-        for (String ledgersRootPathChildren : 
store.getChildren(ledgerRootPath).join()) {
-            if 
(AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren)) {
-                store.deleteRecursive(ledgerRootPath + "/" + 
ledgersRootPathChildren).join();
+        final List<String> ledgersRootPathChildren;
+        try {
+            ledgersRootPathChildren = store.getChildren(ledgerRootPath)
+                    .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new IOException(e);
+        }
+        for (String ledgersRootPathChild :ledgersRootPathChildren) {
+            if (AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChild)) {
+                try {
+                    store.deleteRecursive(ledgerRootPath + "/" + 
ledgersRootPathChild)
+                            .get(BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
+                } catch (ExecutionException | TimeoutException e) {
+                    throw new IOException(e);
+                }
             } else {
                 log.error("Found unexpected node : {} under ledgersRootPath : 
{} so exiting nuke operation",
-                        ledgersRootPathChildren, ledgerRootPath);
+                        ledgersRootPathChild, ledgerRootPath);
                 return false;
             }
         }
 
         // finally deleting the ledgers rootpath
-        store.deleteRecursive(ledgerRootPath).join();
+        try {
+            store.deleteRecursive(ledgerRootPath).get(BLOCKING_CALL_TIMEOUT, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new IOException(e);
+        }
 
         log.info("Successfully nuked existing cluster");
         return true;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
index 343c3165ec7..b7cde77bdc2 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
@@ -19,12 +19,14 @@
 package org.apache.pulsar.metadata.bookkeeper;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat;
 import static 
org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
 import static org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
 import static 
org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
 import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat;
 import static 
org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
+import static 
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
 import com.google.common.base.Joiner;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.TextFormat;
@@ -43,6 +45,7 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -274,7 +277,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
         try {
             String path = getUrLedgerPath(ledgerId);
 
-            Optional<GetResult> optRes = store.get(path).get();
+            Optional<GetResult> optRes = 
store.get(path).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             if (!optRes.isPresent()) {
                 if (log.isDebugEnabled()) {
                     log.debug("Ledger: {} is not marked underreplicated", 
ledgerId);
@@ -295,7 +298,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             underreplicatedLedger.setCtime(ctime);
             underreplicatedLedger.setReplicaList(replicaList);
             return underreplicatedLedger;
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             throw new ReplicationException.UnavailableException("Error 
contacting with metadata store", ee);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -399,14 +402,16 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
     public void acquireUnderreplicatedLedger(long ledgerId) throws 
ReplicationException {
         try {
             internalAcquireUnderreplicatedLedger(ledgerId);
-        } catch (ExecutionException | InterruptedException e) {
+        } catch (ExecutionException | TimeoutException | InterruptedException 
e) {
             throw new ReplicationException.UnavailableException("Failed to 
acuire under-replicated ledger", e);
         }
     }
 
-    private void internalAcquireUnderreplicatedLedger(long ledgerId) throws 
ExecutionException, InterruptedException {
+    private void internalAcquireUnderreplicatedLedger(long ledgerId) throws 
ExecutionException,
+            InterruptedException, TimeoutException {
         String lockPath = getUrLedgerLockPath(urLockPath, ledgerId);
-        store.put(lockPath, LOCK_DATA, Optional.of(-1L), 
EnumSet.of(CreateOption.Ephemeral)).get();
+        store.put(lockPath, LOCK_DATA, Optional.of(-1L), 
EnumSet.of(CreateOption.Ephemeral))
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
     }
 
     @Override
@@ -417,7 +422,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
         try {
             Lock l = heldLocks.get(ledgerId);
             if (l != null) {
-                store.delete(getUrLedgerPath(ledgerId), 
Optional.of(l.getLedgerNodeVersion())).get();
+                store.delete(getUrLedgerPath(ledgerId), 
Optional.of(l.getLedgerNodeVersion()))
+                        .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
                 if (store instanceof ZKMetadataStore) {
                     try {
                         // clean up the hierarchy
@@ -455,6 +461,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
                 log.error("Error deleting underreplicated ledger node", ee);
                 throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ee);
             }
+        } catch (TimeoutException ex) {
+            throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ex);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted 
while contacting metadata store", ie);
@@ -495,7 +503,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
                 while (queue.size() > 0 && curBatch.size() == 0) {
                     String parent = queue.remove();
                     try {
-                        for (String c : store.getChildren(parent).get()) {
+                        for (String c : 
store.getChildren(parent).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
                             String child = parent + "/" + c;
                             if (c.startsWith("urL")) {
                                 long ledgerId = getLedgerId(child);
@@ -529,21 +537,23 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
     }
 
     private long getLedgerToRereplicateFromHierarchy(String parent, long depth)
-            throws ExecutionException, InterruptedException {
+            throws ExecutionException, InterruptedException, TimeoutException {
         if (depth == 4) {
-            List<String> children = new 
ArrayList<>(store.getChildren(parent).get());
+            List<String> children = new ArrayList<>(store.getChildren(parent)
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
             Collections.shuffle(children);
 
             while (!children.isEmpty()) {
                 String tryChild = children.get(0);
                 try {
-                    List<String> locks = store.getChildren(urLockPath).get();
+                    List<String> locks = 
store.getChildren(urLockPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
                     if (locks.contains(tryChild)) {
                         children.remove(tryChild);
                         continue;
                     }
 
-                    Optional<GetResult> optRes = store.get(parent + "/" + 
tryChild).get();
+                    Optional<GetResult> optRes = store.get(parent + "/" + 
tryChild)
+                            .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
                     if (!optRes.isPresent()) {
                         if (log.isDebugEnabled()) {
                             log.debug("{}/{} doesn't exist", parent, tryChild);
@@ -572,7 +582,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             return -1;
         }
 
-        List<String> children = new 
ArrayList<>(store.getChildren(parent).join());
+        List<String> children = new 
ArrayList<>(store.getChildren(parent).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
         Collections.shuffle(children);
 
         while (children.size() > 0) {
@@ -595,7 +605,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
         }
         try {
             return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ee);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -621,7 +631,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
                     // nothing found, wait for a watcher to trigger
                     this.wait(1000);
                 }
-            } catch (ExecutionException ee) {
+            } catch (ExecutionException | TimeoutException ee) {
                 throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ee);
             } catch (InterruptedException ie) {
                 Thread.currentThread().interrupt();
@@ -647,7 +657,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
         try {
             Lock l = heldLocks.get(ledgerId);
             if (l != null) {
-                store.delete(l.getLockPath(), Optional.empty()).get();
+                store.delete(l.getLockPath(), Optional.empty())
+                            .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             }
         } catch (ExecutionException ee) {
             if (ee.getCause() instanceof 
MetadataStoreException.NotFoundException) {
@@ -656,6 +667,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
                 log.error("Error deleting underreplicated ledger lock", ee);
                 throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ee);
             }
+        } catch (TimeoutException ex) {
+            throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ex);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted 
while connecting metadata store", ie);
@@ -670,7 +683,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
         }
         try {
             for (Map.Entry<Long, Lock> e : heldLocks.entrySet()) {
-                store.delete(e.getValue().getLockPath(), 
Optional.empty()).get();
+                store.delete(e.getValue().getLockPath(), Optional.empty())
+                        .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             }
         } catch (ExecutionException ee) {
             if (ee.getCause() instanceof 
MetadataStoreException.NotFoundException) {
@@ -679,6 +693,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
                 log.error("Error deleting underreplicated ledger lock", ee);
                 throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ee);
             }
+        } catch (TimeoutException ex) {
+            throw new ReplicationException.UnavailableException("Error 
contacting metadata store", ex);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted 
while connecting metadata store", ie);
@@ -692,9 +708,10 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
             log.debug("disableLedegerReplication()");
         }
         try {
-            store.put(replicationDisablePath, "".getBytes(UTF_8), 
Optional.of(-1L)).get();
+            store.put(replicationDisablePath, "".getBytes(UTF_8), 
Optional.of(-1L))
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             log.info("Auto ledger re-replication is disabled!");
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Exception while stopping auto ledger re-replication", 
ee);
             throw new ReplicationException.UnavailableException(
                     "Exception while stopping auto ledger re-replication", ee);
@@ -712,9 +729,10 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
             log.debug("enableLedegerReplication()");
         }
         try {
-            store.delete(replicationDisablePath, Optional.empty()).get();
+            store.delete(replicationDisablePath, Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             log.info("Resuming automatic ledger re-replication");
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Exception while resuming ledger replication", ee);
             throw new ReplicationException.UnavailableException(
                     "Exception while resuming auto ledger re-replication", ee);
@@ -732,8 +750,9 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             log.debug("isLedgerReplicationEnabled()");
         }
         try {
-            return !store.exists(replicationDisablePath).get();
-        } catch (ExecutionException ee) {
+            return !store.exists(replicationDisablePath)
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Error while checking the state of "
                     + "ledger re-replication", ee);
             throw new ReplicationException.UnavailableException(
@@ -755,13 +774,14 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
             replicationEnabledCallbacks.add(cb);
         }
         try {
-            if (!store.exists(replicationDisablePath).get()) {
+            if (!store.exists(replicationDisablePath)
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
                 log.info("LedgerReplication is enabled externally through 
metadata store, "
                         + "since DISABLE_NODE node is deleted");
                 cb.operationComplete(0, null);
                 return;
             }
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Error while checking the state of "
                     + "ledger re-replication", ee);
             throw new ReplicationException.UnavailableException(
@@ -779,7 +799,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
     @Override
     public boolean isLedgerBeingReplicated(long ledgerId) throws 
ReplicationException {
         try {
-            return store.exists(getUrLedgerLockPath(urLockPath, 
ledgerId)).get();
+            return store.exists(getUrLedgerLockPath(urLockPath, 
ledgerId)).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         } catch (Exception e) {
             throw new ReplicationException.UnavailableException("Failed to 
check if ledger is beinge replicated", e);
         }
@@ -791,7 +811,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
         log.debug("initializeLostBookieRecoveryDelay()");
         try {
             store.put(lostBookieRecoveryDelayPath, 
Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
-                    Optional.of(-1L)).get();
+                    Optional.of(-1L)).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         } catch (ExecutionException ee) {
             if (ee.getCause() instanceof 
MetadataStoreException.BadVersionException) {
                 log.info("lostBookieRecoveryDelay node is already present, so 
using "
@@ -801,6 +821,9 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
                 log.error("Error while initializing LostBookieRecoveryDelay", 
ee);
                 throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ee);
             }
+        } catch (TimeoutException ex) {
+            log.error("Error while initializing LostBookieRecoveryDelay", ex);
+            throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ex);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted 
while contacting zookeeper", ie);
@@ -814,9 +837,9 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
         log.debug("setLostBookieRecoveryDelay()");
         try {
             store.put(lostBookieRecoveryDelayPath, 
Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
-                    Optional.empty()).get();
+                    Optional.empty()).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
 
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Error while setting LostBookieRecoveryDelay ", ee);
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ee);
         } catch (InterruptedException ie) {
@@ -829,9 +852,10 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
     public int getLostBookieRecoveryDelay() throws 
ReplicationException.UnavailableException {
         log.debug("getLostBookieRecoveryDelay()");
         try {
-            byte[] data = 
store.get(lostBookieRecoveryDelayPath).get().get().getValue();
+            byte[] data = 
store.get(lostBookieRecoveryDelayPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)
+                    .get().getValue();
             return Integer.parseInt(new String(data, UTF_8));
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Error while getting LostBookieRecoveryDelay ", ee);
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ee);
         } catch (InterruptedException ie) {
@@ -848,12 +872,12 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
             lostBookieRecoveryDelayCallbacks.add(cb);
         }
         try {
-            if (!store.exists(lostBookieRecoveryDelayPath).get()) {
+            if 
(!store.exists(lostBookieRecoveryDelayPath).get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS)) {
                 cb.operationComplete(0, null);
                 return;
             }
 
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Error while checking the state of 
lostBookieRecoveryDelay", ee);
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ee);
         } catch (InterruptedException ie) {
@@ -867,7 +891,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             throws ReplicationException.UnavailableException {
 
         try {
-            Optional<GetResult> optRes = 
store.get(getUrLedgerLockPath(urLockPath, ledgerId)).get();
+            Optional<GetResult> optRes = 
store.get(getUrLedgerLockPath(urLockPath, ledgerId))
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             if (!optRes.isPresent()) {
                 // this is ok.
                 return null;
@@ -878,7 +903,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             TextFormat.merge(new String(lockData, UTF_8), lockDataBuilder);
             LockDataFormat lock = lockDataBuilder.build();
             return lock.getBookieId();
-        } catch (ExecutionException e) {
+        } catch (ExecutionException | TimeoutException e) {
             log.error("Error while getting ReplicationWorkerId rereplicating 
Ledger", e);
             throw new ReplicationException.UnavailableException(
                     "Error while getting ReplicationWorkerId rereplicating 
Ledger", e);
@@ -902,8 +927,9 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             builder.setCheckAllLedgersCTime(checkAllLedgersCTime);
             byte[] checkAllLedgersFormatByteArray = 
builder.build().toByteArray();
 
-            store.put(checkAllLedgersCtimePath, 
checkAllLedgersFormatByteArray, Optional.empty()).get();
-        } catch (ExecutionException ee) {
+            store.put(checkAllLedgersCtimePath, 
checkAllLedgersFormatByteArray, Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+        } catch (ExecutionException | TimeoutException ee) {
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ee);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -917,7 +943,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             log.debug("setCheckAllLedgersCTime");
         }
         try {
-            Optional<GetResult> optRes = 
store.get(checkAllLedgersCtimePath).get();
+            Optional<GetResult> optRes = 
store.get(checkAllLedgersCtimePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             if (!optRes.isPresent()) {
                 log.warn("checkAllLedgersCtimeZnode is not yet available");
                 return -1;
@@ -926,7 +952,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             CheckAllLedgersFormat checkAllLedgersFormat = 
CheckAllLedgersFormat.parseFrom(data);
             return checkAllLedgersFormat.hasCheckAllLedgersCTime() ? 
checkAllLedgersFormat.getCheckAllLedgersCTime()
                     : -1;
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ee);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -946,8 +972,9 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             PlacementPolicyCheckFormat.Builder builder = 
PlacementPolicyCheckFormat.newBuilder();
             builder.setPlacementPolicyCheckCTime(placementPolicyCheckCTime);
             byte[] placementPolicyCheckFormatByteArray = 
builder.build().toByteArray();
-            store.put(placementPolicyCheckCtimePath, 
placementPolicyCheckFormatByteArray, Optional.empty()).get();
-        } catch (ExecutionException ke) {
+            store.put(placementPolicyCheckCtimePath, 
placementPolicyCheckFormatByteArray, Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+        } catch (ExecutionException | TimeoutException ke) {
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -961,7 +988,8 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             log.debug("getPlacementPolicyCheckCTime");
         }
         try {
-            Optional<GetResult> optRes = 
store.get(placementPolicyCheckCtimePath).get();
+            Optional<GetResult> optRes = 
store.get(placementPolicyCheckCtimePath)
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             if (!optRes.isPresent()) {
                 log.warn("placementPolicyCheckCtimeZnode is not yet 
available");
                 return -1;
@@ -970,7 +998,7 @@ public class PulsarLedgerUnderreplicationManager implements 
LedgerUnderreplicati
             PlacementPolicyCheckFormat placementPolicyCheckFormat = 
PlacementPolicyCheckFormat.parseFrom(data);
             return placementPolicyCheckFormat.hasPlacementPolicyCheckCTime()
                     ? 
placementPolicyCheckFormat.getPlacementPolicyCheckCTime() : -1;
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ee);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -986,11 +1014,12 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
             ReplicasCheckFormat.Builder builder = 
ReplicasCheckFormat.newBuilder();
             builder.setReplicasCheckCTime(replicasCheckCTime);
             byte[] replicasCheckFormatByteArray = 
builder.build().toByteArray();
-            store.put(replicasCheckCtimePath, replicasCheckFormatByteArray, 
Optional.empty()).get();
+            store.put(replicasCheckCtimePath, replicasCheckFormatByteArray, 
Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             if (log.isDebugEnabled()) {
                 log.debug("setReplicasCheckCTime completed successfully");
             }
-        } catch (ExecutionException ke) {
+        } catch (ExecutionException | TimeoutException ke) {
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -1001,7 +1030,8 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
     @Override
     public long getReplicasCheckCTime() throws 
ReplicationException.UnavailableException {
         try {
-            Optional<GetResult> optRes = 
store.get(replicasCheckCtimePath).get();
+            Optional<GetResult> optRes = store.get(replicasCheckCtimePath)
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             if (!optRes.isPresent()) {
                 log.warn("placementPolicyCheckCtimeZnode is not yet 
available");
                 return -1;
@@ -1012,7 +1042,7 @@ public class PulsarLedgerUnderreplicationManager 
implements LedgerUnderreplicati
                 log.debug("getReplicasCheckCTime completed successfully");
             }
             return replicasCheckFormat.hasReplicasCheckCTime() ? 
replicasCheckFormat.getReplicasCheckCTime() : -1;
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             throw new ReplicationException.UnavailableException("Error 
contacting zookeeper", ee);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
index 25c3f10aa18..c6aba6b7d93 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
@@ -19,10 +19,12 @@
 package org.apache.pulsar.metadata.bookkeeper;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import static 
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,8 +34,8 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -85,12 +87,11 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
     }
 
     @Override
-    @SneakyThrows
     public void close() {
         for (ResourceLock<BookieServiceInfo> rwBookie : 
bookieRegistration.values()) {
             try {
-                rwBookie.release().get();
-            } catch (ExecutionException ignore) {
+                rwBookie.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+            } catch (ExecutionException | TimeoutException ignore) {
                 log.error("Cannot release correctly {}", rwBookie, 
ignore.getCause());
             } catch (InterruptedException ignore) {
                 log.error("Cannot release correctly {}", rwBookie, ignore);
@@ -100,26 +101,30 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
 
         for (ResourceLock<BookieServiceInfo> roBookie : 
bookieRegistrationReadOnly.values()) {
             try {
-                roBookie.release().get();
-            } catch (ExecutionException ignore) {
+                roBookie.release().get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+            } catch (ExecutionException | TimeoutException ignore) {
                 log.error("Cannot release correctly {}", roBookie, 
ignore.getCause());
             } catch (InterruptedException ignore) {
                 log.error("Cannot release correctly {}", roBookie, ignore);
                 Thread.currentThread().interrupt();
             }
         }
-        coordinationService.close();
+        try {
+            coordinationService.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public String getClusterInstanceId() throws BookieException {
         try {
             return store.get(ledgersRootPath + "/" + INSTANCEID)
-                    .get()
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)
                     .map(res -> new String(res.getValue(), UTF_8))
                     .orElseThrow(
                             () -> new 
BookieException.MetadataStoreException("BookKeeper cluster not initialized"));
-        } catch (ExecutionException | InterruptedException e) {
+        } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
             throw new BookieException.MetadataStoreException("Failed to get 
cluster instance id", e);
         }
     }
@@ -136,22 +141,24 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
                 ResourceLock<BookieServiceInfo> rwRegistration = 
bookieRegistration.remove(bookieId);
                 if (rwRegistration != null) {
                     log.info("Bookie {} was already registered as writable, 
unregistering", bookieId);
-                    rwRegistration.release().get();
+                    rwRegistration.release().get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS);
                 }
 
                 bookieRegistrationReadOnly.put(bookieId,
-                        lockManager.acquireLock(regPathReadOnly, 
bookieServiceInfo).get());
+                        lockManager.acquireLock(regPathReadOnly, 
bookieServiceInfo)
+                                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
             } else {
                 ResourceLock<BookieServiceInfo> roRegistration = 
bookieRegistrationReadOnly.remove(bookieId);
                 if (roRegistration != null) {
                     log.info("Bookie {} was already registered as read-only, 
unregistering", bookieId);
-                    roRegistration.release().get();
+                    roRegistration.release().get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS);
                 }
 
                 bookieRegistration.put(bookieId,
-                        lockManager.acquireLock(regPath, 
bookieServiceInfo).get());
+                        lockManager.acquireLock(regPath, bookieServiceInfo)
+                                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
             }
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Exception registering ephemeral node for Bookie!", ee);
             // Throw an IOException back up. This will cause the Bookie
             // constructor to error out. Alternatively, we could do a System
@@ -173,18 +180,18 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
             if (readOnly) {
                 ResourceLock<BookieServiceInfo> roRegistration = 
bookieRegistrationReadOnly.get(bookieId);
                 if (roRegistration != null) {
-                    roRegistration.release().get();
+                    roRegistration.release().get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS);
                 }
             } else {
                 ResourceLock<BookieServiceInfo> rwRegistration = 
bookieRegistration.get(bookieId);
                 if (rwRegistration != null) {
-                    rwRegistration.release().get();
+                    rwRegistration.release().get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS);
                 }
             }
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new BookieException.MetadataStoreException(ie);
-        } catch (ExecutionException e) {
+        } catch (ExecutionException | TimeoutException e) {
             throw new BookieException.MetadataStoreException(e);
         }
     }
@@ -195,8 +202,9 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
         String readonlyRegPath = bookieReadonlyRegistrationPath + "/" + 
bookieId;
 
         try {
-            return (store.exists(regPath).get() || 
store.exists(readonlyRegPath).get());
-        } catch (ExecutionException e) {
+            return (store.exists(regPath).get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS)
+                    || 
store.exists(readonlyRegPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS));
+        } catch (ExecutionException | TimeoutException e) {
             log.error("Exception while checking registration ephemeral nodes 
for BookieId: {}", bookieId, e);
             throw new BookieException.MetadataStoreException(e);
         } catch (InterruptedException e) {
@@ -222,7 +230,8 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
                 version = ((LongVersion) 
cookieData.getVersion()).getLongVersion();
             }
 
-            store.put(path, cookieData.getValue(), Optional.of(version)).get();
+            store.put(path, cookieData.getValue(), Optional.of(version))
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new BookieException.MetadataStoreException("Interrupted 
writing cookie for bookie " + bookieId, ie);
@@ -232,6 +241,8 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
             } else {
                 throw new BookieException.MetadataStoreException("Failed to 
write cookie for bookie " + bookieId);
             }
+        } catch (TimeoutException ex) {
+            throw new BookieException.MetadataStoreException("Failed to write 
cookie for bookie " + bookieId, ex);
         }
     }
 
@@ -239,7 +250,7 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
     public Versioned<byte[]> readCookie(BookieId bookieId) throws 
BookieException {
         String path = this.cookiePath + "/" + bookieId;
         try {
-            Optional<GetResult> res = store.get(path).get();
+            Optional<GetResult> res = 
store.get(path).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             if (!res.isPresent()) {
                 throw new 
BookieException.CookieNotFoundException(bookieId.toString());
             }
@@ -250,7 +261,7 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new BookieException.MetadataStoreException(ie);
-        } catch (ExecutionException e) {
+        } catch (ExecutionException | TimeoutException e) {
             throw new BookieException.MetadataStoreException(e);
         }
     }
@@ -259,7 +270,8 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
     public void removeCookie(BookieId bookieId, Version version) throws 
BookieException {
         String path = this.cookiePath + "/" + bookieId;
         try {
-            store.delete(path, Optional.of(((LongVersion) 
version).getLongVersion())).get();
+            store.delete(path, Optional.of(((LongVersion) 
version).getLongVersion()))
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new BookieException.MetadataStoreException("Interrupted 
deleting cookie for bookie " + bookieId, e);
@@ -269,6 +281,8 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
             } else {
                 throw new BookieException.MetadataStoreException("Failed to 
delete cookie for bookie " + bookieId);
             }
+        } catch (TimeoutException ex) {
+            throw new BookieException.MetadataStoreException("Failed to delete 
cookie for bookie " + bookieId);
         }
 
         log.info("Removed cookie from {} for bookie {}.", cookiePath, 
bookieId);
@@ -276,20 +290,23 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
 
     @Override
     public boolean prepareFormat() throws Exception {
-        boolean ledgerRootExists = store.exists(ledgersRootPath).get();
-        boolean availableNodeExists = 
store.exists(bookieRegistrationPath).get();
+        boolean ledgerRootExists = 
store.exists(ledgersRootPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+        boolean availableNodeExists = 
store.exists(bookieRegistrationPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         // Create ledgers root node if not exists
         if (!ledgerRootExists) {
-            store.put(ledgersRootPath, new byte[0], Optional.empty()).get();
+            store.put(ledgersRootPath, new byte[0], Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         }
         // create available bookies node if not exists
         if (!availableNodeExists) {
-            store.put(bookieRegistrationPath, new byte[0], 
Optional.empty()).get();
+            store.put(bookieRegistrationPath, new byte[0], Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         }
 
         // create readonly bookies node if not exists
-        if (!store.exists(bookieReadonlyRegistrationPath).get()) {
-            store.put(bookieReadonlyRegistrationPath, new byte[0], 
Optional.empty()).get();
+        if 
(!store.exists(bookieReadonlyRegistrationPath).get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS)) {
+            store.put(bookieReadonlyRegistrationPath, new byte[0], 
Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         }
 
         return ledgerRootExists;
@@ -301,16 +318,18 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
         log.info("Initializing metadata for new cluster, ledger root path: {}",
                 ledgersRootPath);
 
-        if (store.exists(instanceIdPath).get()) {
+        if (store.exists(instanceIdPath).get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS)) {
             log.error("Ledger root path: {} already exists", ledgersRootPath);
             return false;
         }
 
-        store.put(ledgersRootPath, new byte[0], Optional.empty()).get();
+        store.put(ledgersRootPath, new byte[0], Optional.empty())
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
 
         // create INSTANCEID
         String instanceId = UUID.randomUUID().toString();
-        store.put(instanceIdPath, instanceId.getBytes(UTF_8), 
Optional.of(-1L)).join();
+        store.put(instanceIdPath, instanceId.getBytes(UTF_8), Optional.of(-1L))
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
 
         log.info("Successfully initiated cluster. ledger root path: {} 
instanceId: {}",
                 ledgersRootPath, instanceId);
@@ -321,23 +340,28 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
     public boolean format() throws Exception {
         // Clear underreplicated ledgers
         
store.deleteRecursive(PulsarLedgerUnderreplicationManager.getBasePath(ledgersRootPath)
-                + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH).get();
+                              + 
BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH)
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
 
         // Clear underreplicatedledger locks
-        
store.deleteRecursive(PulsarLedgerUnderreplicationManager.getUrLockPath(ledgersRootPath)).get();
+        
store.deleteRecursive(PulsarLedgerUnderreplicationManager.getUrLockPath(ledgersRootPath))
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
 
         // Clear the cookies
-        store.deleteRecursive(cookiePath).get();
+        store.deleteRecursive(cookiePath).get(BLOCKING_CALL_TIMEOUT, 
MILLISECONDS);
 
         // Clear the INSTANCEID
-        if (store.exists(ledgersRootPath + "/" + 
BookKeeperConstants.INSTANCEID).get()) {
-            store.delete(ledgersRootPath + "/" + 
BookKeeperConstants.INSTANCEID, Optional.empty()).get();
+        if (store.exists(ledgersRootPath + "/" + 
BookKeeperConstants.INSTANCEID)
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
+            store.delete(ledgersRootPath + "/" + 
BookKeeperConstants.INSTANCEID, Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
         }
 
         // create INSTANCEID
         String instanceId = UUID.randomUUID().toString();
         store.put(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID,
-                instanceId.getBytes(StandardCharsets.UTF_8), 
Optional.of(-1L)).get();
+                instanceId.getBytes(StandardCharsets.UTF_8), Optional.of(-1L))
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
 
         log.info("Successfully formatted BookKeeper metadata");
         return true;
@@ -347,7 +371,7 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
     public boolean nukeExistingCluster() throws Exception {
         log.info("Nuking metadata of existing cluster, ledger root path: {}", 
ledgersRootPath);
 
-        if (!store.exists(ledgersRootPath + "/" + INSTANCEID).join()) {
+        if (!store.exists(ledgersRootPath + "/" + 
INSTANCEID).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
             log.info("There is no existing cluster with ledgersRootPath: {}, 
so exiting nuke operation",
                     ledgersRootPath);
             return true;
@@ -356,17 +380,19 @@ public class PulsarRegistrationManager implements 
RegistrationManager {
         @Cleanup
         RegistrationClient registrationClient = new 
PulsarRegistrationClient(store, ledgersRootPath);
 
-        Collection<BookieId> rwBookies = 
registrationClient.getWritableBookies().join().getValue();
+        Collection<BookieId> rwBookies = 
registrationClient.getWritableBookies()
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS).getValue();
         if (rwBookies != null && !rwBookies.isEmpty()) {
             log.error("Bookies are still up and connected to this cluster, "
-                    + "stop all bookies before nuking the cluster");
+                      + "stop all bookies before nuking the cluster");
             return false;
         }
 
-        Collection<BookieId> roBookies = 
registrationClient.getReadOnlyBookies().join().getValue();
+        Collection<BookieId> roBookies = 
registrationClient.getReadOnlyBookies()
+                .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS).getValue();
         if (roBookies != null && !roBookies.isEmpty()) {
             log.error("Readonly Bookies are still up and connected to this 
cluster, "
-                    + "stop all bookies before nuking the cluster");
+                      + "stop all bookies before nuking the cluster");
             return false;
         }
 

Reply via email to