https://github.com/apache/pulsar/commit/f4936763af1c1dab54d031ae355e40d66d3a602e

Was merged a week ago.

-Ivan

On Thu, Dec 27, 2018 at 2:46 PM Enrico Olivelli <[email protected]> wrote:
>
> Hi,
> I am migrating an application from Kafka to Pulsar, I want to know
> which is the ID of the latest message sent to a topic.
> My case is that I would like to know if the reader has already
> processed all available data.
>
>
> I am using the Reader API because I need full control on the consumer
> side about the portion of data to process (I will store the last
> Message ID on a DB).
>
> In Kafka I had a "consumer#seekToEnd + consumer#position"  API to move
> to the end of a topic and then read the offsets for each partition.
>
> In Pulsar I am trying to achieve the same goal with:
>
> try (Reader<byte[]> pulsarReader = pulsarClient.newReader()
>                 .startMessageId(MessageId.latest)
>                 .topic(topicName)
>                 .create()) {
>             MessageId lastMessageId = null;
>             Message<byte[]> readNext = pulsarReader.readNext(1,
> TimeUnit.SECONDS);
>             if (readNext != null) {
>                 lastMessageId = readNext.getMessageId();
>             }
> }
>
> This is not working, as within 1 second it is not guaranteed to have a
> message....
>
> Which is the best way ?
>
> Thanks
> Enrico

Reply via email to