This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new bf248fd [Issue 240] Add check for max message size (#263) bf248fd is described below commit bf248fd43e178f9b598ece50cbdebd6708175b94 Author: Keith Null <20233656+keithn...@users.noreply.github.com> AuthorDate: Thu May 28 09:47:25 2020 +0800 [Issue 240] Add check for max message size (#263) * Add check for max message size 1. When creating a connection, try to get maxMessageSize from handshake response command. If it's not set, then use the default maxMessageSize value defined in the client side. 2. When sending a message, check whether the size of payload exceeds maxMessageSize. If so, return error immediately without adding this meesage into sending queue. 3. To implement these, I made some tiny modifications in Connection interface and added a field in its implementation struct. * Add testing for max message size * Fix error log --- integration-tests/standalone.conf | 3 +++ pulsar/internal/batch_builder.go | 2 -- pulsar/internal/commands.go | 8 ++++++-- pulsar/internal/connection.go | 15 ++++++++++++++- pulsar/producer_partition.go | 15 ++++++++++++++- pulsar/producer_test.go | 27 +++++++++++++++++++++++++++ 6 files changed, 64 insertions(+), 6 deletions(-) diff --git a/integration-tests/standalone.conf b/integration-tests/standalone.conf index d81f493..3fdf53a 100644 --- a/integration-tests/standalone.conf +++ b/integration-tests/standalone.conf @@ -83,6 +83,9 @@ statusFilePath=/usr/local/apache/htdocs # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction maxUnackedMessagesPerConsumer=50000 +# Set maxMessageSize to 1MB rather than the default value 5MB for testing +maxMessageSize=1048576 + ### --- Authentication --- ### # Enable TLS diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 80d8a00..3b54eba 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -29,8 +29,6 @@ import ( ) const ( - // MaxMessageSize limit message size for transfer - MaxMessageSize = 5 * 1024 * 1024 // MaxBatchSize will be the largest size for a batch sent from this particular producer. // This is used as a baseline to allocate a new buffer that can hold the entire batch // without needing costly re-allocations. diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index d9f2a1f..1ee3517 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -30,8 +30,12 @@ import ( const ( // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. - MaxFrameSize = 5 * 1024 * 1024 - magicCrc32c uint16 = 0x0e01 + MaxFrameSize = 5 * 1024 * 1024 + // MessageFramePadding is for metadata and other frame headers + MessageFramePadding = 10 * 1024 + // MaxMessageSize limit message size for transfer + MaxMessageSize = MaxFrameSize - MessageFramePadding + magicCrc32c uint16 = 0x0e01 ) // ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index d02deda..797b27a 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -71,6 +71,7 @@ type Connection interface { AddConsumeHandler(id uint64, handler ConsumerHandler) DeleteConsumeHandler(id uint64) ID() string + GetMaxMessageSize() int32 Close() } @@ -157,6 +158,8 @@ type connection struct { tlsOptions *TLSOptions auth auth.Provider + + maxMessageSize int32 } func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions, @@ -282,7 +285,13 @@ func (c *connection) doHandshake() bool { cmd.Type) return false } - + if cmd.Connected.MaxMessageSize != nil { + c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize) + c.maxMessageSize = *cmd.Connected.MaxMessageSize + } else { + c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize) + c.maxMessageSize = MaxMessageSize + } c.log.Info("Connection is ready") c.changeState(connectionReady) return true @@ -749,3 +758,7 @@ func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) { func (c *connection) ID() string { return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr()) } + +func (c *connection) GetMaxMessageSize() int32 { + return c.maxMessageSize +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 63f7bf4..2eb3696 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -40,7 +40,10 @@ const ( producerClosed ) -var errFailAddBatch = errors.New("message send failed") +var ( + errFailAddBatch = errors.New("message send failed") + errMessageTooLarge = errors.New("message size exceeds MaxMessageSize") +) type partitionProducer struct { state int32 @@ -236,6 +239,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) { msg := request.msg + // if msg is too large + if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()) { + p.publishSemaphore.Release() + request.callback(nil, request.msg, errMessageTooLarge) + p.log.WithField("size", len(msg.Payload)). + WithField("properties", msg.Properties). + WithError(errMessageTooLarge).Error() + return + } + deliverAt := msg.DeliverAt if msg.DeliverAfter.Nanoseconds() > 0 { deliverAt = time.Now().Add(msg.DeliverAfter) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 2910427..9f76873 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -790,3 +790,30 @@ func TestDelayAbsolute(t *testing.T) { assert.NotNil(t, msg) canc() } + +func TestMaxMessageSize(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + producer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + serverMaxMessageSize := 1024 * 1024 + for bias := -1; bias <= 1; bias++ { + payload := make([]byte, serverMaxMessageSize+bias) + ID, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: payload, + }) + if bias <= 0 { + assert.NoError(t, err) + assert.NotNil(t, ID) + } else { + assert.Equal(t, errMessageTooLarge, err) + } + } +}