Hi folks,
I found your email address on the Apache Pulsar website. I need your help with
the following problem that we are facing.
Let's say I have a publisher that first ran normally and produced 20 messages
to the topic:
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class TestPub {
    public static void main(String[] args) throws PulsarClientException,
            InterruptedException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Producer<byte[]> producer = client.newProducer()
                .topic("example-topic")
                .create();
        for (int i = 0; i < 20; i++) {
            String msg = "my-message" + System.currentTimeMillis();
            producer.newMessage()
                    .value(msg.getBytes())
                    .send();
            System.out.println("Sent:" + msg);
            Thread.sleep(1000);
        }
    }
}
Now I need to launch the subscriber, but I only care about the last message that
was saved to the topic. I thought that specifying latest in the consumer or
reader would help, but it doesn't seem to work for me: it looks like latest
means the next message sent to the topic after the consumer or reader has
subscribed. Is there a way for me to get the message just before latest, that
is, the last one already stored?
import org.apache.pulsar.client.api.*;

public class TestSub {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        read(client);
        consume(client);
    }

    static void consume(PulsarClient client) throws PulsarClientException {
        Consumer consumer = client.newConsumer()
                .topic("example-topic")
                .subscriptionName("my-subscription-consumer")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .startMessageIdInclusive()
                .subscribe();
        //consumer.seek(MessageId.latest);
        while (true) {
            // Wait for a message
            Message msg = consumer.receive();
            try {
                System.out.println("Message consumed: " +
                        new String(msg.getData()));
                //consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        }
    }

    static void read(PulsarClient client) throws PulsarClientException {
        Reader consumer = client.newReader()
                .topic("example-topic")
                .subscriptionName("my-subscription-reader")
                .startMessageIdInclusive()
                .startMessageId(MessageId.latest)
                .create();
        while (true) {
            // Wait for a message
            Message msg = consumer.readNext();
            try {
                System.out.println("Message read: " +
                        new String(msg.getData()));
                //consumer.acknowledge(msg);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}
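To make the distinction I am asking about concrete, here is a toy in-memory model of it. This is plain Java and not Pulsar code: ToyTopic and its methods are hypothetical names I made up for illustration, and the "latest" behaviour it models is only my reading of what I observe, not confirmed Pulsar semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class LatestVsLastStored {

    /** Hypothetical in-memory stand-in for a topic; NOT a Pulsar class. */
    static final class ToyTopic {
        private final List<String> log = new ArrayList<>();

        void publish(String msg) {
            log.add(msg);
        }

        /** Where a "latest" cursor seems to point: one past the last stored message. */
        int latestPosition() {
            return log.size();
        }

        /** Position of the last message already stored, or -1 if the topic is empty. */
        int lastStoredPosition() {
            return log.size() - 1;
        }

        /** Non-blocking read: empty if nothing is stored at that position. */
        Optional<String> readAt(int position) {
            if (position < 0 || position >= log.size()) {
                return Optional.empty();
            }
            return Optional.of(log.get(position));
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        for (int i = 0; i < 20; i++) {
            topic.publish("my-message-" + i);
        }

        // What I get today: nothing until a new message is published.
        System.out.println("latest: "
                + topic.readAt(topic.latestPosition()).orElse("<nothing yet>"));

        // What I actually want: the last message that is already in the topic.
        System.out.println("last stored: "
                + topic.readAt(topic.lastStoredPosition()).orElse("<empty topic>"));
    }
}
```

In this model the first line prints "<nothing yet>" and the second prints "my-message-19"; the second behaviour is what I would like to get from a real consumer or reader.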
Thanks,
Andrey