This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.15.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 45956795619c22e56102fe5ffbf52b51003c253d
Author: Zike Yang <[email protected]>
AuthorDate: Wed May 14 11:27:24 2025 +0800

    Fix reader hanging when startMessageId is latest (#1364)
    
    * Fix reader hanging when startMessageId is latest
    
    * Fix lint
    
    * Apply suggestions from code review
    
    Co-authored-by: Copilot <[email protected]>
    
    * Add comment to seekMessageId
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
    (cherry picked from commit 2516598b8d8014ad3eaa76191261e02d4c979c47)
---
 pulsar/consumer_partition.go | 19 ++++++++++++--
 pulsar/reader_test.go        | 59 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cc2554c0..fc2cb774 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -161,6 +161,10 @@ type partitionConsumer struct {
        startMessageID  atomicMessageID
        lastDequeuedMsg *trackingMessageID
 
+       // This is used to track the seeking message id during the seek operation.
+       // It will be set to nil after seek completes and reconnected.
+       seekMessageID atomicMessageID
+
        currentQueueSize       uAtomic.Int32
        scaleReceiverQueueHint uAtomic.Bool
        incomingMessages       uAtomic.Int32
@@ -365,6 +369,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
                maxQueueSize:               int32(options.receiverQueueSize),
                queueCh:                    make(chan []*message, 
options.receiverQueueSize),
                startMessageID:             atomicMessageID{msgID: 
options.startMessageID},
+               seekMessageID:              atomicMessageID{msgID: nil},
                connectedCh:                make(chan struct{}),
                messageCh:                  messageCh,
                connectClosedCh:            make(chan *connectionClosed, 1),
@@ -1023,7 +1028,7 @@ func (pc *partitionConsumer) requestSeek(msgID *messageID) error {
        // 2. The startMessageID is reset to ensure accurate judgment when calling hasNext next time.
        //    Since the messages in the queue are cleared here reconnection won't reset startMessageId.
        pc.lastDequeuedMsg = nil
-       pc.startMessageID.set(toTrackingMessageID(msgID))
+       pc.seekMessageID.set(toTrackingMessageID(msgID))
        pc.clearQueueAndGetNextMessage()
        return nil
 }
@@ -1481,6 +1486,11 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *trackingMessageID)
                return false
        }
 
+       // if we start at latest message, we should never discard
+       if pc.startMessageID.get().equal(latestMessageID) {
+               return false
+       }
+
        if pc.options.startMessageIDInclusive {
                return pc.startMessageID.get().greater(msgID.messageID)
        }
@@ -1977,7 +1987,12 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
                KeySharedMeta:              keySharedMeta,
        }
 
-       pc.startMessageID.set(pc.clearReceiverQueue())
+       if seekMsgID := pc.seekMessageID.get(); seekMsgID != nil {
+               pc.startMessageID.set(seekMsgID)
+               pc.seekMessageID.set(nil) // Reset seekMessageID to nil to avoid persisting state across reconnects
+       } else {
+               pc.startMessageID.set(pc.clearReceiverQueue())
+       }
        if pc.options.subscriptionMode != Durable {
                // For regular subscriptions the broker will determine the restarting point
                cmdSubscribe.StartMessageId = 
convertToMessageIDData(pc.startMessageID.get())
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 2fbe89e4..3a74db6c 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -23,6 +23,8 @@ import (
        "testing"
        "time"
 
+       "github.com/stretchr/testify/require"
+
        "github.com/apache/pulsar-client-go/pulsar/backoff"
 
        "github.com/apache/pulsar-client-go/pulsar/crypto"
@@ -1237,3 +1239,60 @@ func TestReaderWithSeekByTime(t *testing.T) {
                })
        }
 }
+
+func TestReaderReadFromLatest(t *testing.T) {
+       topic := newTopicName()
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       require.NoError(t, err)
+       defer client.Close()
+
+       r, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: LatestMessageID(),
+       })
+       require.NoError(t, err)
+       defer r.Close()
+
+       p, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       require.NoError(t, err)
+       defer p.Close()
+
+       // Send messages
+       for i := 0; i < 10; i++ {
+               msg := &ProducerMessage{
+                       Key:     "key",
+                       Payload: []byte(fmt.Sprintf("message-%d", i)),
+               }
+               id, err := p.Send(context.Background(), msg)
+               require.NoError(t, err)
+               require.NotNil(t, id)
+       }
+
+       // Read and verify messages
+       for i := 0; i < 10; i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
1*time.Second)
+               msg, err := r.Next(ctx)
+               cancel()
+               require.NoError(t, err)
+               require.NotNil(t, msg)
+
+               // Verify message key
+               require.Equal(t, "key", msg.Key())
+
+               // Verify message payload
+               expectedPayload := fmt.Sprintf("message-%d", i)
+               require.Equal(t, []byte(expectedPayload), msg.Payload())
+       }
+
+       // Verify no more messages
+       ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+       defer cancel()
+       msg, err := r.Next(ctx)
+       require.Error(t, err)
+       require.Nil(t, msg)
+}

Reply via email to