This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 1dfb8fd Add producer check state before send msg. (#569) 1dfb8fd is described below commit 1dfb8fdffb97d0c4f4845cc668bed485324daf5c Author: Zhiqiang Li <stu...@qq.com> AuthorDate: Wed Jul 21 17:43:23 2021 +0800 Add producer check state before send msg. (#569) Add producer state check before send msg. --- pulsar/error.go | 4 ++++ pulsar/producer_partition.go | 7 +++++++ pulsar/producer_test.go | 30 ++++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/pulsar/error.go b/pulsar/error.go index 60a832b..f433bfc 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -99,6 +99,8 @@ const ( AddToBatchFailed // SeekFailed seek failed SeekFailed + // ProducerClosed means producer already been closed + ProducerClosed ) // Error implement error interface, composed of two parts: msg and result. @@ -201,6 +203,8 @@ func getResultStr(r Result) string { return "AddToBatchFailed" case SeekFailed: return "SeekFailed" + case ProducerClosed: + return "ProducerClosed" default: return fmt.Sprintf("Result(%d)", r) } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8b3d33d..7e83bfa 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -50,6 +50,7 @@ var ( errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") errContextExpired = newError(TimeoutError, "message send context expired") errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") + errProducerClosed = newError(ProducerClosed, "producer already been closed") buffersPool sync.Pool ) @@ -658,6 +659,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { + if p.getProducerState() != producerReady { + // Producer is closing + callback(nil, msg, errProducerClosed) + return + } + sr := &sendRequest{ ctx: ctx, msg: msg, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 7c3dbd7..bbe8028 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1097,3 +1097,33 @@ func TestProducerWithInterceptors(t *testing.T) { assert.Equal(t, 10, metric.sendn) assert.Equal(t, 10, metric.ackn) } + +func TestProducerSendAfterClose(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + + ID, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + + assert.NoError(t, err) + assert.NotNil(t, ID) + + producer.Close() + ID, err = producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + assert.Nil(t, ID) + assert.Error(t, err) +}