This is an automated email from the ASF dual-hosted git repository.
crossoverJie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6db892c3 [Issue 1473][Consumer] Fix race in grabConn dropping messages before handler registration (#1476)
6db892c3 is described below
commit 6db892c38502ad3de9083f34b74b40a103e69fb5
Author: aleks-lazic <[email protected]>
AuthorDate: Wed May 6 02:57:53 2026 +0200
[Issue 1473][Consumer] Fix race in grabConn dropping messages before handler registration (#1476)
---
pulsar/consumer_partition.go | 59 ++++--
pulsar/consumer_partition_test.go | 393 ++++++++++++++++++++++++++++++++++++++
2 files changed, 440 insertions(+), 12 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a0190f70..e63d4b39 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -2201,10 +2201,49 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL
string) error {
cmdSubscribe.ForceTopicCreation = proto.Bool(false)
}
- res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr,
lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
- pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+ // Obtain the connection before sending the subscribe RPC so we can register
+ // the consumer handler before the broker starts delivering frames.
+ // This closes a race where MESSAGE and ACTIVE_CONSUMER_CHANGE commands
+ // arriving immediately after the subscribe response were silently dropped
+ // because AddConsumeHandler had not been called yet.
+ cnx, err := pc.client.cnxPool.GetConnection(lr.LogicalAddr,
lr.PhysicalAddr, pc.cnxKeySuffix)
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to get connection")
+ return err
+ }
+
+ // Set the connection BEFORE registering the handler so that handler
+ // callbacks (e.g. MessageReceived → discardCorruptedMessage) can safely
+ // call pc._getConn() without hitting a nil pointer.
+ var prevConn internal.Connection
+ if v := pc.conn.Load(); v != nil {
+ prevConn = *v
+ }
+ pc._setConn(cnx)
+
+ // restoreConn rolls back pc.conn to the previous connection (or nil on
+ // the very first call) so a failed subscribe attempt doesn't leave
+ // pc.conn pointing at a stale connection.
+ restoreConn := func() {
+ if prevConn != nil {
+ pc._setConn(prevConn)
+ } else {
+ pc.conn.Store(nil)
+ }
+ }
+ // Register handler BEFORE the subscribe RPC so no frames are missed
+ err = cnx.AddConsumeHandler(pc.consumerID, pc)
if err != nil {
+ restoreConn()
+ pc.log.WithError(err).Error("Failed to add consumer handler")
+ return err
+ }
+
+ res, err := pc.client.rpcClient.RequestOnCnx(cnx, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+ if err != nil {
+ cnx.DeleteConsumeHandler(pc.consumerID)
+ restoreConn()
pc.log.WithError(err).Error("Failed to create consumer")
if err == internal.ErrRequestTimeOut {
requestID := pc.client.rpcClient.NewRequestID()
@@ -2212,7 +2251,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL
string) error {
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
- _, _ =
pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr,
pc.cnxKeySuffix, requestID,
+ _, _ = pc.client.rpcClient.RequestOnCnx(cnx, requestID,
pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
}
return err
@@ -2222,13 +2261,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL
string) error {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
- pc._setConn(res.Cnx)
pc.log.Info("Connected consumer")
- err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)
- if err != nil {
- pc.log.WithError(err).Error("Failed to add consumer handler")
- return err
- }
msgType := res.Response.GetType()
@@ -2493,9 +2526,11 @@ func (pc *partitionConsumer) _setConn(conn
internal.Connection) {
// _getConn returns internal connection field of this partition consumer atomically.
// Note: should only be called by this partition consumer before attempting to use the connection
func (pc *partitionConsumer) _getConn() internal.Connection {
- // Invariant: The conn must be non-nill for the lifetime of the partitionConsumer.
- // For this reason we leave this cast unchecked and panic() if the
- // invariant is broken
+ // Invariant: conn is non-nil after the first successful grabConn (i.e. after
+ // a subscribe RPC succeeds). During grabConn itself, conn is set
+ // before AddConsumeHandler so that handler callbacks can use it.
+ // Before the first successful subscribe, conn may be nil.
+ // We leave this cast unchecked and panic() if the invariant is broken.
return *pc.conn.Load()
}
diff --git a/pulsar/consumer_partition_test.go
b/pulsar/consumer_partition_test.go
index cfaf7bf6..a50d02ef 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -18,15 +18,20 @@
package pulsar
import (
+ "fmt"
+ "net/url"
"sync"
+ "sync/atomic"
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/proto"
)
func TestSingleMessageIDNoAckTracker(t *testing.T) {
@@ -347,3 +352,391 @@ func TestMessageReceivedAllMessagesDuplicate(t
*testing.T) {
default:
}
}
+
+// TestGrabConn_HandlerRegisteredBeforeSubscribe verifies that the consumer
+// handler is registered on the connection BEFORE the subscribe RPC is sent.
+//
+// Without this ordering, the broker can send MESSAGE and ACTIVE_CONSUMER_CHANGE
+// frames immediately after the subscribe succeeds, but the client's read
+// goroutine cannot route them because the handler isn't in the map yet.
+// Those frames are silently dropped.
+func TestGrabConn_HandlerRegisteredBeforeSubscribe(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{cnx: cnx}
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.NoError(t, err)
+
+ // Drain the connectedCh goroutine spawned on success to assert it fires
+ // and avoid relying solely on the channel's buffer capacity.
+ <-pc.connectedCh
+
+ assert.True(t, rpc.handlerRegisteredDuringRPC.Load(),
+ "AddConsumeHandler must be called before the subscribe RPC is
sent")
+}
+
+// TestGrabConn_HandlerRemovedOnSubscribeFailure verifies that when the
+// subscribe RPC fails, the pre-registered consumer handler is removed from
+// the connection so it does not leak.
+func TestGrabConn_HandlerRemovedOnSubscribeFailure(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{
+ cnx: cnx,
+ subscribeErr: fmt.Errorf("broker rejected subscribe"),
+ }
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.Error(t, err)
+
+ assert.True(t, cnx.handlerRemoved.Load(),
+ "DeleteConsumeHandler must be called when subscribe fails")
+}
+
+// TestGrabConn_HandlerRemovedOnSubscribeTimeout verifies cleanup on timeout
+// and that the close command is sent on the same connection (not a potentially
+// different one from the pool).
+func TestGrabConn_HandlerRemovedOnSubscribeTimeout(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{
+ cnx: cnx,
+ subscribeErr: internal.ErrRequestTimeOut,
+ }
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.ErrorIs(t, err, internal.ErrRequestTimeOut)
+
+ assert.True(t, cnx.handlerRemoved.Load(),
+ "DeleteConsumeHandler must be called on timeout")
+ assert.True(t, rpc.closeSentOnCnx.Load(),
+ "CloseConsumer must be sent via RequestOnCnx on the same
connection")
+}
+
+// TestGrabConn_ConnResetOnFirstCallFailure verifies that when grabConn fails
+// on the very first call (no prior connection), pc.conn is reset to nil rather
+// than left pointing at the stale connection on which subscribe failed.
+func TestGrabConn_ConnResetOnFirstCallFailure(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{
+ cnx: cnx,
+ subscribeErr: fmt.Errorf("broker rejected subscribe"),
+ }
+ pc := newGrabConnTestConsumerNoConn(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.Error(t, err)
+
+ assert.Nil(t, pc.conn.Load(),
+ "pc.conn must be reset to nil on first-call failure, not left
pointing at the stale connection")
+}
+
+// TestGrabConn_BrokerFrameDuringSubscribe simulates the exact race: the broker
+// sends a frame (e.g. ActiveConsumerChange) while the subscribe RPC is still
+// in flight. Because the handler is registered before the RPC, the frame
+// must be delivered to the consumer — not dropped.
+func TestGrabConn_BrokerFrameDuringSubscribe(t *testing.T) {
+ cnx := newSpyConnection()
+ var consumerReceivedChange atomic.Bool
+
+ rpc := &grabConnSpyRPCClient{
+ cnx: cnx,
+ duringSubscribe: func() {
+ // Simulate the broker's read goroutine delivering a frame
+ // while the subscribe RPC is in flight.
+ if handler, ok :=
cnx.handler.Load().(*partitionConsumer); ok && handler != nil {
+ handler.ActiveConsumerChanged(true)
+ consumerReceivedChange.Store(true)
+ }
+ },
+ }
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.NoError(t, err)
+
+ // Drain the connectedCh goroutine spawned on success.
+ <-pc.connectedCh
+
+ assert.True(t, consumerReceivedChange.Load(),
+ "Frames sent by the broker during the subscribe RPC must reach
the consumer handler")
+}
+
+// TestGrabConn_MessageReceivedDuringSubscribe_NilConn verifies that when the
+// broker delivers a MESSAGE frame while the subscribe RPC is in flight, the
+// consumer does not panic due to a nil connection. This is a regression test:
+// registering the handler before calling _setConn means MessageReceived ->
+// discardCorruptedMessage -> _getConn() dereferences nil.
+func TestGrabConn_MessageReceivedDuringSubscribe_NilConn(t *testing.T) {
+ cnx := newSpyConnection()
+
+ rpc := &grabConnSpyRPCClient{
+ cnx: cnx,
+ duringSubscribe: func() {
+ handler, ok := cnx.handler.Load().(*partitionConsumer)
+ if !ok || handler == nil {
+ return
+ }
+ // Deliver an empty (invalid) message frame. ReadBrokerMetadata
+ // will fail, causing discardCorruptedMessage which calls
+ // pc._getConn(). If conn is nil this panics.
+ msgID := &pb.MessageIdData{
+ LedgerId: proto.Uint64(1),
+ EntryId: proto.Uint64(1),
+ }
+ cmd := &pb.CommandMessage{MessageId: msgID}
+ // 4 bytes: ReadBrokerMetadata reads 2 bytes for magic number (won't
+ // match broker metadata magic → returns nil, nil), then
+ // ReadMessageMetadata fails → discardCorruptedMessage → _getConn()
+ // panics if conn is nil.
+ buf := internal.NewBuffer(4)
+ buf.Write([]byte{0x00, 0x00, 0x00, 0x00})
+ _ = handler.MessageReceived(cmd, buf)
+ },
+ }
+ // Build consumer WITHOUT pre-setting _setConn to simulate the real
+ // first-call path where conn is nil before grabConn completes.
+ pc := newGrabConnTestConsumerNoConn(cnx, rpc)
+
+ // This must not panic.
+ assert.NotPanics(t, func() {
+ _ = pc.grabConn("")
+ })
+}
+
+// TestGrabConn_GetConnectionFailure verifies that grabConn returns the error
+// from GetConnection without registering a handler or sending an RPC.
+func TestGrabConn_GetConnectionFailure(t *testing.T) {
+ cnx := newSpyConnection()
+ rpc := &grabConnSpyRPCClient{cnx: cnx}
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ // Override the pool to return an error
+ pc.client.cnxPool = &grabConnMockPool{err: fmt.Errorf("connection
refused")}
+
+ err := pc.grabConn("")
+ assert.ErrorContains(t, err, "connection refused")
+
+ assert.False(t, cnx.handlerRegistered.Load(),
+ "AddConsumeHandler must not be called when GetConnection fails")
+}
+
+// TestGrabConn_AddConsumeHandlerFailure verifies that grabConn returns the
+// error from AddConsumeHandler without sending a subscribe RPC.
+func TestGrabConn_AddConsumeHandlerFailure(t *testing.T) {
+ cnx := newSpyConnection()
+ cnx.addHandlerErr = fmt.Errorf("connection closed")
+ rpc := &grabConnSpyRPCClient{cnx: cnx}
+ pc := newGrabConnTestConsumer(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.ErrorContains(t, err, "connection closed")
+
+ assert.False(t, rpc.handlerRegisteredDuringRPC.Load(),
+ "Subscribe RPC must not be sent when AddConsumeHandler fails")
+}
+
+// TestGrabConn_ConnResetOnAddHandlerFailure verifies that when AddConsumeHandler
+// fails on the first call (no prior connection), pc.conn is reset to nil.
+func TestGrabConn_ConnResetOnAddHandlerFailure(t *testing.T) {
+ cnx := newSpyConnection()
+ cnx.addHandlerErr = fmt.Errorf("connection closed")
+ rpc := &grabConnSpyRPCClient{cnx: cnx}
+ pc := newGrabConnTestConsumerNoConn(cnx, rpc)
+
+ err := pc.grabConn("")
+ assert.ErrorContains(t, err, "connection closed")
+
+ assert.Nil(t, pc.conn.Load(),
+ "pc.conn must be reset to nil on first-call AddConsumeHandler
failure")
+}
+
+// --- Helpers
+
+// newGrabConnTestConsumer builds a minimal partitionConsumer wired to the
+// given spy connection and RPC client, suitable for testing grabConn.
+func newGrabConnTestConsumer(cnx *spyConnection, rpc *grabConnSpyRPCClient)
*partitionConsumer {
+ brokerURL, _ := url.Parse("pulsar://localhost:6650")
+ if rpc.lookupResult == nil {
+ rpc.lookupResult = &internal.LookupResult{
+ LogicalAddr: brokerURL,
+ PhysicalAddr: brokerURL,
+ }
+ }
+ pool := &grabConnMockPool{cnx: cnx}
+
+ c := &client{
+ cnxPool: pool,
+ rpcClient: rpc,
+ log: log.DefaultNopLogger(),
+ }
+
+ pc := &partitionConsumer{
+ client: c,
+ topic: "persistent://public/default/test",
+ options: &partitionConsumerOpts{subscription:
"sub"},
+ log: log.DefaultNopLogger(),
+ compressionProviders: sync.Map{},
+ connectedCh: make(chan struct{}, 1),
+ metrics: newTestMetrics(),
+ }
+ // Required: lookupTopic calls _getConn().IsProxied() when assignedBrokerURL != "".
+ // grabConn will overwrite this with the same connection after a successful subscribe.
+ pc._setConn(cnx)
+ // availablePermits is NOT initialised here because the success path and
+ // simple error paths don't touch it. Only newGrabConnTestConsumerNoConn
+ // sets it — the MessageReceived path (discardCorruptedMessage) calls
+ // availablePermits.inc() which would panic on a nil receiver.
+ return pc
+}
+
+// newGrabConnTestConsumerNoConn is like newGrabConnTestConsumer but does NOT
+// pre-set _setConn, simulating the real first-call path where conn is nil.
+// Only use with grabConn("") since a non-empty assignedBrokerURL would call
+// _getConn().IsProxied() in lookupTopic and panic.
+func newGrabConnTestConsumerNoConn(cnx *spyConnection, rpc
*grabConnSpyRPCClient) *partitionConsumer {
+ brokerURL, _ := url.Parse("pulsar://localhost:6650")
+ if rpc.lookupResult == nil {
+ rpc.lookupResult = &internal.LookupResult{
+ LogicalAddr: brokerURL,
+ PhysicalAddr: brokerURL,
+ }
+ }
+ pool := &grabConnMockPool{cnx: cnx}
+
+ c := &client{
+ cnxPool: pool,
+ rpcClient: rpc,
+ log: log.DefaultNopLogger(),
+ }
+
+ pc := &partitionConsumer{
+ client: c,
+ topic: "persistent://public/default/test",
+ options: &partitionConsumerOpts{subscription:
"sub"},
+ log: log.DefaultNopLogger(),
+ compressionProviders: sync.Map{},
+ connectedCh: make(chan struct{}, 1),
+ metrics: newTestMetrics(),
+ }
+ pc.availablePermits = &availablePermits{pc: pc}
+ return pc
+}
+
+// spyConnection tracks AddConsumeHandler / DeleteConsumeHandler calls and
+// stores the registered handler so tests can deliver frames through it.
+type spyConnection struct {
+ dummyConnection
+ handlerRegistered atomic.Bool
+ handlerRemoved atomic.Bool
+ handler atomic.Value // stores *partitionConsumer
+ addHandlerErr error // when set, AddConsumeHandler returns
this error
+}
+
+func newSpyConnection() *spyConnection {
+ return &spyConnection{}
+}
+
+func (s *spyConnection) AddConsumeHandler(_ uint64, h
internal.ConsumerHandler) error {
+ if s.addHandlerErr != nil {
+ return s.addHandlerErr
+ }
+ s.handlerRegistered.Store(true)
+ // Store as *partitionConsumer so all stores use the same concrete type
+ // (atomic.Value panics if you mix concrete types across stores).
+ s.handler.Store(h.(*partitionConsumer))
+ return nil
+}
+
+func (s *spyConnection) DeleteConsumeHandler(_ uint64) {
+ s.handlerRemoved.Store(true)
+ var h *partitionConsumer
+ s.handler.Store(h)
+}
+
+// grabConnSpyRPCClient records the ordering of AddConsumeHandler relative to
+// the subscribe RPC, and optionally injects errors or mid-RPC callbacks.
+type grabConnSpyRPCClient struct {
+ internal.RPCClient
+ cnx *spyConnection
+ lookupResult *internal.LookupResult
+
+ // handlerRegisteredDuringRPC is true if AddConsumeHandler was called
+ // before the subscribe RPC executed.
+ handlerRegisteredDuringRPC atomic.Bool
+
+ // subscribeErr, when set, makes the subscribe RPC return this error.
+ subscribeErr error
+
+ // duringSubscribe, when set, is called inside the subscribe RPC to
+ // simulate broker frames arriving while the RPC is in flight.
+ duringSubscribe func()
+
+ // closeSentOnCnx is true if a CLOSE_CONSUMER was sent via RequestOnCnx
+ // (as opposed to RequestWithCnxKeySuffix which could pick a different connection).
+ closeSentOnCnx atomic.Bool
+}
+
+func (r *grabConnSpyRPCClient) NewRequestID() uint64 { return 1 }
+
+func (r *grabConnSpyRPCClient) RequestOnCnxNoWait(_ internal.Connection, _
pb.BaseCommand_Type,
+ _ proto.Message) error {
+ return nil
+}
+
+func (r *grabConnSpyRPCClient) RequestOnCnx(_ internal.Connection, _ uint64,
+ cmdType pb.BaseCommand_Type, _ proto.Message) (*internal.RPCResult,
error) {
+
+ switch cmdType {
+ case pb.BaseCommand_CLOSE_CONSUMER:
+ r.closeSentOnCnx.Store(true)
+ return nil, nil
+ case pb.BaseCommand_SUBSCRIBE:
+ // handled below
+ default:
+ panic(fmt.Sprintf("grabConnSpyRPCClient: unexpected command
type %v", cmdType))
+ }
+
+ r.handlerRegisteredDuringRPC.Store(r.cnx.handlerRegistered.Load())
+
+ if r.duringSubscribe != nil {
+ r.duringSubscribe()
+ }
+
+ if r.subscribeErr != nil {
+ return nil, r.subscribeErr
+ }
+
+ successType := pb.BaseCommand_SUCCESS
+ return &internal.RPCResult{
+ Response: &pb.BaseCommand{Type: &successType},
+ Cnx: r.cnx,
+ }, nil
+}
+
+func (r *grabConnSpyRPCClient) LookupService(_ string)
(internal.LookupService, error) {
+ return &grabConnMockLookup{result: r.lookupResult}, nil
+}
+
+type grabConnMockLookup struct {
+ internal.LookupService
+ result *internal.LookupResult
+}
+
+func (m *grabConnMockLookup) Lookup(_ string) (*internal.LookupResult, error) {
+ return m.result, nil
+}
+
+type grabConnMockPool struct {
+ internal.ConnectionPool
+ cnx internal.Connection
+ err error
+}
+
+func (m *grabConnMockPool) GetConnection(_ *url.URL, _ *url.URL, _ int32)
(internal.Connection, error) {
+ if m.err != nil {
+ return nil, m.err
+ }
+ return m.cnx, nil
+}