This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.11.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit e1154399a9f6eed218f13814b5172278199c9436 Author: gunli <[email protected]> AuthorDate: Thu Jul 6 16:29:15 2023 +0800 [Fix][Producer] check if message is nil (#1047) * [Fix][Producer] check if message is nil * add a debug log --------- Co-authored-by: gunli <[email protected]> (cherry picked from commit 8d4513787a25423c988708dff985e9c994545df5) --- pulsar/producer_partition.go | 6 ++++++ pulsar/producer_test.go | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9d04427a..2f771fde 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1112,6 +1112,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { + if msg == nil { + p.log.Error("Message is nil") + runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil")) + return + } + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) var txn *transaction diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 7c3abdc7..fecae8ef 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -108,6 +108,9 @@ func TestSimpleProducer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, ID) } + + _, err = producer.Send(context.Background(), nil) + assert.NotNil(t, err) } func TestProducerAsyncSend(t *testing.T) { @@ -152,6 +155,14 @@ func TestProducerAsyncSend(t *testing.T) { wg.Wait() assert.Equal(t, 0, errors.Size()) + + wg.Add(1) + producer.SendAsync(context.Background(), nil, func(id MessageID, m *ProducerMessage, e error) { + assert.NotNil(t, e) + assert.Nil(t, id) + wg.Done() + }) + wg.Wait() } func TestProducerCompression(t *testing.T) {
