codelipenghui commented on code in PR #24100:
URL: https://github.com/apache/pulsar/pull/24100#discussion_r2006699969
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java:
##########
@@ -98,8 +101,31 @@ public ConsumerBuilder<T> clone() {
@Override
public Consumer<T> subscribe() throws PulsarClientException {
+ CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
try {
- return subscribeAsync().get();
+ subscribeAsync().whenComplete((c, e) -> {
+ if (e != null) {
+ // If the subscription fails, there is no need to close the consumer here,
+ // as it will be handled in the subscribeAsync method.
+ future.completeExceptionally(e);
+ return;
+ }
+ if (interruptedBeforeConsumerCreation) {
+ c.closeAsync().exceptionally(closeEx -> {
+ log.error("Failed to close consumer after interruption", closeEx.getCause());
+ return null;
+ });
+ future.completeExceptionally(new PulsarClientException(
+ "Subscription was interrupted before the consumer could be fully created"));
+ } else {
+ future.complete(c);
+ }
+ });
+ return future.get();
+ } catch (InterruptedException e) {
+ interruptedBeforeConsumerCreation = true;
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
Review Comment:
```suggestion
subscribeAsync().whenComplete((c, e) -> {
if (e != null) {
// If the subscription fails, there is no need to close the consumer here,
// as it will be handled in the subscribeAsync method.
future.completeExceptionally(e);
return;
}
if (interruptedBeforeConsumerCreation) {
c.closeAsync().whenComplete((__, closeEx) -> {
if (closeEx != null) {
log.error("Failed to close consumer after interruption", closeEx.getCause());
}
// Fail the future only after the close attempt has finished, whether or not it succeeded.
future.completeExceptionally(new PulsarClientException(interruptedException));
});
} else {
future.complete(c);
}
});
return future.get();
} catch (InterruptedException e) {
interruptedBeforeConsumerCreation = true;
interruptedException = e;
Thread.currentThread().interrupt();
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java:
##########
@@ -98,8 +101,31 @@ public ConsumerBuilder<T> clone() {
@Override
public Consumer<T> subscribe() throws PulsarClientException {
+ CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
try {
- return subscribeAsync().get();
+ subscribeAsync().whenComplete((c, e) -> {
+ if (e != null) {
+ // If the subscription fails, there is no need to close the consumer here,
+ // as it will be handled in the subscribeAsync method.
+ future.completeExceptionally(e);
+ return;
+ }
+ if (interruptedBeforeConsumerCreation) {
+ c.closeAsync().exceptionally(closeEx -> {
+ log.error("Failed to close consumer after interruption", closeEx.getCause());
+ return null;
+ });
+ future.completeExceptionally(new PulsarClientException(
+ "Subscription was interrupted before the consumer could be fully created"));
+ } else {
+ future.complete(c);
+ }
+ });
+ return future.get();
+ } catch (InterruptedException e) {
+ interruptedBeforeConsumerCreation = true;
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
Review Comment:
Would it be better to let the consumer close first and only then return the
exception to the user?
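
To make the ordering concrete, here is a minimal, self-contained sketch of "close first, then fail". It does not use the Pulsar client: `FakeConsumer` and `finish` are hypothetical stand-ins invented for illustration, and the only assumption carried over from the diff is that `closeAsync()` returns a `CompletableFuture`. The result future is completed exceptionally inside the close callback, so a caller observing it can assume the close attempt has already run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseThenFailSketch {

    /** Hypothetical stand-in for a consumer; NOT the Pulsar Consumer interface. */
    public static class FakeConsumer {
        public final AtomicBoolean closed = new AtomicBoolean(false);
        private final boolean failOnClose;

        public FakeConsumer(boolean failOnClose) {
            this.failOnClose = failOnClose;
        }

        /** Marks the consumer closed and returns an already-completed future. */
        public CompletableFuture<Void> closeAsync() {
            closed.set(true);
            CompletableFuture<Void> f = new CompletableFuture<>();
            if (failOnClose) {
                f.completeExceptionally(new IllegalStateException("close failed"));
            } else {
                f.complete(null);
            }
            return f;
        }
    }

    /**
     * Hypothetical helper: if the caller was interrupted, close the consumer
     * first and fail the result only once the close attempt has finished.
     */
    public static CompletableFuture<FakeConsumer> finish(
            FakeConsumer consumer, boolean interrupted, Exception interruptCause) {
        CompletableFuture<FakeConsumer> result = new CompletableFuture<>();
        if (!interrupted) {
            result.complete(consumer);
            return result;
        }
        consumer.closeAsync().whenComplete((ignored, closeEx) -> {
            if (closeEx != null) {
                // A failed close is logged but does not replace the original cause.
                System.err.println("Failed to close consumer after interruption: " + closeEx);
            }
            result.completeExceptionally(interruptCause);
        });
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        FakeConsumer consumer = new FakeConsumer(true);
        CompletableFuture<FakeConsumer> result =
                finish(consumer, true, new InterruptedException("interrupted"));
        try {
            result.get();
        } catch (ExecutionException e) {
            System.out.println("closed=" + consumer.closed.get()
                    + ", cause=" + e.getCause().getClass().getSimpleName());
        }
    }
}
```

In the PR itself the exception passed to `completeExceptionally` would presumably be a `PulsarClientException` wrapping the interruption, as in the suggestion above; the sketch passes the cause through unchanged to stay free of Pulsar types.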