This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e45122c [Fix][Producer] Stop block request even if Value and Payload
are both set (#1052)
e45122c is described below
commit e45122c2defc5efd4efc493d0acef278a7ccfc01
Author: gunli <[email protected]>
AuthorDate: Thu Jul 13 17:15:37 2023 +0800
[Fix][Producer] Stop block request even if Value and Payload are both set
(#1052)
### Motivation
Currently, if `!p.options.DisableBlockIfQueueFull` and `msg.Value != nil &&
msg.Payload != nil`, request will be blocked forever 'cause `defer
request.stopBlock()` is set up after the verify logic.
```go
if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
runCallback(request.callback, nil, request.msg, errors.New("can not
set Value and Payload both"))
return
}
// The block chan must be closed when returned with exception
defer request.stopBlock()
```
Here is the PR to stop block request even if Value and Payload are both set
### Modifications
- pulsar/producer_partition.go
---------
Co-authored-by: gunli <[email protected]>
---
pulsar/producer_partition.go | 11 ++++++-----
pulsar/producer_test.go | 15 +++++++++++++++
2 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index dd45ff2..48411b4 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -481,11 +481,6 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
var schemaPayload []byte
var err error
- if msg.Value != nil && msg.Payload != nil {
- p.log.Error("Can not set Value and Payload both")
- runCallback(request.callback, nil, request.msg, errors.New("can
not set Value and Payload both"))
- return
- }
// The block chan must be closed when returned with exception
defer request.stopBlock()
@@ -1117,6 +1112,12 @@ func (p *partitionProducer) internalSendAsync(ctx
context.Context, msg *Producer
return
}
+ if msg.Value != nil && msg.Payload != nil {
+ p.log.Error("Can not set Value and Payload both")
+ runCallback(callback, nil, msg, newError(InvalidMessage, "Can
not set Value and Payload both"))
+ return
+ }
+
// Register transaction operation to transaction and the transaction
coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
var txn *transaction
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 11ff089..adbdc71 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -111,6 +111,12 @@ func TestSimpleProducer(t *testing.T) {
_, err = producer.Send(context.Background(), nil)
assert.NotNil(t, err)
+
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ Value: []byte("hello"),
+ })
+ assert.NotNil(t, err)
}
func TestProducerAsyncSend(t *testing.T) {
@@ -163,6 +169,15 @@ func TestProducerAsyncSend(t *testing.T) {
wg.Done()
})
wg.Wait()
+
+ wg.Add(1)
+ producer.SendAsync(context.Background(), &ProducerMessage{Payload:
[]byte("hello"), Value: []byte("hello")},
+ func(id MessageID, m *ProducerMessage, e error) {
+ assert.NotNil(t, e)
+ assert.Nil(t, id)
+ wg.Done()
+ })
+ wg.Wait()
}
func TestProducerCompression(t *testing.T) {