This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 6fcaf26 Reduce the size of the MessageID structs by one word on 64-bit arch (#316) 6fcaf26 is described below commit 6fcaf266480c5e7fe258a67d028cb5c69b835b39 Author: dferstay <dfers...@users.noreply.github.com> AuthorDate: Wed Jul 8 14:17:41 2020 -0700 Reduce the size of the MessageID structs by one word on 64-bit arch (#316) An int occupies one word of memory; on 64-bit machines, this is 8 bytes. As a result, the messageID struct is 56 bytes: * ledgerID - 8 bytes * entryID - 8 bytes * batchIdx - 8 bytes * partitionIdx - 8 bytes * tracker - 8 bytes * consumer - 16 bytes (1 word for type, 1 word for data address) This commit changes the type of the batchIdx and partitionIdx fields to int32, which saves one word of memory (reducing the struct to 48 bytes) and maintains alignment of struct fields. Reducing the size of the MessageID structs is important as they are currently allocated on the heap for every message produced or consumed. Signed-off-by: Daniel Ferstay <dfers...@splunk.com> Co-authored-by: Daniel Ferstay <dfers...@splunk.com> --- pulsar/consumer_impl.go | 2 +- pulsar/consumer_partition.go | 8 ++++---- pulsar/impl_message.go | 18 +++++++++--------- pulsar/impl_message_test.go | 4 ++-- pulsar/producer_partition.go | 6 +++--- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 0a3220a..20dc1af 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -459,7 +459,7 @@ func (c *consumer) messageID(msgID MessageID) (*messageID, bool) { return nil, false } - partition := mid.partitionIdx + partition := int(mid.partitionIdx) // did we receive a valid partition index? 
if partition < 0 || partition >= len(c.consumers) { c.log.Warnf("invalid partition index %d expected a partition between [0-%d]", diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b4498f6..cffc2b4 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -83,7 +83,7 @@ type partitionConsumer struct { topic string name string consumerID uint64 - partitionIdx int + partitionIdx int32 // shared channel messageCh chan ConsumerMessage @@ -120,7 +120,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon topic: options.topic, name: options.consumerName, consumerID: client.rpcClient.NewConsumerID(), - partitionIdx: options.partitionIdx, + partitionIdx: int32(options.partitionIdx), eventsCh: make(chan interface{}, 3), queueSize: int32(options.receiverQueueSize), queueCh: make(chan []*message, options.receiverQueueSize), @@ -400,7 +400,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header msgID := newTrackingMessageID( int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), - i, + int32(i), pc.partitionIdx, ackTracker) @@ -923,7 +923,7 @@ func convertToMessageID(id *pb.MessageIdData) *messageID { } if id.BatchIndex != nil { - msgID.batchIdx = int(*id.BatchIndex) + msgID.batchIdx = *id.BatchIndex } return msgID diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 6aa4223..d9574cd 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -31,8 +31,8 @@ import ( type messageID struct { ledgerID int64 entryID int64 - batchIdx int - partitionIdx int + batchIdx int32 + partitionIdx int32 tracker *ackTracker consumer acker @@ -56,7 +56,7 @@ func (id *messageID) Nack() { func (id *messageID) ack() bool { if id.tracker != nil && id.batchIdx > -1 { - return id.tracker.ack(id.batchIdx) + return id.tracker.ack(int(id.batchIdx)) } return true } @@ -87,8 +87,8 @@ func (id *messageID) Serialize() []byte { msgID := &pb.MessageIdData{ LedgerId: 
proto.Uint64(uint64(id.ledgerID)), EntryId: proto.Uint64(uint64(id.entryID)), - BatchIndex: proto.Int(id.batchIdx), - Partition: proto.Int(id.partitionIdx), + BatchIndex: proto.Int(int(id.batchIdx)), + Partition: proto.Int(int(id.partitionIdx)), } data, _ := proto.Marshal(msgID) return data @@ -103,13 +103,13 @@ func deserializeMessageID(data []byte) (MessageID, error) { id := newMessageID( int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()), - int(msgID.GetBatchIndex()), - int(msgID.GetPartition()), + msgID.GetBatchIndex(), + msgID.GetPartition(), ) return id, nil } -func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID { +func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID { return &messageID{ ledgerID: ledgerID, entryID: entryID, @@ -118,7 +118,7 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) } } -func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int, +func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, tracker *ackTracker) *messageID { return &messageID{ ledgerID: ledgerID, diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go index 54c3bee..164cff6 100644 --- a/pulsar/impl_message_test.go +++ b/pulsar/impl_message_test.go @@ -33,8 +33,8 @@ func TestMessageId(t *testing.T) { assert.Equal(t, int64(1), id2.(*messageID).ledgerID) assert.Equal(t, int64(2), id2.(*messageID).entryID) - assert.Equal(t, 3, id2.(*messageID).batchIdx) - assert.Equal(t, 4, id2.(*messageID).partitionIdx) + assert.Equal(t, int32(3), id2.(*messageID).batchIdx) + assert.Equal(t, int32(4), id2.(*messageID).partitionIdx) id, err = DeserializeMessageID(nil) assert.Error(t, err) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e4765fc..239fcd6 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -69,7 +69,7 @@ type partitionProducer 
struct { pendingQueue internal.BlockingQueue lastSequenceID int64 - partitionIdx int + partitionIdx int32 } func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) ( @@ -105,7 +105,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), pendingQueue: internal.NewBlockingQueue(maxPendingMessages), lastSequenceID: -1, - partitionIdx: partitionIdx, + partitionIdx: int32(partitionIdx), } if options.Name != "" { @@ -442,7 +442,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) msgID := newMessageID( int64(response.MessageId.GetLedgerId()), int64(response.MessageId.GetEntryId()), - idx, + int32(idx), p.partitionIdx, ) sr.callback(msgID, sr.msg, nil)