Great, thank you Matteo and Sijie. One last question: from the org.apache.pulsar.client.api.Reader interface, is there any way to get the internal subscription id?
I guess I should perform the following steps:
1) create a Reader
2) get the subscription id
3) move the reader to my current message id
4) use the Admin API (admin.topics().getStats(tenantId).subscriptions.get(subscriptionId).msgBacklog)

For 2) I am trying to go into ReaderImpl:

ReaderImpl reader = (ReaderImpl) pulsarReader;
ConsumerImpl consumer = reader.getConsumer();
....

but I see no easy (and future-proof) way.

Enrico

On Mon, 22 Jul 2019 at 16:59, Matteo Merli <matteo.me...@gmail.com> wrote:

> The reader has indeed already a "dummy" ephemeral subscription. As
> long as it's connected, the backlog for the reader is reported.
>
>
> --
> Matteo Merli
> <matteo.me...@gmail.com>
>
> On Mon, Jul 22, 2019 at 7:26 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> >
> > Sijie,
> >
> > On Mon, 22 Jul 2019, 15:50 Sijie Guo <guosi...@gmail.com> wrote:
> >>
> >> You can query the topic stats. There is a "backlog" field in topic stats. It will tell you how many messages a subscription has not consumed. It is similar to consumer lag in Kafka.
> >
> >
> > Actually I don't have a subscription to the topic as I am using the Reader API.
> > Should I create a dummy subscription? I imagine this would have a cost.
> >
> > I would use this feature for monitoring and I would rather not spend many resources on it.
> >
> > Enrico
> >
> >>
> >> Thanks,
> >> Sijie
> >>
> >> On Mon, Jul 22, 2019 at 6:46 PM Enrico Olivelli <eolive...@gmail.com> wrote:
> >>>
> >>> Hello,
> >>>
> >>> I am looking for a function in the Pulsar API to get the current consumer "lag".
> >>>
> >>> I see that Pulsar recently added the getLastMessageId API but this is not useful to me:
> >>> https://pulsar.apache.org/admin-rest-api/#operation/getLastMessageId
> >>>
> >>> In Pulsar if you have two message ids you cannot compute any kind of "distance".
> >>>
> >>> I am migrating from Kafka and in Kafka the message id is an incremental number (per partition).
> >>>
> >>> I am using the Reader API, and I am storing the last processed messageId in an external system, this way I have full control over the portion of the stream that I am processing.
> >>>
> >>> I think Sijie or Jia already told me something about this topic but I can't find references
> >>>
> >>> Thanks in advance
> >>>
> >>> Enrico
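
For steps 2) and 4) at the top of this message, one possible way to avoid the cast to ReaderImpl is sketched here. It rests on an assumption that nobody in this thread has confirmed: that the Reader is created with ReaderBuilder.subscriptionRolePrefix(...) and that the name of its ephemeral subscription then starts with that prefix followed by "-". ReaderBacklog and forPrefix are hypothetical names, and the Map only stands in for the subscription name to msgBacklog pairs that the caller would copy out of the topic stats returned by the Admin API.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, NOT part of the Pulsar client or admin API.
final class ReaderBacklog {

    private ReaderBacklog() {
    }

    /**
     * Looks up the backlog of the one subscription whose name starts with
     * rolePrefix + "-".
     *
     * @param backlogBySubscription subscription name -> msgBacklog, copied
     *                              from the topic stats by the caller
     * @param rolePrefix            prefix the Reader was created with
     *                              (assumption: it leads the subscription name)
     * @return the backlog, or empty if no subscription matches
     */
    static OptionalLong forPrefix(Map<String, Long> backlogBySubscription, String rolePrefix) {
        String wanted = rolePrefix + "-";
        OptionalLong result = OptionalLong.empty();
        for (Map.Entry<String, Long> entry : backlogBySubscription.entrySet()) {
            if (entry.getKey().startsWith(wanted)) {
                if (result.isPresent()) {
                    // The prefix must identify exactly one reader.
                    throw new IllegalStateException(
                            "more than one subscription matches prefix " + rolePrefix);
                }
                result = OptionalLong.of(entry.getValue());
            }
        }
        return result;
    }
}
```

With a prefix that is unique per reader, the caller would pass that same string as rolePrefix. An empty result would presumably mean the reader is not connected at that moment, since Matteo notes that the backlog is reported only while the reader is connected.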