Great, thank you Matteo and Sijie,
last question....from org.apache.pulsar.client.api.Reader interface...is
there any way to get the internal subscription id.

I guess I should perform the following steps:
1) creare a Reader
2) get subscription id
3) move the reader to my current message id
4) use Admin API (
admin.topics().getStats(tenantId).subscriptions.get(subscriptionId).msgBacklog)


for 2) I am trying to go into ReaderImpl....
ReaderImpl reader = (ReaderImpl) pulsarReader;
ConsumerImp consumer = reader.getConsumer();
....

but I see no easy (and future proff) way



Enrico


Il giorno lun 22 lug 2019 alle ore 16:59 Matteo Merli <
matteo.me...@gmail.com> ha scritto:

> The reader has indeed already a "dummy" ephemeral subscription. As
> long as it's connected, the backlog for the reader is reported.
>
>
> --
> Matteo Merli
> <matteo.me...@gmail.com>
>
> On Mon, Jul 22, 2019 at 7:26 AM Enrico Olivelli <eolive...@gmail.com>
> wrote:
> >
> > Sijie,
> >
> > Il lun 22 lug 2019, 15:50 Sijie Guo <guosi...@gmail.com> ha scritto:
> >>
> >> You can query the topic stats. There is a "backlog" field in topic
> stats. It will tell you how many messages that a subscription has not
> consumed. It is similar as consumer lag in Kafka.
> >
> >
> > Actually I don't have a subscription to the topic as I am using the
> Reader API.
> > should I create a dummy subscription ? I image this would have a cost.
> >
> > I would use this feature for monitoring and I would like not to spend
> much resources
> >
> > Enrico
> >
> >>
> >> Thanks,
> >> Sijie
> >>
> >> On Mon, Jul 22, 2019 at 6:46 PM Enrico Olivelli <eolive...@gmail.com>
> wrote:
> >>>
> >>> Hello,
> >>>
> >>> I am looking for a function in Pulsar API to get the current consumer
> "lag"
> >>>
> >>> I see that Pulsar recently added getLastMessageId API but this is not
> useful to me
> >>> https://pulsar.apache.org/admin-rest-api/#operation/getLastMessageId
> >>>
> >>> In Pulsar if you have two message ids you cannot compute any kind of
> "distance".
> >>>
> >>> I am migrating from Kafka and in Kafka the message id is an
> incremental number (per partition).
> >>>
> >>> I am using the Reader API, and I am storing the last processed
> messageId in an external system, this way I have full control over the
> portion of the stream that I am processing.
> >>>
> >>> I think Sijie or Jia already told me something about this topic but I
> can't find references
> >>>
> >>> Thanks in advance
> >>>
> >>> Enrico
> >>>
> >>>
>

Reply via email to