This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 5de46f9  [Cherry-pick #11597] Use get instead of join to avoid getting stuck (#12110)
5de46f9 is described below

commit 5de46f93672d4eef3604754517eb55773b4320a1
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Fri Sep 24 17:56:19 2021 +0800

    [Cherry-pick #11597] Use get instead of join to avoid getting stuck (#12110)
---
 .../java/org/apache/pulsar/broker/PulsarService.java |  3 ++-
 .../pulsar/broker/admin/impl/NamespacesBase.java     |  6 ++++--
 .../pulsar/broker/admin/v1/NonPersistentTopics.java  | 12 ++++++++++--
 .../pulsar/broker/admin/v2/NonPersistentTopics.java  | 13 +++++++++++--
 .../pulsar/broker/namespace/OwnershipCache.java      | 11 ++++++++++-
 .../pulsar/client/impl/ConsumerBuilderImpl.java      | 20 +++++++++++++++-----
 6 files changed, 52 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 61ffef7..eb01fa4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1063,7 +1063,8 @@ public class PulsarService implements AutoCloseable {
             List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
             long topicLoadStart = System.nanoTime();
 
-            for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) {
+            for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).
+                    get(config.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
                 try {
                     TopicName topicName = TopicName.get(topic);
                     if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2ba0a09..3fd6c37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -391,7 +391,8 @@ public abstract class NamespacesBase extends AdminResource {
 
         List<String> topics;
         try {
-            topics = pulsar().getNamespaceService().getFullListOfTopics(namespaceName).join();
+            topics = pulsar().getNamespaceService().getFullListOfTopics(namespaceName).
+                    get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
             return;
@@ -571,7 +572,8 @@ public abstract class NamespacesBase extends AdminResource {
         try {
             NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
                 authoritative, true);
-            List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
+            List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+                    .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
             for (String topic : topics) {
                 NamespaceBundle topicBundle = pulsar().getNamespaceService()
                         .getBundle(TopicName.get(topic));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index daf6ea0..c5d5870 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
@@ -297,7 +298,14 @@ public class NonPersistentTopics extends PersistentTopics {
     }
 
     private Topic getTopicReference(TopicName topicName) {
-        return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+        try {
+            return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+                    .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e.getCause());
+        } catch (InterruptedException | TimeoutException e) {
+            throw new RestException(e);
+        }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 2cfb20f..d390049 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -29,6 +29,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -530,7 +532,14 @@ public class NonPersistentTopics extends PersistentTopics {
     }
 
     private Topic getTopicReference(TopicName topicName) {
-        return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+        try {
+            return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+                    .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        } catch (InterruptedException | TimeoutException e) {
+            throw new RestException(e);
+        }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 116397a..c27575c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -30,7 +30,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -371,7 +374,13 @@ public class OwnershipCache {
     public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
         CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(ServiceUnitUtils.path(bundle));
         if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
-            return future.join();
+            try {
+                return future.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
+            } catch (InterruptedException | TimeoutException e) {
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e.getCause());
+            }
         } else {
             return null;
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 47f9991..dba18d9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -24,7 +24,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
@@ -122,11 +124,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
             String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
             String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) {
-                retryLetterTopic = oldRetryLetterTopic;
-            }
-            if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) {
-                deadLetterTopic = oldDeadLetterTopic;
+            try {
+                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    retryLetterTopic = oldRetryLetterTopic;
+                }
+                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
+                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+                    deadLetterTopic = oldDeadLetterTopic;
+                }
+            } catch (InterruptedException | TimeoutException e) {
+                return FutureUtil.failedFuture(e);
+            } catch (ExecutionException e) {
+                return FutureUtil.failedFuture(e.getCause());
             }
 
             if(conf.getDeadLetterPolicy() == null) {

Reply via email to