https://github.com/apache/pulsar/commit/f4936763af1c1dab54d031ae355e40d66d3a602e
Was merged a week ago. -Ivan On Thu, Dec 27, 2018 at 2:46 PM Enrico Olivelli <[email protected]> wrote: > > Hi, > I am migrating an application from Kafka to Pulsar, I want to know > which is the ID of the latest message sent to a topic. > My case is that I would like to know if the reader has already > processed all available data. > > > I am using the Reader API because I need full control on the consumer > side about the portion of data to process (I will store the last > Message ID on a DB). > > In Kafka I had a "consumer#seekToEnd + consumer#position" API to move > to the end of a topic and then read the offsets for each partition. > > In Pulsar I am trying to achieve the same goal with: > > try (Reader<byte[]> pulsarReader = pulsarClient.newReader() > .startMessageId(MessageId.latest) > .topic(topicName) > .create()) { > MessageId lastMessageId = null; > Message<byte[]> readNext = pulsarReader.readNext(1, > TimeUnit.SECONDS); > if (readNext != null) { > lastMessageId = readNext.getMessageId(); > } > } > > This is not working, as within 1 second it is not guaranteed to have a > message.... > > Which is the best way ? > > Thanks > Enrico
