Fomal-haut commented on issue #5755: UnAckedMessages admin information not correct
URL: https://github.com/apache/pulsar/issues/5755#issuecomment-560232144

**Sure. Here is my POC code.**

_**Pulsar admin (create topic and subscription):**_

```
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarAdminTest {
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        String url = "http://localhost:8080";
        boolean tlsAllowInsecureConnection = false;
        String tlsTrustCertsFilePath = null;

        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(url)
                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
                .build();

        // Create the topic, then a subscription positioned at the earliest message.
        admin.topics().createNonPartitionedTopic("public/default/one");
        admin.topics().createSubscription("public/default/one", "one-subscription", MessageId.earliest);
        admin.close();
    }
}
```

_**Pulsar producer (publish messages):**_

```
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarSingleProducer {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<byte[]> producer = client.newProducer()
                .enableBatching(false)
                .topic("public/default/one")
                .create();

        // Publish 100,000 messages synchronously, printing progress every 5000.
        int counter = 0;
        for (int i = 0; i < 100_000; i++) {
            counter++;
            producer.send((i + "").getBytes());
            if (counter % 5000 == 0) {
                System.out.println(counter);
            }
        }

        System.out.println("start to close");
        producer.close();
        client.close();
    }
}
```

_**Pulsar consumer (receive messages):**_

```
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.TimeUnit;

public class PulsarConsumerMaxUnAckTest {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("public/default/one")
                .ackTimeout(10, TimeUnit.HOURS)
                .subscriptionName("one-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        int counter = 0;
        while (true) {
            // Wait for a message
            Message<byte[]> msg = consumer.receive();
            counter++;
            try {
                // Only every 1000th message is acknowledged; the others remain unacknowledged.
                if (counter % 1000 == 0) {
                    consumer.acknowledge(msg);
                    System.out.println(counter);
                    System.out.printf("Message received: %s%n", new String(msg.getData()));
                }
            } catch (Exception e) {
                // Message failed to process, redeliver later
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}
```
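For reference when comparing against the admin statistics, the ack pattern in the consumer above can be worked out by hand. The following is a minimal sketch in plain Java (no Pulsar dependency) of that arithmetic. The class and method names are hypothetical, and it assumes the consumer actually receives every published message exactly once with no redelivery; any broker-side limit on unacknowledged messages that pauses delivery earlier is not modelled here.

```java
public class AckPatternSketch {

    /** Messages acknowledged when every ackInterval-th received message is acked. */
    static long ackedCount(long received, long ackInterval) {
        if (received < 0 || ackInterval <= 0) {
            throw new IllegalArgumentException("received must be >= 0 and ackInterval must be > 0");
        }
        // The counter runs 1..received and acks at ackInterval, 2 * ackInterval, ...
        return received / ackInterval;
    }

    /** Messages received but never acknowledged under the same pattern. */
    static long unackedCount(long received, long ackInterval) {
        return received - ackedCount(received, ackInterval);
    }

    public static void main(String[] args) {
        long received = 100_000;  // messages published by the producer in the POC
        long ackInterval = 1000;  // the consumer acks when counter % 1000 == 0

        System.out.println("acked   = " + ackedCount(received, ackInterval));
        System.out.println("unacked = " + unackedCount(received, ackInterval));
    }
}
```

Under those assumptions the sketch gives 100 acknowledged and 99,900 unacknowledged messages for the full run; if the consumer stops receiving earlier, substitute the number it actually received.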