Niclas,
On Tue, 24 Jan 2023 at 09:04, Niclas Hedhman <[email protected]> wrote:
>
> Hi,
> I am trying to set up topics to act as "message buffers" to web clients.
>
> I would like the client to be able to re-connect at any time, and have
> the latest N messages (or AFAIUI, some number of Megabytes).
>
> So, I think I need;
> 1. non-persistent topics
> 2. very long MessageTTL on Namespace used for this
> 3. RetentionPolicies with -1 time limit and some number of MB
>
> Client in question is GO.
>
> I am reading (see below) the topic, not subscribing.
>
> Question 1;
> When I do 'pulsar.EarliestMessageID()', should I be getting the oldest
> message in the Retention buffer, or the oldest message not acknowledged?
>
> If the latter, will subsequent messages be read out, or will the same
> client be served the same message over and over again?
>
> Question 2;
> Will RetentionPolicies ("size"/"time") cause messages to be kept in RAM,
> or can I rely on messages being read from disk, even though they are
> non-persistent?

A non-persistent topic is volatile: if you restart the broker you lose the data.

Enrico

>
>
> TIA
> Niclas
>
>
> -o-o-o- Relevant code -o-o-o-
> Creating reader;
>
> func (p *PulsarClient) CreateReader(topic string, earliest bool) pulsar.Reader {
>     var start pulsar.MessageID
>     if earliest {
>         start = pulsar.EarliestMessageID()
>     } else {
>         start = pulsar.LatestMessageID()
>     }
>     reader, err := p.client.CreateReader(pulsar.ReaderOptions{
>         Topic:          topic,
>         StartMessageID: start,
>     })
>     if err != nil {
>         log.DefaultLogger.Error(fmt.Sprintf("Failed to create Pulsar Reader for: %s", topic), err)
>     }
>     return reader
> }
>
>
> Reading messages;
>     reader := h.pulsar.CreateReader(model.NotificationTopics+strconv.FormatInt(orgId, 10), true)
>     defer reader.Close()
>     for {
>         msg, err := reader.Next(ctx)
>         if msg == nil {
>             log.DefaultLogger.Info("Grafana sender: DONE")
>             return ctx.Err()
>         }
>         if err != nil {
>             log.DefaultLogger.Error(fmt.Sprintf("Couldn't get the message via reader.Next(): %+v", err))
>             continue
>         }
>         log.DefaultLogger.Info("Sending notification to " + req.PluginContext.User.Login)
>         err = sender.SendJSON(msg.Payload())
>         if err != nil {
>             log.DefaultLogger.Error(fmt.Sprintf("Couldn't send frame: %v", err))
>             return err
>         }
>     }
> }
