This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e45122c  [Fix][Producer] Stop block request even if Value and Payload 
are both set (#1052)
e45122c is described below

commit e45122c2defc5efd4efc493d0acef278a7ccfc01
Author: gunli <[email protected]>
AuthorDate: Thu Jul 13 17:15:37 2023 +0800

    [Fix][Producer] Stop block request even if Value and Payload are both set 
(#1052)
    
    ### Motivation
    Currently, if `!p.options.DisableBlockIfQueueFull` and `msg.Value != nil && 
msg.Payload != nil`, request will be blocked forever 'cause `defer 
request.stopBlock()` is set up after the verify logic.
    ```go
    if msg.Value != nil && msg.Payload != nil {
            p.log.Error("Can not set Value and Payload both")
            runCallback(request.callback, nil, request.msg, errors.New("can not 
set Value and Payload both"))
            return
    }
    
    // The block chan must be closed when returned with exception
    defer request.stopBlock()
    ```
    Here is the PR to stop block request even if Value and Payload are both set
    
    ### Modifications
    
    - pulsar/producer_partition.go
    
    
    ---------
    
    Co-authored-by: gunli <[email protected]>
---
 pulsar/producer_partition.go | 11 ++++++-----
 pulsar/producer_test.go      | 15 +++++++++++++++
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index dd45ff2..48411b4 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -481,11 +481,6 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
 
        var schemaPayload []byte
        var err error
-       if msg.Value != nil && msg.Payload != nil {
-               p.log.Error("Can not set Value and Payload both")
-               runCallback(request.callback, nil, request.msg, errors.New("can 
not set Value and Payload both"))
-               return
-       }
 
        // The block chan must be closed when returned with exception
        defer request.stopBlock()
@@ -1117,6 +1112,12 @@ func (p *partitionProducer) internalSendAsync(ctx 
context.Context, msg *Producer
                return
        }
 
+       if msg.Value != nil && msg.Payload != nil {
+               p.log.Error("Can not set Value and Payload both")
+               runCallback(callback, nil, msg, newError(InvalidMessage, "Can 
not set Value and Payload both"))
+               return
+       }
+
        // Register transaction operation to transaction and the transaction 
coordinator.
        var newCallback func(MessageID, *ProducerMessage, error)
        var txn *transaction
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 11ff089..adbdc71 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -111,6 +111,12 @@ func TestSimpleProducer(t *testing.T) {
 
        _, err = producer.Send(context.Background(), nil)
        assert.NotNil(t, err)
+
+       _, err = producer.Send(context.Background(), &ProducerMessage{
+               Payload: []byte("hello"),
+               Value:   []byte("hello"),
+       })
+       assert.NotNil(t, err)
 }
 
 func TestProducerAsyncSend(t *testing.T) {
@@ -163,6 +169,15 @@ func TestProducerAsyncSend(t *testing.T) {
                wg.Done()
        })
        wg.Wait()
+
+       wg.Add(1)
+       producer.SendAsync(context.Background(), &ProducerMessage{Payload: 
[]byte("hello"), Value: []byte("hello")},
+               func(id MessageID, m *ProducerMessage, e error) {
+                       assert.NotNil(t, e)
+                       assert.Nil(t, id)
+                       wg.Done()
+               })
+       wg.Wait()
 }
 
 func TestProducerCompression(t *testing.T) {

Reply via email to