Fomal-haut commented on issue #5755: UnAckedMessages admin information not
correct
URL: https://github.com/apache/pulsar/issues/5755#issuecomment-560232144
**Sure. Here is my POC code.**
_**Pulsar admin(create topic and subscription):**_
```
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;`
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
public class PulsarAdminTest {
public static void main(String[] args) throws PulsarClientException,
PulsarAdminException {
String url = "http://localhost:8080";;
boolean tlsAllowInsecureConnection = false;
String tlsTrustCertsFilePath = null;
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(url)
.tlsTrustCertsFilePath(tlsTrustCertsFilePath)
.allowTlsInsecureConnection(tlsAllowInsecureConnection)
.build();
admin.topics().createNonPartitionedTopic("public/default/one");
admin.topics().createSubscription("public/default/one",
"one-subscription", MessageId.earliest);
admin.close();
}
}
```
_**Pulsar producer(publish messages):**_
```
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
public class PulsarSingleProducer {
public static void main(String[] args) throws PulsarClientException,
InterruptedException {
PulsarClient client = null;
client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer producer = client.newProducer()
.enableBatching(false)
.topic("public/default/one")
.create();
int counter = 0;
for (int i = 0; i < 100_000; i++) {
counter ++;
producer.send((i + "").getBytes());
if (counter % 5000 == 0) {
System.out.println(counter);
}
}
System.out.println("start to close");
producer.close();
client.close();
}
}
```
_**Pulsar consumer(receive messages):**_
```
import org.apache.pulsar.client.api.*;
import java.util.concurrent.TimeUnit;
public class PulsarConsumerMaxUnAckTest {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer consumer = client.newConsumer()
.topic("public/default/one")
.ackTimeout(10, TimeUnit.HOURS)
.subscriptionName("one-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
int counter = 0;
while (true) {
// Wait for a message
Message msg = consumer.receive();
counter ++;
try {
// Do something with the message
if (counter % 1000 == 0) {
consumer.acknowledge(msg);
System.out.println(counter);
System.out.println(String.format("Message received: %s",
new String(msg.getData(;
}
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
}
}
```
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
With regards,
Apache Git Services