shibd commented on code in PR #24100:
URL: https://github.com/apache/pulsar/pull/24100#discussion_r2006705755
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java:
##########
@@ -98,8 +101,31 @@ public ConsumerBuilder<T> clone() {
@Override
public Consumer<T> subscribe() throws PulsarClientException {
+ CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
try {
- return subscribeAsync().get();
+ subscribeAsync().whenComplete((c, e) -> {
+ if (e != null) {
+ // If the subscription fails, there is no need to close the consumer here,
+ // as it will be handled in the subscribeAsync method.
+ future.completeExceptionally(e);
+ return;
+ }
+ if (interruptedBeforeConsumerCreation) {
+ c.closeAsync().exceptionally(closeEx -> {
+ log.error("Failed to close consumer after interruption", closeEx.getCause());
+ return null;
+ });
+ future.completeExceptionally(new PulsarClientException(
+ "Subscription was interrupted before the consumer could be fully created"));
+ } else {
+ future.complete(c);
+ }
+ });
+ return future.get();
+ } catch (InterruptedException e) {
+ interruptedBeforeConsumerCreation = true;
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
Review Comment:
Yes, this is better from the code side.
However, to clarify: in this case the user won't actually receive this
exception (`future.completeExceptionally(new
PulsarClientException(interruptedException));`), because `future.get()`
has already thrown an `InterruptedException`, and that is what is
propagated to the user.
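
A minimal, self-contained sketch of this behavior, using only
`java.util.concurrent`. The class and method names are hypothetical and not
part of the Pulsar client, and the self-interrupt only simulates an interrupt
arriving while the caller is inside `subscribe()`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class, not part of the Pulsar client.
class InterruptedGetDemo {

    // Returns the name of the exception type that the caller of get() observes.
    static String observedByCaller() {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Simulate the interrupt: with the flag set, get() throws
        // InterruptedException instead of waiting for the future.
        Thread.currentThread().interrupt();

        String observed;
        try {
            future.get();
            observed = "none";
        } catch (InterruptedException e) {
            observed = "InterruptedException";
        } catch (ExecutionException e) {
            observed = "ExecutionException";
        }

        // Stands in for the whenComplete callback finishing later: the future
        // is completed exceptionally, but nobody is waiting on it anymore.
        future.completeExceptionally(
                new IllegalStateException("interrupted before consumer creation"));
        return observed;
    }

    public static void main(String[] args) {
        // Prints "caller observed: InterruptedException"
        System.out.println("caller observed: " + observedByCaller());
    }
}
```

In this sketch the exception passed to `completeExceptionally` is only
visible to code that inspects the future afterwards; the blocked caller has
already left `get()` with the `InterruptedException`.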