Fomal-haut commented on issue #5755: UnAckedMessages admin information not 
correct
URL: https://github.com/apache/pulsar/issues/5755#issuecomment-560232144
 
 
   **Sure. Here is my POC code.**
   
   _**Pulsar admin(create topic and subscription):**_ 
   
   ```
   import org.apache.pulsar.client.admin.PulsarAdmin; 
   import org.apache.pulsar.client.admin.PulsarAdminException;`
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.PulsarClientException;
   
   public class PulsarAdminTest {
       public static void main(String[] args) throws PulsarClientException, 
PulsarAdminException {
           String url = "http://localhost:8080";;
           boolean tlsAllowInsecureConnection = false;
           String tlsTrustCertsFilePath = null;
           PulsarAdmin admin = PulsarAdmin.builder()
                   .serviceHttpUrl(url)
                   .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
                   .allowTlsInsecureConnection(tlsAllowInsecureConnection)
                   .build();
   
           admin.topics().createNonPartitionedTopic("public/default/one");
           admin.topics().createSubscription("public/default/one", 
"one-subscription", MessageId.earliest);
   
           admin.close();
       }
   }
   ```
   
   _**Pulsar producer(publish messages):**_ 
   
   ```
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   
   
   public class PulsarSingleProducer {
   
       public static void main(String[] args) throws PulsarClientException, 
InterruptedException {
           PulsarClient client = null;
           client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();
           Producer<byte[]> producer = client.newProducer()
                   .enableBatching(false)
                   .topic("public/default/one")
                   .create();
   
           int counter = 0;
           for (int i = 0; i < 100_000; i++) {
               counter ++;
               producer.send((i + "").getBytes());
               if (counter % 5000 == 0) {
                   System.out.println(counter);
               }
           }
   
           System.out.println("start to close");
   
           producer.close();
           client.close();
       }
   }
   ```
   
   _**Pulsar consumer(receive messages):**_ 
   
   ```
   import org.apache.pulsar.client.api.*;
   import java.util.concurrent.TimeUnit;
   
   public class PulsarConsumerMaxUnAckTest {
       public static void main(String[] args) throws PulsarClientException {
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();
   
           Consumer consumer = client.newConsumer()
                   .topic("public/default/one")
                   .ackTimeout(10, TimeUnit.HOURS)
                   .subscriptionName("one-subscription")
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
   
           int counter = 0;
           while (true) {
               // Wait for a message
               Message msg = consumer.receive();
               counter ++;
   
               try {
                   // Do something with the message
                   if (counter % 1000 == 0) {
                       consumer.acknowledge(msg);
                       System.out.println(counter);
                       System.out.println(String.format("Message received: %s", 
new String(msg.getData())));
                   }
   
               } catch (Exception e) {
                   // Message failed to process, redeliver later
                   consumer.negativeAcknowledge(msg);
               }
   
           }
       }
   }
   ```
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to