This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new a7e7239 [ISSUE-328] gets last message when LatestMessageID and inclusive (#329) a7e7239 is described below commit a7e7239757ddee42a99d3fc042606edf7358c931 Author: Paulo Quintans Pereira <paulo.quint...@gmail.com> AuthorDate: Mon Aug 24 06:11:57 2020 +0100 [ISSUE-328] gets last message when LatestMessageID and inclusive (#329) Signed-off-by: Paulo Pereira <paulo.pere...@karhoo.com> ### Motivation I have a service that when it restarts, it needs to know what was the last message successfully sent to pulsar. A reader seems the logical place, since we can specify `StartMessageID` as `LatestMessageID()` and `StartMessageIDInclusive` ### Modifications When the reader is created, verify if it startMessageIDInclusive true and startMessageID == lastestMessageID() and then get the last message id and seek to that message id. --- pulsar/consumer_partition.go | 43 ++++++++++++++++++++++++------ pulsar/reader_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 8 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b3fc17c..04a36cd 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -76,6 +76,8 @@ var ( Help: "Time it takes for application to process messages", Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, }) + + lastestMessageID = LatestMessageID() ) type consumerState int @@ -98,6 +100,10 @@ const ( nonDurable ) +const ( + noMessageEntry = -1 +) + type partitionConsumerOpts struct { topic string consumerName string @@ -193,6 +199,21 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon pc.log.Info("Created consumer") pc.state = consumerReady + if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID { + msgID, err := pc.requestGetLastMessageID() + if err != nil { + return nil, err + } + if msgID.entryID != noMessageEntry { + pc.startMessageID = msgID + + err = pc.requestSeek(msgID) + if err != nil { + return nil, err + } + } + } + go pc.dispatcher() go pc.runEventsLoop() @@ -252,7 +273,10 @@ func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) { func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) { defer close(req.doneCh) + req.msgID, req.err = pc.requestGetLastMessageID() +} +func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) { requestID := pc.client.rpcClient.NewRequestID() cmdGetLastMessageID := &pb.CommandGetLastMessageId{ RequestId: proto.Uint64(requestID), @@ -262,11 +286,10 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID) if err != nil { pc.log.WithError(err).Error("Failed to get last message id") - req.err = err - } else { - id := res.Response.GetLastMessageIdResponse.GetLastMessageId() - req.msgID = convertToMessageID(id) + return messageID{}, err } + id := res.Response.GetLastMessageIdResponse.GetLastMessageId() + return convertToMessageID(id), nil } func (pc *partitionConsumer) AckID(msgID trackingMessageID) { @@ -342,17 +365,20 @@ func (pc *partitionConsumer) Seek(msgID trackingMessageID) error { func (pc *partitionConsumer) internalSeek(seek *seekRequest) { defer close(seek.doneCh) + seek.err = pc.requestSeek(seek.msgID) +} +func (pc *partitionConsumer) requestSeek(msgID messageID) error { if pc.state == consumerClosing || pc.state == consumerClosed { pc.log.Error("Consumer was already closed") - return + return nil } id := &pb.MessageIdData{} - err := proto.Unmarshal(seek.msgID.Serialize(), id) + err := proto.Unmarshal(msgID.Serialize(), id) if err != nil { pc.log.WithError(err).Errorf("deserialize message id error: %s", err.Error()) - seek.err = err + return err } requestID := pc.client.rpcClient.NewRequestID() @@ -365,8 +391,9 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) { _, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek) if err != nil { pc.log.WithError(err).Error("Failed to reset to message id") - seek.err = err + return err } + return nil } func (pc *partitionConsumer) SeekByTime(time time.Time) error { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index d99bfcb..08b949e 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -446,3 +446,66 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) { assert.Equal(t, []byte(expectMsg), msg.Payload()) } } + +func TestReaderLatestInclusiveHasNext(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create reader on the last message (inclusive) + reader0, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: LatestMessageID(), + StartMessageIDInclusive: true, + }) + + assert.Nil(t, err) + defer reader0.Close() + + assert.False(t, reader0.HasNext()) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + var lastMsgID MessageID + for i := 0; i < 10; i++ { + lastMsgID, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, lastMsgID) + } + + // create reader on the last message (inclusive) + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: LatestMessageID(), + StartMessageIDInclusive: true, + }) + + assert.Nil(t, err) + defer reader.Close() + + var msgID MessageID + if reader.HasNext() { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + assert.Equal(t, []byte("hello-9"), msg.Payload()) + msgID = msg.ID() + } + + assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize()) +}