This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a7e7239  [ISSUE-328] gets last message when LatestMessageID and 
inclusive (#329)
a7e7239 is described below

commit a7e7239757ddee42a99d3fc042606edf7358c931
Author: Paulo Quintans Pereira <paulo.quint...@gmail.com>
AuthorDate: Mon Aug 24 06:11:57 2020 +0100

    [ISSUE-328] gets last message when LatestMessageID and inclusive (#329)
    
    Signed-off-by: Paulo Pereira <paulo.pere...@karhoo.com>
    
    ### Motivation
    
    I have a service that when it restarts, it needs to know what was the last 
message successfully sent to pulsar.
    A reader seems the logical place, since we can specify `StartMessageID` as 
`LatestMessageID()` and `StartMessageIDInclusive`
    
    ### Modifications
    
    When the reader is created, verify if it startMessageIDInclusive true and 
startMessageID == lastestMessageID() and then get the last message id and seek 
to that message id.
---
 pulsar/consumer_partition.go | 43 ++++++++++++++++++++++++------
 pulsar/reader_test.go        | 63 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b3fc17c..04a36cd 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -76,6 +76,8 @@ var (
                Help:    "Time it takes for application to process messages",
                Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, 
.5, 1, 2.5, 5, 10},
        })
+
+       lastestMessageID = LatestMessageID()
 )
 
 type consumerState int
@@ -98,6 +100,10 @@ const (
        nonDurable
 )
 
+const (
+       noMessageEntry = -1
+)
+
 type partitionConsumerOpts struct {
        topic                      string
        consumerName               string
@@ -193,6 +199,21 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
        pc.log.Info("Created consumer")
        pc.state = consumerReady
 
+       if pc.options.startMessageIDInclusive && pc.startMessageID == 
lastestMessageID {
+               msgID, err := pc.requestGetLastMessageID()
+               if err != nil {
+                       return nil, err
+               }
+               if msgID.entryID != noMessageEntry {
+                       pc.startMessageID = msgID
+
+                       err = pc.requestSeek(msgID)
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       }
+
        go pc.dispatcher()
 
        go pc.runEventsLoop()
@@ -252,7 +273,10 @@ func (pc *partitionConsumer) getLastMessageID() 
(trackingMessageID, error) {
 
 func (pc *partitionConsumer) internalGetLastMessageID(req 
*getLastMsgIDRequest) {
        defer close(req.doneCh)
+       req.msgID, req.err = pc.requestGetLastMessageID()
+}
 
+func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
        requestID := pc.client.rpcClient.NewRequestID()
        cmdGetLastMessageID := &pb.CommandGetLastMessageId{
                RequestId:  proto.Uint64(requestID),
@@ -262,11 +286,10 @@ func (pc *partitionConsumer) internalGetLastMessageID(req 
*getLastMsgIDRequest)
                pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
        if err != nil {
                pc.log.WithError(err).Error("Failed to get last message id")
-               req.err = err
-       } else {
-               id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
-               req.msgID = convertToMessageID(id)
+               return messageID{}, err
        }
+       id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
+       return convertToMessageID(id), nil
 }
 
 func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
@@ -342,17 +365,20 @@ func (pc *partitionConsumer) Seek(msgID 
trackingMessageID) error {
 
 func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
        defer close(seek.doneCh)
+       seek.err = pc.requestSeek(seek.msgID)
+}
 
+func (pc *partitionConsumer) requestSeek(msgID messageID) error {
        if pc.state == consumerClosing || pc.state == consumerClosed {
                pc.log.Error("Consumer was already closed")
-               return
+               return nil
        }
 
        id := &pb.MessageIdData{}
-       err := proto.Unmarshal(seek.msgID.Serialize(), id)
+       err := proto.Unmarshal(msgID.Serialize(), id)
        if err != nil {
                pc.log.WithError(err).Errorf("deserialize message id error: 
%s", err.Error())
-               seek.err = err
+               return err
        }
 
        requestID := pc.client.rpcClient.NewRequestID()
@@ -365,8 +391,9 @@ func (pc *partitionConsumer) internalSeek(seek 
*seekRequest) {
        _, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, 
pb.BaseCommand_SEEK, cmdSeek)
        if err != nil {
                pc.log.WithError(err).Error("Failed to reset to message id")
-               seek.err = err
+               return err
        }
+       return nil
 }
 
 func (pc *partitionConsumer) SeekByTime(time time.Time) error {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index d99bfcb..08b949e 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -446,3 +446,66 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t 
*testing.T) {
                assert.Equal(t, []byte(expectMsg), msg.Payload())
        }
 }
+
+func TestReaderLatestInclusiveHasNext(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create reader on the last message (inclusive)
+       reader0, err := client.CreateReader(ReaderOptions{
+               Topic:                   topic,
+               StartMessageID:          LatestMessageID(),
+               StartMessageIDInclusive: true,
+       })
+
+       assert.Nil(t, err)
+       defer reader0.Close()
+
+       assert.False(t, reader0.HasNext())
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       var lastMsgID MessageID
+       for i := 0; i < 10; i++ {
+               lastMsgID, err = producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, lastMsgID)
+       }
+
+       // create reader on the last message (inclusive)
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:                   topic,
+               StartMessageID:          LatestMessageID(),
+               StartMessageIDInclusive: true,
+       })
+
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       var msgID MessageID
+       if reader.HasNext() {
+               msg, err := reader.Next(context.Background())
+               assert.NoError(t, err)
+
+               assert.Equal(t, []byte("hello-9"), msg.Payload())
+               msgID = msg.ID()
+       }
+
+       assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
+}

Reply via email to