Niclas,

On Tue, 24 Jan 2023 at 09:04, Niclas Hedhman
<nic...@hedhman.org> wrote:
>
> Hi,
> I am trying to set up topics to act as "message buffers" to web clients.
>
> I would like the client to be able to re-connect at any time, and have
> the latest N messages (or AFAIUI, some number of Megabytes).
>
> So, I think I need;
> 1. non-persistent topics
> 2. very long MessageTTL on Namespace used for this
> 3. RetentionPolicies with -1 time limit and some number of MB
>
> The client in question is Go.
>
> I am reading (see below) the topic, not subscribing.
>
> Question 1;
> When I do 'pulsar.EarliestMessageID()', should I be getting the oldest
> message in the Retention buffer, or the oldest message not acknowledged?
>
> If the latter, will subsequent messages be read out, or will the same
> client be served the same message over and over again?
>
> Question 2;
> Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM,
> or can I rely on messages being read from disk, even though they are
> non-persistent?


A non-persistent topic is volatile: messages are never written to disk,
so if you restart the broker you lose the data.
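
To make the distinction concrete, here is a minimal Go sketch, assuming the
usual Pulsar naming scheme {persistent|non-persistent}://tenant/namespace/topic.
The helper names and the "public"/"default"/"notifications-42" values are
hypothetical placeholders, not taken from your code; the sketch only builds and
checks topic names and does not talk to a broker.

```go
package main

import (
	"fmt"
	"strings"
)

// persistentTopic builds a fully qualified persistent topic name.
// tenant, namespace and name are illustrative placeholders.
func persistentTopic(tenant, namespace, name string) string {
	return "persistent://" + tenant + "/" + namespace + "/" + name
}

// isNonPersistent reports whether a topic name uses the
// non-persistent:// scheme, i.e. a topic whose messages are volatile.
func isNonPersistent(topic string) bool {
	return strings.HasPrefix(topic, "non-persistent://")
}

func main() {
	topic := persistentTopic("public", "default", "notifications-42")
	fmt.Println(topic)
	fmt.Println(isNonPersistent(topic))
	fmt.Println(isNonPersistent("non-persistent://public/default/notifications-42"))
}
```

A name built this way could be passed as the Topic in ReaderOptions in place
of a non-persistent one, if the messages need to survive a broker restart.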


Enrico


>
>
> TIA
> Niclas
>
>
> -o-o-o- Relevant code -o-o-o-
> Creating reader;
>
> func (p *PulsarClient) CreateReader(topic string, earliest bool) pulsar.Reader {
>      var start pulsar.MessageID
>      if earliest {
>          start = pulsar.EarliestMessageID()
>      } else {
>          start = pulsar.LatestMessageID()
>      }
>      reader, err := p.client.CreateReader(pulsar.ReaderOptions{
>          Topic:          topic,
>          StartMessageID: start,
>      })
>      if err != nil {
>          log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar Reader for: %s", topic), err)
>      }
>      return reader
> }
>
>
> Reading messages;
>      reader := h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId, 10), true)
>      defer reader.Close()
>      for {
>          msg, err := reader.Next(ctx)
>          if msg == nil {
>              log.DefaultLogger.Info("Grafana sender: DONE")
>              return ctx.Err()
>          }
>          if err != nil {
>              log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the message via reader.Next(): %+v", err))
>              continue
>          }
>          log.DefaultLogger.Info("Sending notification to " + req.PluginContext.User.Login)
>          err = sender.SendJSON(msg.Payload())
>          if err != nil {
>              log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame: %v", err))
>              return err
>          }
>      }
> }
