This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new bf248fd  [Issue 240] Add check for max message size (#263)
bf248fd is described below

commit bf248fd43e178f9b598ece50cbdebd6708175b94
Author: Keith Null <20233656+keithn...@users.noreply.github.com>
AuthorDate: Thu May 28 09:47:25 2020 +0800

    [Issue 240] Add check for max message size (#263)
    
    * Add check for max message size
    
    1. When creating a connection, read maxMessageSize from the handshake
    response command (CommandConnected). If the broker does not set it, fall
    back to the default MaxMessageSize defined on the client side.
    2. When sending a message, check whether the size of the payload exceeds
    maxMessageSize. If so, fail the send immediately with an error instead of
    adding the message to the sending queue.
    3. To implement this, a GetMaxMessageSize() method was added to the
    Connection interface, backed by a new maxMessageSize field in its
    implementation struct.
    
    * Add testing for max message size
    
    * Fix error log
---
 integration-tests/standalone.conf |  3 +++
 pulsar/internal/batch_builder.go  |  2 --
 pulsar/internal/commands.go       |  8 ++++++--
 pulsar/internal/connection.go     | 15 ++++++++++++++-
 pulsar/producer_partition.go      | 15 ++++++++++++++-
 pulsar/producer_test.go           | 27 +++++++++++++++++++++++++++
 6 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/integration-tests/standalone.conf b/integration-tests/standalone.conf
index d81f493..3fdf53a 100644
--- a/integration-tests/standalone.conf
+++ b/integration-tests/standalone.conf
@@ -83,6 +83,9 @@ statusFilePath=/usr/local/apache/htdocs
 # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
 maxUnackedMessagesPerConsumer=50000
 
+# Set maxMessageSize to 1MB rather than the default value 5MB for testing
+maxMessageSize=1048576
+
 ### --- Authentication --- ###
 
 # Enable TLS
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 80d8a00..3b54eba 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -29,8 +29,6 @@ import (
 )
 
 const (
-       // MaxMessageSize limit message size for transfer
-       MaxMessageSize = 5 * 1024 * 1024
        // MaxBatchSize will be the largest size for a batch sent from this particular producer.
        // This is used as a baseline to allocate a new buffer that can hold the entire batch
        // without needing costly re-allocations.
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index d9f2a1f..1ee3517 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -30,8 +30,12 @@ import (
 
 const (
        // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
-       MaxFrameSize        = 5 * 1024 * 1024
-       magicCrc32c  uint16 = 0x0e01
+       MaxFrameSize = 5 * 1024 * 1024
+       // MessageFramePadding is for metadata and other frame headers
+       MessageFramePadding = 10 * 1024
+       // MaxMessageSize limit message size for transfer
+       MaxMessageSize        = MaxFrameSize - MessageFramePadding
+       magicCrc32c    uint16 = 0x0e01
 )
 
 // ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index d02deda..797b27a 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -71,6 +71,7 @@ type Connection interface {
        AddConsumeHandler(id uint64, handler ConsumerHandler)
        DeleteConsumeHandler(id uint64)
        ID() string
+       GetMaxMessageSize() int32
        Close()
 }
 
@@ -157,6 +158,8 @@ type connection struct {
 
        tlsOptions *TLSOptions
        auth       auth.Provider
+
+       maxMessageSize int32
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
@@ -282,7 +285,13 @@ func (c *connection) doHandshake() bool {
                        cmd.Type)
                return false
        }
-
+       if cmd.Connected.MaxMessageSize != nil {
+               c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize)
+               c.maxMessageSize = *cmd.Connected.MaxMessageSize
+       } else {
+               c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize)
+               c.maxMessageSize = MaxMessageSize
+       }
        c.log.Info("Connection is ready")
        c.changeState(connectionReady)
        return true
@@ -749,3 +758,7 @@ func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) {
 func (c *connection) ID() string {
        return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
 }
+
+func (c *connection) GetMaxMessageSize() int32 {
+       return c.maxMessageSize
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 63f7bf4..2eb3696 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -40,7 +40,10 @@ const (
        producerClosed
 )
 
-var errFailAddBatch = errors.New("message send failed")
+var (
+       errFailAddBatch    = errors.New("message send failed")
+       errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+)
 
 type partitionProducer struct {
        state  int32
@@ -236,6 +239,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 
        msg := request.msg
 
+       // if msg is too large
+       if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()) {
+               p.publishSemaphore.Release()
+               request.callback(nil, request.msg, errMessageTooLarge)
+               p.log.WithField("size", len(msg.Payload)).
+                       WithField("properties", msg.Properties).
+                       WithError(errMessageTooLarge).Error()
+               return
+       }
+
        deliverAt := msg.DeliverAt
        if msg.DeliverAfter.Nanoseconds() > 0 {
                deliverAt = time.Now().Add(msg.DeliverAfter)
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2910427..9f76873 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -790,3 +790,30 @@ func TestDelayAbsolute(t *testing.T) {
        assert.NotNil(t, msg)
        canc()
 }
+
+func TestMaxMessageSize(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: newTopicName(),
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+       serverMaxMessageSize := 1024 * 1024
+       for bias := -1; bias <= 1; bias++ {
+               payload := make([]byte, serverMaxMessageSize+bias)
+               ID, err := producer.Send(context.Background(), &ProducerMessage{
+                       Payload: payload,
+               })
+               if bias <= 0 {
+                       assert.NoError(t, err)
+                       assert.NotNil(t, ID)
+               } else {
+                       assert.Equal(t, errMessageTooLarge, err)
+               }
+       }
+}

Reply via email to