git-enzo opened a new issue #7545:
URL: https://github.com/apache/pulsar/issues/7545


   **Describe the bug**
   In the MultiTopicsConsumerImpl class, an exception thrown inside the error 
handler that runs when subscribing to a new topic fails can be swallowed. 
   
   This can happen in the ```handleSubscribeOneTopicError()``` method, 
which is invoked from the ```exceptionally``` block of a CompletableFuture chain.
   
   This handler method is responsible for exceptionally completing the 
```CompletableFuture``` named ```subscribeFuture``` (code below). The future 
is completed in only one place, and just before that call the 
```checkState()``` method may throw an exception. That exception is 
swallowed, and as a result ```subscribeFuture``` is never completed.
   
   ```
   if (toCloseNum.decrementAndGet() == 0) {
       log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}",
                       topic, topicName, error.getMessage());
       topics.remove(topicName);
       // a possible swallowed exception
       checkState(allTopicPartitionsNumber.get() == consumers.values().size());
       subscribeFuture.completeExceptionally(error);
   }
   ```
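
To illustrate where the exception goes, here is a minimal, self-contained sketch. The class name, the `check` helper (a hypothetical stand-in for Guava's `checkState`), and `runHandler` are assumptions made for illustration and are not Pulsar code. An exception thrown inside the `exceptionally` callback is captured by the future that `exceptionally` returns; if nobody keeps that future, the exception is never observed and the statement after the check never runs.

```java
import java.util.concurrent.CompletableFuture;

class SwallowedExceptionDemo {
    // Hypothetical stand-in for Guava's Preconditions.checkState.
    static void check(boolean condition) {
        if (!condition) {
            throw new IllegalStateException("state check failed");
        }
    }

    // Returns the future produced by exceptionally(); the code in the issue discards it.
    static CompletableFuture<Void> runHandler(CompletableFuture<Void> toFinishFuture) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("subscribe error"));

        return failed.exceptionally(throwable -> {
            check(false); // throws, so the next line is skipped
            toFinishFuture.completeExceptionally(throwable);
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> toFinishFuture = new CompletableFuture<>();
        CompletableFuture<Void> handlerFuture = runHandler(toFinishFuture);

        // The caller-visible future is still pending ...
        System.out.println("toFinishFuture done: " + toFinishFuture.isDone());
        // ... while the IllegalStateException sits in the discarded handler future.
        System.out.println("handlerFuture failed: " + handlerFuture.isCompletedExceptionally());
    }
}
```

In this sketch the first line prints `false` and the second prints `true`, which matches the hang described above: the caller waits on a future that nothing will complete.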
   
   **To Reproduce**
   The behavior above can be reproduced with the following sample test code:
   
   ```
       CompletableFuture<Void> someMethod() {
           // always fails for test purposes
           return CompletableFuture.failedFuture(new RuntimeException("someMethod error"));
       }
   
       CompletableFuture<Void> mainMethod() {
           CompletableFuture<Void> toFinishFuture = new CompletableFuture<>();
   
           someMethod().thenAccept(x -> {
               // some code
               // ...
               toFinishFuture.complete(null);
           }).exceptionally(throwable -> {
               // always fails for test purposes; throws an exception, which is swallowed
               checkState(1 == 2);
               toFinishFuture.completeExceptionally(new RuntimeException("some error"));
               return null;
           });
   
           return toFinishFuture;
       }
   
       @Test
       void swallowedExceptionTest() throws ExecutionException, InterruptedException, TimeoutException {
           CompletableFuture<Void> completableFuture = mainMethod();
           completableFuture.whenComplete((aVoid, throwable) -> {
               if (throwable == null) {
                   System.out.println("OK");
               } else {
                   System.out.println("ERROR");
               }
           }).get(20, TimeUnit.SECONDS); // completableFuture never completes, so a TimeoutException is thrown
       }
   ```
   
   
   **Expected behavior**
   The exception should not be swallowed, and the completable future named 
```subscribeFuture``` should always be completed inside the 
```handleSubscribeOneTopicError()``` method.
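
One possible way to get this behavior, shown on the reproduction code rather than on the real handler, is to guard the state check so that the future is completed on every path. This is only a sketch under assumptions: the class name and the `check` helper (a hypothetical stand-in for Guava's `checkState`) are invented for illustration, `System.err` stands in for a real logger, and the actual fix in Pulsar may look different.

```java
import java.util.concurrent.CompletableFuture;

class AlwaysCompleteSketch {
    // Hypothetical stand-in for Guava's Preconditions.checkState.
    static void check(boolean condition) {
        if (!condition) {
            throw new IllegalStateException("state check failed");
        }
    }

    static CompletableFuture<Void> mainMethod() {
        CompletableFuture<Void> toFinishFuture = new CompletableFuture<>();
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("someMethod error"));

        failed.thenAccept(x -> toFinishFuture.complete(null))
              .exceptionally(throwable -> {
                  try {
                      check(1 == 2); // still fails, but no longer escapes the handler
                  } catch (RuntimeException e) {
                      // Report the failed check instead of swallowing it.
                      System.err.println("State check failed in error handler: " + e.getMessage());
                  } finally {
                      // Runs on every path, so the caller's future always completes.
                      toFinishFuture.completeExceptionally(throwable);
                  }
                  return null;
              });

        return toFinishFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> result = mainMethod();
        System.out.println("completed exceptionally: " + result.isCompletedExceptionally());
    }
}
```

With this shape, a caller that waits on the returned future with `get(20, TimeUnit.SECONDS)` would receive an `ExecutionException` caused by the original error instead of waiting for a `TimeoutException`.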
   

