xiangzilv123 commented on issue #3226: Unclosed consumer after calling 
closeAsync
URL: https://github.com/apache/pulsar/issues/3226#issuecomment-449265114
 
 
   
   
   ```java
   package test;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   import java.util.concurrent.TimeUnit;
   
   /**
    * Stand-alone driver that, in an endless loop, subscribes a shared consumer
    * to a topic, waits for the wall-clock minute to change, asynchronously
    * unsubscribes and then closes that consumer, and waits for the next minute
    * change before starting over.
    */
   public class TestMain {

       /** Number of milliseconds in one minute; used to bucket timestamps by minute. */
       private static final long MILLIS_PER_MINUTE = 60 * 1000;

       /** Polling interval, in milliseconds, while waiting for the minute to change. */
       private static final long POLL_INTERVAL_MILLIS = 1000;

       /**
        * Returns the current wall-clock time expressed as whole minutes since the epoch.
        */
       private static long currentMinute() {
           return System.currentTimeMillis() / MILLIS_PER_MINUTE;
       }

       /**
        * Blocks the calling thread until the wall-clock minute differs from the
        * minute observed on entry. Always sleeps at least one polling interval.
        *
        * @throws InterruptedException if the thread is interrupted while sleeping
        */
       private static void waitTillNextMinute() throws InterruptedException {
           System.out.println("waiting for next minute");
           final long startMinute = currentMinute();
           // Sleep once unconditionally, then keep polling while still in the same minute.
           Thread.sleep(POLL_INTERVAL_MILLIS);
           while (currentMinute() == startMinute) {
               Thread.sleep(POLL_INTERVAL_MILLIS);
           }
       }

       /**
        * Prints the outcome of an asynchronous operation: the stack trace when it
        * failed, otherwise the given success message.
        *
        * @param successMessage text printed to stdout when {@code failure} is null
        * @param failure        the error the operation completed with, or null
        */
       private static void report(String successMessage, Throwable failure) {
           if (failure != null) {
               failure.printStackTrace();
               return;
           }
           System.out.println(successMessage);
       }

       /**
        * Creates one client, then loops forever creating a consumer and tearing it
        * down (unsubscribe followed by close) once per minute boundary.
        *
        * @param args ignored
        * @throws Exception if client creation, subscription, or sleeping fails
        */
       public static void main(String[] args) throws Exception {
           final String serviceUrl = "pulsar://localhost:6650";
           final String topicName = "test-topic";
           final String subscription = "test-subscription";

           PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
           System.out.println("client created");

           for (;;) {
               Consumer<byte[]> consumer = pulsarClient.newConsumer()
                       .topic(topicName)
                       .ackTimeout(2 * 60, TimeUnit.MINUTES)
                       .subscriptionName(subscription)
                       .subscriptionType(SubscriptionType.Shared)
                       .subscribe();
               System.out.println("consumer created");

               waitTillNextMinute();

               System.out.println("will unsub or close");
               // The close is issued from inside the unsubscribe callback, so it
               // runs only after the unsubscribe has completed, whether or not
               // the unsubscribe succeeded.
               consumer.unsubscribeAsync().whenComplete((unsubResult, unsubError) -> {
                   report("unsub succeeded", unsubError);
                   consumer.closeAsync().whenComplete(
                           (closeResult, closeError) -> report("close succeeded", closeError));
               });

               waitTillNextMinute();
           }
       }
   }
   
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to