codelipenghui commented on code in PR #24100:
URL: https://github.com/apache/pulsar/pull/24100#discussion_r2006699969


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java:
##########
@@ -98,8 +101,31 @@ public ConsumerBuilder<T> clone() {
 
     @Override
     public Consumer<T> subscribe() throws PulsarClientException {
+        CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
         try {
-            return subscribeAsync().get();
+            subscribeAsync().whenComplete((c, e) -> {
+                if (e != null) {
+                    // If the subscription fails, there is no need to close the consumer here,
+                    // as it will be handled in the subscribeAsync method.
+                    future.completeExceptionally(e);
+                    return;
+                }
+                if (interruptedBeforeConsumerCreation) {
+                    c.closeAsync().exceptionally(closeEx -> {
+                        log.error("Failed to close consumer after interruption", closeEx.getCause());
+                        return null;
+                    });
+                    future.completeExceptionally(new PulsarClientException(
+                            "Subscription was interrupted before the consumer could be fully created"));
+                } else {
+                    future.complete(c);
+                }
+            });
+            return future.get();
+        } catch (InterruptedException e) {
+            interruptedBeforeConsumerCreation = true;
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);

Review Comment:
   ```suggestion
               subscribeAsync().whenComplete((c, e) -> {
                   if (e != null) {
                       // If the subscription fails, there is no need to close the consumer here,
                       // as it will be handled in the subscribeAsync method.
                       future.completeExceptionally(e);
                       return;
                   }
                   if (interruptedBeforeConsumerCreation) {
                       c.closeAsync().whenComplete((__, closeEx) -> {
                           if (closeEx != null) {
                               log.error("Failed to close consumer after interruption", closeEx.getCause());
                               future.completeExceptionally(new PulsarClientException(interruptedException));
                           } else {
                               future.completeExceptionally(new PulsarClientException(interruptedException));
                           }
                       });
                   } else {
                       future.complete(c);
                   }
               });
               return future.get();
           } catch (InterruptedException e) {
               interruptedBeforeConsumerCreation = true;
               interruptedException = e;
               Thread.currentThread().interrupt();
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java:
##########
@@ -98,8 +101,31 @@ public ConsumerBuilder<T> clone() {
 
     @Override
     public Consumer<T> subscribe() throws PulsarClientException {
+        CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
         try {
-            return subscribeAsync().get();
+            subscribeAsync().whenComplete((c, e) -> {
+                if (e != null) {
+                    // If the subscription fails, there is no need to close the consumer here,
+                    // as it will be handled in the subscribeAsync method.
+                    future.completeExceptionally(e);
+                    return;
+                }
+                if (interruptedBeforeConsumerCreation) {
+                    c.closeAsync().exceptionally(closeEx -> {
+                        log.error("Failed to close consumer after interruption", closeEx.getCause());
+                        return null;
+                    });
+                    future.completeExceptionally(new PulsarClientException(
+                            "Subscription was interrupted before the consumer could be fully created"));
+                } else {
+                    future.complete(c);
+                }
+            });
+            return future.get();
+        } catch (InterruptedException e) {
+            interruptedBeforeConsumerCreation = true;
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);

Review Comment:
   Would it be better to let the consumer close first, and only then return the exception to the user?
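
To make the ordering in this question concrete, here is a minimal, self-contained sketch of "close first, then fail". It is not the Pulsar client code: `FakeConsumer`, `finishClose`, and `failAfterClose` are hypothetical names invented for illustration, and the only real API used is `java.util.concurrent.CompletableFuture`. The idea is that the future handed back to the caller is completed exceptionally only inside the callback of `closeAsync()`, so the caller cannot observe the failure while the consumer is still open.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseThenFail {

    /** Hypothetical stand-in for a consumer; NOT the Pulsar Consumer API. */
    static class FakeConsumer {
        final AtomicBoolean closed = new AtomicBoolean(false);
        private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        /** Returns a future that completes only once finishClose() is called. */
        CompletableFuture<Void> closeAsync() {
            return closeFuture;
        }

        /** Simulates the asynchronous close finishing at some later point. */
        void finishClose() {
            closed.set(true);
            closeFuture.complete(null);
        }
    }

    /**
     * Closes the consumer and fails the returned future with {@code cause},
     * but only after the close attempt has finished (successfully or not).
     */
    static <T> CompletableFuture<T> failAfterClose(FakeConsumer consumer, Exception cause) {
        CompletableFuture<T> result = new CompletableFuture<>();
        consumer.closeAsync().whenComplete((ignored, closeEx) -> {
            if (closeEx != null) {
                // Keep the close failure visible without hiding the original cause.
                cause.addSuppressed(closeEx);
            }
            result.completeExceptionally(cause);
        });
        return result;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        CompletableFuture<String> result =
                failAfterClose(consumer, new InterruptedException("interrupted"));

        // The close has not finished yet, so the caller-facing future is still pending.
        System.out.println(result.isDone()); // prints "false"

        consumer.finishClose();

        // Once the close finishes, the failure is propagated.
        System.out.println(result.isCompletedExceptionally()); // prints "true"
    }
}
```

Note that in the synchronous `subscribe()` path, the calling thread leaves `future.get()` as soon as it is interrupted, so this ordering reaches the user only if the `catch` block also waits for (or otherwise chains on) the close before throwing. Whether that extra wait is acceptable is a design decision for the PR.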


