Re: Topic as a buffer?

2023-01-24 Thread Enrico Olivelli
Niclas,


Il giorno mar 24 gen 2023 alle ore 09:04 Niclas Hedhman
 ha scritto:
>
> Hi,
> I am trying to set up topics to act as "message buffers" to web clients.
>
> I would like the client to be able to re-connect at any time, and have
> the latest N messages (or AFAIUI, some number of Megabytes).
>
> So, I think I need;
> 1. non-persistent topics
> 2. very long MessageTTL on Namespace used for this
> 3. RetentionPolicies with -1 time limit and some number of MB
>
> Client in question is GO.
>
> I am reading (see below) the topic, not subscribing.
>
> Question 1;
> When I do 'pulsar.EarliestMessageID()', should I be getting the oldest
> message in the Retention buffer, or the oldest message not acknowledged?
>
> If the latter, will subsequent messages be read out, or will the same
> client be served the same message over and over again?
>
> Question 2;
> Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM,
> or can I rely on messages being read from disk, even though they are
> non-persistent?


A non-persistent topic is volatile: if you restart the broker you lose the data


Enrico


>
>
> TIA
> Niclas
>
>
> -o-o-o- Relevant code -o-o-o-
> Creating reader;
>
> func (p *PulsarClient) CreateReader(topic string, earliest bool)
> pulsar.Reader {
>  var start pulsar.MessageID
>  if earliest {
>  start = pulsar.EarliestMessageID()
>  } else {
>  start = pulsar.LatestMessageID()
>  }
>  reader, err := p.client.CreateReader(pulsar.ReaderOptions{
>  Topic:  topic,
>  StartMessageID: start,
>  })
>  if err != nil {
>  log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar
> Reader for: %s", topic), err)
>  }
>  return reader
> }
>
>
> Reading messages;
>  reader :=
> h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId,
> 10), true)
>  defer reader.Close()
>  for {
>  msg, err := reader.Next(ctx)
>  if msg == nil {
>  log.DefaultLogger.Info("Grafana sender: DONE")
>  return ctx.Err()
>  }
>  if err != nil {
>  log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the
> message via reader.Next(): %+v", err))
>  continue
>  }
>  log.DefaultLogger.Info("Sending notification to " +
> req.PluginContext.User.Login)
>  err = sender.SendJSON(msg.Payload())
>  if err != nil {
>  log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame:
> %v", err))
>  return err
>  }
>  }
> }


Re: Topic as a buffer?

2023-01-24 Thread Niclas Hedhman

On 2023-01-24 09:06, Enrico Olivelli wrote:
A non-persistent topic is volatile: if you restart the broker you lose 
the data


Yes, I know that. But for this particular case, that is fine. This 
buffer is roughly the equivalent of "syslog", reporting what is going on 
in the async backend, but at a application level abstraction (rather 
than "programmer's view"). And the application has a cyclic "vibe", so 
any important information lost will re-appear in an hour or less.


Cheers
Niclas


Re: Topic as a buffer?

2023-01-24 Thread Asaf Mesika
I would add that it's more volatile that you think:

There is *no buffer* in memory as you would imagine.

I quote from documentation here

:

With non-persistent topics, message data lives only in memory, without a
specific buffer - which means data *is not* buffered in memory. The
received messages are immediately transmitted to all *connected consumers*.
If a message broker fails or message data can otherwise not be retrieved
from memory, your message data may be lost. Use non-persistent topics only
if you're *certain* that your use case requires it and can sustain it.

Meaning, as soon as the message is received, it is immediately pushed to
the socket of each connected consumer.
If your client is a consumer and it is disconnected, once reconnected it
will start receiving messages from this point on.

Honestly I find this non persistent topic quite confusing.


On Tue, Jan 24, 2023 at 10:07 AM Enrico Olivelli 
wrote:

> Niclas,
>
>
> Il giorno mar 24 gen 2023 alle ore 09:04 Niclas Hedhman
>  ha scritto:
> >
> > Hi,
> > I am trying to set up topics to act as "message buffers" to web clients.
> >
> > I would like the client to be able to re-connect at any time, and have
> > the latest N messages (or AFAIUI, some number of Megabytes).
> >
> > So, I think I need;
> > 1. non-persistent topics
> > 2. very long MessageTTL on Namespace used for this
> > 3. RetentionPolicies with -1 time limit and some number of MB
> >
> > Client in question is GO.
> >
> > I am reading (see below) the topic, not subscribing.
> >
> > Question 1;
> > When I do 'pulsar.EarliestMessageID()', should I be getting the oldest
> > message in the Retention buffer, or the oldest message not acknowledged?
> >
> > If the latter, will subsequent messages be read out, or will the same
> > client be served the same message over and over again?
> >
> > Question 2;
> > Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM,
> > or can I rely on messages being read from disk, even though they are
> > non-persistent?
>
>
> A non-persistent topic is volatile: if you restart the broker you lose the
> data
>
>
> Enrico
>
>
> >
> >
> > TIA
> > Niclas
> >
> >
> > -o-o-o- Relevant code -o-o-o-
> > Creating reader;
> >
> > func (p *PulsarClient) CreateReader(topic string, earliest bool)
> > pulsar.Reader {
> >  var start pulsar.MessageID
> >  if earliest {
> >  start = pulsar.EarliestMessageID()
> >  } else {
> >  start = pulsar.LatestMessageID()
> >  }
> >  reader, err := p.client.CreateReader(pulsar.ReaderOptions{
> >  Topic:  topic,
> >  StartMessageID: start,
> >  })
> >  if err != nil {
> >  log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar
> > Reader for: %s", topic), err)
> >  }
> >  return reader
> > }
> >
> >
> > Reading messages;
> >  reader :=
> > h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId,
> > 10), true)
> >  defer reader.Close()
> >  for {
> >  msg, err := reader.Next(ctx)
> >  if msg == nil {
> >  log.DefaultLogger.Info("Grafana sender: DONE")
> >  return ctx.Err()
> >  }
> >  if err != nil {
> >  log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the
> > message via reader.Next(): %+v", err))
> >  continue
> >  }
> >  log.DefaultLogger.Info("Sending notification to " +
> > req.PluginContext.User.Login)
> >  err = sender.SendJSON(msg.Payload())
> >  if err != nil {
> >  log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame:
> > %v", err))
> >  return err
> >  }
> >  }
> > }
>


Re: Topic as a buffer?

2023-01-24 Thread Niclas Hedhman

On 2023-01-24 09:46, Asaf Mesika wrote:

I would add that it's more volatile that you think:


Hmmm


There is no buffer in memory as you would imagine.

I quote from documentation here [2]:



Meaning, as soon as the message is received, it is immediately pushed
to the socket of each connected consumer.
If your client is a consumer and it is disconnected, once reconnected
it will start receiving messages from this point on.


So, if not connected, no messages are forwarded. Got it! Explains what I 
am observing and assumed what is a bug on my side.



Honestly I find this non persistent topic quite confusing.


Agree.

Now I need to find out (again) why I didn't use persistent topics in the 
first place. Back in my head there was something I am forced to use that 
is not supported in persistent topics... Maybe I have some notes on that 
in the commit logs.


Niclas


Re: Topic as a buffer?

2023-01-24 Thread Niclas Hedhman



What a mess up! I already had it changed to "persistent://" and I 
misread "non-partitioned" in my server code.


So, my topics ARE persistent already, non-partitioned (to save space?). 
The rest of my original post still applies.




On 2023-01-24 09:03, Niclas Hedhman wrote:

Hi,
I am trying to set up topics to act as "message buffers" to web 
clients.


I would like the client to be able to re-connect at any time, and have
the latest N messages (or AFAIUI, some number of Megabytes).

So, I think I need;
1. non-persistent topics
2. very long MessageTTL on Namespace used for this
3. RetentionPolicies with -1 time limit and some number of MB

Client in question is GO.

I am reading (see below) the topic, not subscribing.

Question 1;
When I do 'pulsar.EarliestMessageID()', should I be getting the oldest
message in the Retention buffer, or the oldest message not
acknowledged?

If the latter, will subsequent messages be read out, or will the same
client be served the same message over and over again?

Question 2;
Will RetentionPolicies ("size"/"time") cause messages to be kept in
RAM, or can I rely on messages being read from disk, even though they
are non-persistent?


TIA
Niclas


-o-o-o- Relevant code -o-o-o-
Creating reader;

func (p *PulsarClient) CreateReader(topic string, earliest bool) 
pulsar.Reader {

var start pulsar.MessageID
if earliest {
start = pulsar.EarliestMessageID()
} else {
start = pulsar.LatestMessageID()
}
reader, err := p.client.CreateReader(pulsar.ReaderOptions{
Topic:  topic,
StartMessageID: start,
})
if err != nil {
log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar
Reader for: %s", topic), err)
}
return reader
}


Reading messages;
reader :=
h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId,
10), true)
defer reader.Close()
for {
msg, err := reader.Next(ctx)
if msg == nil {
log.DefaultLogger.Info("Grafana sender: DONE")
return ctx.Err()
}
if err != nil {
log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the
message via reader.Next(): %+v", err))
continue
}
log.DefaultLogger.Info("Sending notification to " +
req.PluginContext.User.Login)
err = sender.SendJSON(msg.Payload())
if err != nil {
log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame: 
%v", err))

return err
}
}
}


Re: Topic as a buffer?

2023-01-24 Thread Niclas Hedhman

On 2023-01-24 10:29, Niclas Hedhman wrote:

What a mess up! I already had it changed to "persistent://" and I
misread "non-partitioned" in my server code.

So, my topics ARE persistent already, non-partitioned (to save
space?). The rest of my original post still applies.


non-partitioned so I can do

seekError := reader.SeekByTime(hourAgo)

So additional question; Should I try to stick with this approach or rely 
on RetentionPolicies?






Re: Topic as a buffer?

2023-01-25 Thread Asaf Mesika
If I understand correctly, you don't really need the whole notion of
acknowledgement (subscription).
When the web client connects, it needs to receive the last N messages (What
happens from there after?)

How about using a non-durable subscription - they only live as long as the
broker is not restarted and the topic has not moved between them (load
balancer).
If the subscription vanishes, you just recreate your whole buffer from
scratch?

Also take a look at Reader interface:
https://pulsar.apache.org/docs/2.11.x/concepts-clients/#reader-interface


On Tue, Jan 24, 2023 at 11:47 AM Niclas Hedhman  wrote:

> On 2023-01-24 10:29, Niclas Hedhman wrote:
> > What a mess up! I already had it changed to "persistent://" and I
> > misread "non-partitioned" in my server code.
> >
> > So, my topics ARE persistent already, non-partitioned (to save
> > space?). The rest of my original post still applies.
>
> non-partitioned so I can do
>
>  seekError := reader.SeekByTime(hourAgo)
>
> So additional question; Should I try to stick with this approach or rely
> on RetentionPolicies?
>
>
>
>


Re: Topic as a buffer?

2023-01-25 Thread Niclas Hedhman

On 2023-01-25 13:59, Asaf Mesika wrote:

If I understand correctly, you don't really need the whole notion of
acknowledgement (subscription).
When the web client connects, it needs to receive the last N messages
(What happens from there after?)


Keep "reading" messages and update UI as they arrive.


How about using a non-durable subscription - they only live as long as
the broker is not restarted and the topic has not moved between them
(load balancer).
If the subscription vanishes, you just recreate your whole buffer from
scratch?



Also take a look at Reader interface:
https://pulsar.apache.org/docs/2.11.x/concepts-clients/#reader-interface


Ah!! I use Reader... I wasn't clear (to me) that it doesn't use 
acknowledgements. Should have realized that, pretty obvious.



So, thanks... I now have the Pulsar side of the equation working, and 
need to get the mechanics of Grafana Streaming to play along.



Niclas