This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.15.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 45956795619c22e56102fe5ffbf52b51003c253d Author: Zike Yang <[email protected]> AuthorDate: Wed May 14 11:27:24 2025 +0800 Fix reader hanging when startMessageId is latest (#1364) * Fix reader hanging when startMessageId is latest * Fix lint * Apply suggestions from code review Co-authored-by: Copilot <[email protected]> * Add comment to seekMessageId --------- Co-authored-by: Copilot <[email protected]> (cherry picked from commit 2516598b8d8014ad3eaa76191261e02d4c979c47) --- pulsar/consumer_partition.go | 19 ++++++++++++-- pulsar/reader_test.go | 59 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index cc2554c0..fc2cb774 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -161,6 +161,10 @@ type partitionConsumer struct { startMessageID atomicMessageID lastDequeuedMsg *trackingMessageID + // This is used to track the seeking message id during the seek operation. + // It will be set to nil after seek completes and reconnected. + seekMessageID atomicMessageID + currentQueueSize uAtomic.Int32 scaleReceiverQueueHint uAtomic.Bool incomingMessages uAtomic.Int32 @@ -365,6 +369,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon maxQueueSize: int32(options.receiverQueueSize), queueCh: make(chan []*message, options.receiverQueueSize), startMessageID: atomicMessageID{msgID: options.startMessageID}, + seekMessageID: atomicMessageID{msgID: nil}, connectedCh: make(chan struct{}), messageCh: messageCh, connectClosedCh: make(chan *connectionClosed, 1), @@ -1023,7 +1028,7 @@ func (pc *partitionConsumer) requestSeek(msgID *messageID) error { // 2. The startMessageID is reset to ensure accurate judgment when calling hasNext next time. // Since the messages in the queue are cleared here reconnection won't reset startMessageId. pc.lastDequeuedMsg = nil - pc.startMessageID.set(toTrackingMessageID(msgID)) + pc.seekMessageID.set(toTrackingMessageID(msgID)) pc.clearQueueAndGetNextMessage() return nil } @@ -1481,6 +1486,11 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *trackingMessageID) return false } + // if we start at latest message, we should never discard + if pc.startMessageID.get().equal(latestMessageID) { + return false + } + if pc.options.startMessageIDInclusive { return pc.startMessageID.get().greater(msgID.messageID) } @@ -1977,7 +1987,12 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error { KeySharedMeta: keySharedMeta, } - pc.startMessageID.set(pc.clearReceiverQueue()) + if seekMsgID := pc.seekMessageID.get(); seekMsgID != nil { + pc.startMessageID.set(seekMsgID) + pc.seekMessageID.set(nil) // Reset seekMessageID to nil to avoid persisting state across reconnects + } else { + pc.startMessageID.set(pc.clearReceiverQueue()) + } if pc.options.subscriptionMode != Durable { // For regular subscriptions the broker will determine the restarting point cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get()) diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 2fbe89e4..3a74db6c 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/apache/pulsar-client-go/pulsar/backoff" "github.com/apache/pulsar-client-go/pulsar/crypto" @@ -1237,3 +1239,60 @@ func TestReaderWithSeekByTime(t *testing.T) { }) } } + +func TestReaderReadFromLatest(t *testing.T) { + topic := newTopicName() + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + require.NoError(t, err) + defer client.Close() + + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: LatestMessageID(), + }) + require.NoError(t, err) + defer r.Close() + + p, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + require.NoError(t, err) + defer p.Close() + + // Send messages + for i := 0; i < 10; i++ { + msg := &ProducerMessage{ + Key: "key", + Payload: []byte(fmt.Sprintf("message-%d", i)), + } + id, err := p.Send(context.Background(), msg) + require.NoError(t, err) + require.NotNil(t, id) + } + + // Read and verify messages + for i := 0; i < 10; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + msg, err := r.Next(ctx) + cancel() + require.NoError(t, err) + require.NotNil(t, msg) + + // Verify message key + require.Equal(t, "key", msg.Key()) + + // Verify message payload + expectedPayload := fmt.Sprintf("message-%d", i) + require.Equal(t, []byte(expectedPayload), msg.Payload()) + } + + // Verify no more messages + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + msg, err := r.Next(ctx) + require.Error(t, err) + require.Nil(t, msg) +}
