This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push: new 5de46f9 [Cherry-pick #11597] Use get instead of join to avoid getting stuck (#12110) 5de46f9 is described below commit 5de46f93672d4eef3604754517eb55773b4320a1 Author: Jiwei Guo <techno...@apache.org> AuthorDate: Fri Sep 24 17:56:19 2021 +0800 [Cherry-pick #11597] Use get instead of join to avoid getting stuck (#12110) --- .../java/org/apache/pulsar/broker/PulsarService.java | 3 ++- .../pulsar/broker/admin/impl/NamespacesBase.java | 6 ++++-- .../pulsar/broker/admin/v1/NonPersistentTopics.java | 12 ++++++++++-- .../pulsar/broker/admin/v2/NonPersistentTopics.java | 13 +++++++++++-- .../pulsar/broker/namespace/OwnershipCache.java | 11 ++++++++++- .../pulsar/client/impl/ConsumerBuilderImpl.java | 20 +++++++++++++++----- 6 files changed, 52 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 61ffef7..eb01fa4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1063,7 +1063,8 @@ public class PulsarService implements AutoCloseable { List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList(); long topicLoadStart = System.nanoTime(); - for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) { + for (String topic : getNamespaceService().getListOfPersistentTopics(nsName). + get(config.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 2ba0a09..3fd6c37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -391,7 +391,8 @@ public abstract class NamespacesBase extends AdminResource { List<String> topics; try { - topics = pulsar().getNamespaceService().getFullListOfTopics(namespaceName).join(); + topics = pulsar().getNamespaceService().getFullListOfTopics(namespaceName). + get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS); } catch (Exception e) { asyncResponse.resume(new RestException(e)); return; @@ -571,7 +572,8 @@ public abstract class NamespacesBase extends AdminResource { try { NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); - List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName) + .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS); for (String topic : topics) { NamespaceBundle topicBundle = pulsar().getNamespaceService() .getBundle(TopicName.get(topic)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index daf6ea0..c5d5870 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; import javax.ws.rs.GET; @@ -297,7 +298,14 @@ public class NonPersistentTopics extends PersistentTopics { } private Topic getTopicReference(TopicName topicName) { - return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join() - .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found")); + try { + return pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found")); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } catch (InterruptedException | TimeoutException e) { + throw new RestException(e); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 2cfb20f..d390049 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -29,6 +29,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -530,7 +532,14 @@ public class NonPersistentTopics extends PersistentTopics { } private Topic getTopicReference(TopicName topicName) { - return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join() - .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found")); + try { + return pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found")); + } catch (ExecutionException e) { + throw new RestException(e.getCause()); + } catch (InterruptedException | TimeoutException e) { + throw new RestException(e); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 116397a..c27575c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -30,7 +30,10 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -371,7 +374,13 @@ public class OwnershipCache { public OwnedBundle getOwnedBundle(NamespaceBundle bundle) { CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(ServiceUnitUtils.path(bundle)); if (future != null && future.isDone() && !future.isCompletedExceptionally()) { - return future.join(); + try { + return future.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } } else { return null; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 47f9991..dba18d9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -24,7 +24,9 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.AccessLevel; @@ -122,11 +124,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; - if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) { - retryLetterTopic = oldRetryLetterTopic; - } - if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) { - deadLetterTopic = oldDeadLetterTopic; + try { + if (client.getPartitionedTopicMetadata(oldRetryLetterTopic) + .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) { + retryLetterTopic = oldRetryLetterTopic; + } + if (client.getPartitionedTopicMetadata(oldDeadLetterTopic) + .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) { + deadLetterTopic = oldDeadLetterTopic; + } + } catch (InterruptedException | TimeoutException e) { + return FutureUtil.failedFuture(e); + } catch (ExecutionException e) { + return FutureUtil.failedFuture(e.getCause()); } if(conf.getDeadLetterPolicy() == null) {