This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 352c463 Optimize batch index ACK performance (#988)
352c463 is described below
commit 352c463194916cab7772c0729c9cdac14404e8bd
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Mar 10 09:57:38 2023 +0800
Optimize batch index ACK performance (#988)
### Motivation
Currently, when `EnableBatchIndexAck` is true, the ACK performance is
very poor. There are two main reasons:
1. Acknowledging a list of MessageIDs is not supported. This means that
even if N MessageIDs are grouped, N ACK requests are still sent.
2. The implementation of the ACK grouping tracker is wrong. Given a batch
that has N messages, when batch index ACK is enabled, each MessageID
is cached. However, after all N MessageIDs have arrived, the current
implementation does not clear them.
### Modifications
- Add a `func(ids []*pb.MessageIdData)` callback to the ACK grouping tracker.
When flushing individual ACKs, construct the slice and wrap it into a
single `CommandAck` directly.
- Refactor the implementation of the ACK grouping tracker:
  - Do not save each MessageID instance; instead, use the pair of the
    ledger ID and the entry ID as the key of `pendingAcks`.
  - Release the mutex before calling the ACK functions.
- Add `TestTrackerPendingAcks` to verify the list of MessageIDs to ACK.
Since the ACK order can no longer be guaranteed after this change, sort
the acknowledged MessageIDs in `ack_grouping_tracker_test.go`.
---
pulsar/ack_grouping_tracker.go | 281 ++++++++++++++++++------------------
pulsar/ack_grouping_tracker_test.go | 54 +++++--
pulsar/consumer_partition.go | 13 +-
pulsar/consumer_partition_test.go | 6 +-
pulsar/message.go | 8 -
5 files changed, 196 insertions(+), 166 deletions(-)
diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
index 22aba11..dd05186 100644
--- a/pulsar/ack_grouping_tracker.go
+++ b/pulsar/ack_grouping_tracker.go
@@ -19,8 +19,10 @@ package pulsar
import (
"sync"
+ "sync/atomic"
"time"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/bits-and-blooms/bitset"
)
@@ -40,7 +42,8 @@ type ackGroupingTracker interface {
func newAckGroupingTracker(options *AckGroupingOptions,
ackIndividual func(id MessageID),
- ackCumulative func(id MessageID)) ackGroupingTracker {
+ ackCumulative func(id MessageID),
+ ackList func(ids []*pb.MessageIdData)) ackGroupingTracker {
if options == nil {
options = &AckGroupingOptions{
MaxSize: 1000,
@@ -56,38 +59,28 @@ func newAckGroupingTracker(options *AckGroupingOptions,
}
t := &timedAckGroupingTracker{
- singleAcks: make([]MessageID, options.MaxSize),
- pendingAcks: make(map[int64]*bitset.BitSet),
- lastCumulativeAck: EarliestMessageID(),
- ackIndividual: ackIndividual,
+ maxNumAcks: int(options.MaxSize),
ackCumulative: ackCumulative,
- ackList: func(ids []MessageID) {
- // TODO: support ack a list of MessageIDs
- for _, id := range ids {
- ackIndividual(id)
- }
- },
- options: *options,
- tick: time.NewTicker(time.Hour),
- donCh: make(chan struct{}),
+ ackList: ackList,
+ pendingAcks: make(map[[2]uint64]*bitset.BitSet),
+ lastCumulativeAck: EarliestMessageID(),
}
if options.MaxTime > 0 {
- t.tick = time.NewTicker(options.MaxTime)
- } else {
- t.tick.Stop()
+ t.ticker = time.NewTicker(options.MaxTime)
+ t.exitCh = make(chan struct{})
+ go func() {
+ for {
+ select {
+ case <-t.exitCh:
+ return
+ case <-t.ticker.C:
+ t.flush()
+ }
+ }
+ }()
}
- go func() {
- for {
- select {
- case <-t.donCh:
- return
- case <-t.tick.C:
- t.flush()
- }
- }
- }()
return t
}
@@ -117,157 +110,157 @@ func (i *immediateAckGroupingTracker) flushAndClean() {
func (i *immediateAckGroupingTracker) close() {
}
-func (t *timedAckGroupingTracker) addAndCheckIfFull(id MessageID) bool {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- t.singleAcks[t.index] = id
- t.index++
- key := messageIDHash(id)
- ackSet, found := t.pendingAcks[key]
- if !found {
- if messageIDIsBatch(id) {
- ackSet = bitset.New(uint(id.BatchSize()))
- for i := 0; i < int(id.BatchSize()); i++ {
- ackSet.Set(uint(i))
- }
- t.pendingAcks[key] = ackSet
- } else {
- t.pendingAcks[key] = nil
- }
- }
- if ackSet != nil {
- ackSet.Clear(uint(id.BatchIdx()))
- }
- return t.index == len(t.singleAcks)
-}
-
-func (t *timedAckGroupingTracker) tryUpdateLastCumulativeAck(id MessageID) {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if messageIDCompare(t.lastCumulativeAck, id) < 0 {
- t.lastCumulativeAck = id
- t.cumulativeAckRequired = true
- }
-}
-
-func (t *timedAckGroupingTracker) flushIndividualAcks() {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if t.index > 0 {
- t.ackList(t.singleAcks[0:t.index])
- for _, id := range t.singleAcks[0:t.index] {
- key := messageIDHash(id)
- ackSet, found := t.pendingAcks[key]
- if !found {
- continue
- }
- if ackSet == nil {
- delete(t.pendingAcks, key)
- } else {
- ackSet.Clear(uint(id.BatchIdx()))
- if ackSet.None() { // all messages have been
acknowledged
- delete(t.pendingAcks, key)
- }
- }
- delete(t.pendingAcks, messageIDHash(id))
- }
- t.index = 0
- }
-}
-
-func (t *timedAckGroupingTracker) flushCumulativeAck() {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if t.cumulativeAckRequired {
- t.ackCumulative(t.lastCumulativeAck)
- t.cumulativeAckRequired = false
- }
-}
-
-func (t *timedAckGroupingTracker) clean() {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- maxSize := len(t.singleAcks)
- t.singleAcks = make([]MessageID, maxSize)
- t.index = 0
- t.pendingAcks = make(map[int64]*bitset.BitSet)
- t.lastCumulativeAck = EarliestMessageID()
- t.cumulativeAckRequired = false
-}
-
type timedAckGroupingTracker struct {
- singleAcks []MessageID
- index int
+ sync.RWMutex
- // Key is the hash code of the ledger id and the netry id,
+ maxNumAcks int
+ ackCumulative func(id MessageID)
+ ackList func(ids []*pb.MessageIdData)
+ ticker *time.Ticker
+
+ // Key is the pair of the ledger id and the entry id,
// Value is the bit set that represents which messages are acknowledged
if the entry stores a batch.
// The bit 1 represents the message has been acknowledged, i.e. the
bits "111" represents all messages
// in the batch whose batch size is 3 are not acknowledged.
// After the 1st message (i.e. batch index is 0) is acknowledged, the
bits will become "011".
// Value is nil if the entry represents a single message.
- pendingAcks map[int64]*bitset.BitSet
+ pendingAcks map[[2]uint64]*bitset.BitSet
lastCumulativeAck MessageID
- cumulativeAckRequired bool
-
- ackIndividual func(id MessageID)
- ackCumulative func(id MessageID)
- ackList func(ids []MessageID)
-
- options AckGroupingOptions
- donCh chan struct{}
- tick *time.Ticker
+ cumulativeAckRequired int32
- mutex sync.RWMutex
+ exitCh chan struct{}
}
func (t *timedAckGroupingTracker) add(id MessageID) {
- if t.addAndCheckIfFull(id) {
- t.flushIndividualAcks()
- if t.options.MaxTime > 0 {
- t.tick.Reset(t.options.MaxTime)
+ if acks := t.tryAddIndividual(id); acks != nil {
+ t.flushIndividual(acks)
+ }
+}
+
+func (t *timedAckGroupingTracker) tryAddIndividual(id MessageID)
map[[2]uint64]*bitset.BitSet {
+ t.Lock()
+ defer t.Unlock()
+ key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
+
+ batchIdx := id.BatchIdx()
+ batchSize := id.BatchSize()
+
+ if batchIdx >= 0 && batchSize > 0 {
+ bs, found := t.pendingAcks[key]
+ if !found {
+ if batchSize > 1 {
+ bs = bitset.New(uint(batchSize))
+ for i := uint(0); i < uint(batchSize); i++ {
+ bs.Set(i)
+ }
+ }
+ t.pendingAcks[key] = bs
}
+ if bs != nil {
+ bs.Clear(uint(batchIdx))
+ }
+ } else {
+ t.pendingAcks[key] = nil
}
+
+ if len(t.pendingAcks) >= t.maxNumAcks {
+ pendingAcks := t.pendingAcks
+ t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
+ return pendingAcks
+ }
+ return nil
}
func (t *timedAckGroupingTracker) addCumulative(id MessageID) {
- t.tryUpdateLastCumulativeAck(id)
- if t.options.MaxTime <= 0 {
- t.flushCumulativeAck()
+ if t.tryUpdateCumulative(id) && t.ticker == nil {
+ t.ackCumulative(id)
}
}
+func (t *timedAckGroupingTracker) tryUpdateCumulative(id MessageID) bool {
+ t.Lock()
+ defer t.Unlock()
+ if messageIDCompare(t.lastCumulativeAck, id) < 0 {
+ t.lastCumulativeAck = id
+ atomic.StoreInt32(&t.cumulativeAckRequired, 1)
+ return true
+ }
+ return false
+}
+
func (t *timedAckGroupingTracker) isDuplicate(id MessageID) bool {
- t.mutex.RLock()
+ t.RLock()
+ defer t.RUnlock()
if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
- t.mutex.RUnlock()
return true
}
- ackSet, found := t.pendingAcks[messageIDHash(id)]
- if !found {
- t.mutex.RUnlock()
- return false
- }
- t.mutex.RUnlock()
- if ackSet == nil || !messageIDIsBatch(id) {
- // NOTE: should we panic when ackSet != nil and
messageIDIsBatch(id) is true?
- return true
+ key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
+ if bs, found := t.pendingAcks[key]; found {
+ if bs == nil {
+ return true
+ }
+ if !bs.Test(uint(id.BatchIdx())) {
+ return true
+ }
}
- // 0 represents the message has been acknowledged
- return !ackSet.Test(uint(id.BatchIdx()))
+ return false
}
func (t *timedAckGroupingTracker) flush() {
- t.flushIndividualAcks()
- t.flushCumulativeAck()
+ if acks := t.clearPendingAcks(); len(acks) > 0 {
+ t.flushIndividual(acks)
+ }
+ if atomic.CompareAndSwapInt32(&t.cumulativeAckRequired, 1, 0) {
+ t.RLock()
+ id := t.lastCumulativeAck
+ t.RUnlock()
+ t.ackCumulative(id)
+ }
}
func (t *timedAckGroupingTracker) flushAndClean() {
- t.flush()
- t.clean()
+ if acks := t.clearPendingAcks(); len(acks) > 0 {
+ t.flushIndividual(acks)
+ }
+ if atomic.CompareAndSwapInt32(&t.cumulativeAckRequired, 1, 0) {
+ t.Lock()
+ id := t.lastCumulativeAck
+ t.lastCumulativeAck = EarliestMessageID()
+ t.Unlock()
+ t.ackCumulative(id)
+ }
+}
+
+func (t *timedAckGroupingTracker) clearPendingAcks()
map[[2]uint64]*bitset.BitSet {
+ t.Lock()
+ defer t.Unlock()
+ pendingAcks := t.pendingAcks
+ t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
+ return pendingAcks
}
func (t *timedAckGroupingTracker) close() {
t.flushAndClean()
- close(t.donCh)
+ if t.exitCh != nil {
+ close(t.exitCh)
+ }
+}
+
+func (t *timedAckGroupingTracker) flushIndividual(pendingAcks
map[[2]uint64]*bitset.BitSet) {
+ msgIDs := make([]*pb.MessageIdData, 0, len(pendingAcks))
+ for k, v := range pendingAcks {
+ ledgerID := k[0]
+ entryID := k[1]
+ msgID := &pb.MessageIdData{LedgerId: &ledgerID, EntryId:
&entryID}
+ if v != nil && !v.None() {
+ bytes := v.Bytes()
+ msgID.AckSet = make([]int64, len(bytes))
+ for i := 0; i < len(bytes); i++ {
+ msgID.AckSet[i] = int64(bytes[i])
+ }
+ }
+ msgIDs = append(msgIDs, msgID)
+ }
+ t.ackList(msgIDs)
}
diff --git a/pulsar/ack_grouping_tracker_test.go
b/pulsar/ack_grouping_tracker_test.go
index 41d24d4..0a794f6 100644
--- a/pulsar/ack_grouping_tracker_test.go
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -19,11 +19,13 @@ package pulsar
import (
"fmt"
+ "sort"
"sync"
"sync/atomic"
"testing"
"time"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
)
@@ -45,7 +47,8 @@ func TestNoCacheTracker(t *testing.T) {
ledgerID1 := int64(-1)
tracker := newAckGroupingTracker(&option,
func(id MessageID) { ledgerID0 =
id.LedgerID() },
- func(id MessageID) { ledgerID1 =
id.LedgerID() })
+ func(id MessageID) { ledgerID1 =
id.LedgerID() },
+ nil)
tracker.add(&messageID{ledgerID: 1})
assert.Equal(t, atomic.LoadInt64(&ledgerID0),
int64(1))
@@ -61,10 +64,12 @@ type mockAcker struct {
cumulativeLedgerID int64
}
-func (a *mockAcker) ack(id MessageID) {
+func (a *mockAcker) ack(ids []*pb.MessageIdData) {
defer a.Unlock()
a.Lock()
- a.ledgerIDs = append(a.ledgerIDs, id.LedgerID())
+ for _, id := range ids {
+ a.ledgerIDs = append(a.ledgerIDs, int64(*id.LedgerId))
+ }
}
func (a *mockAcker) ackCumulative(id MessageID) {
@@ -74,6 +79,8 @@ func (a *mockAcker) ackCumulative(id MessageID) {
func (a *mockAcker) getLedgerIDs() []int64 {
defer a.Unlock()
a.Lock()
+
+ sort.Slice(a.ledgerIDs, func(i, j int) bool { return a.ledgerIDs[i] <
a.ledgerIDs[j] })
return a.ledgerIDs
}
@@ -88,8 +95,8 @@ func (a *mockAcker) reset() {
func TestCachedTracker(t *testing.T) {
var acker mockAcker
- tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3,
MaxTime: 0},
- func(id MessageID) { acker.ack(id) }, func(id MessageID) {
acker.ackCumulative(id) })
+ tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3,
MaxTime: 0}, nil,
+ func(id MessageID) { acker.ackCumulative(id) }, func(ids
[]*pb.MessageIdData) { acker.ack(ids) })
tracker.add(&messageID{ledgerID: 1})
tracker.add(&messageID{ledgerID: 2})
@@ -126,7 +133,8 @@ func TestCachedTracker(t *testing.T) {
func TestTimedTrackerIndividualAck(t *testing.T) {
var acker mockAcker
// MaxSize: 1000, MaxTime: 100ms
- tracker := newAckGroupingTracker(nil, func(id MessageID) {
acker.ack(id) }, nil)
+ tracker := newAckGroupingTracker(nil, nil,
+ func(id MessageID) { acker.ackCumulative(id) }, func(ids
[]*pb.MessageIdData) { acker.ack(ids) })
expected := make([]int64, 0)
for i := 0; i < 999; i++ {
@@ -161,7 +169,7 @@ func TestTimedTrackerIndividualAck(t *testing.T) {
func TestTimedTrackerCumulativeAck(t *testing.T) {
var acker mockAcker
// MaxTime is 100ms
- tracker := newAckGroupingTracker(nil, nil, func(id MessageID) {
acker.ackCumulative(id) })
+ tracker := newAckGroupingTracker(nil, nil, func(id MessageID) {
acker.ackCumulative(id) }, nil)
// case 1: flush because of the timeout
tracker.addCumulative(&messageID{ledgerID: 1})
@@ -182,7 +190,8 @@ func TestTimedTrackerCumulativeAck(t *testing.T) {
}
func TestTimedTrackerIsDuplicate(t *testing.T) {
- tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id
MessageID) {})
+ tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id
MessageID) {},
+ func(id []*pb.MessageIdData) {})
tracker.add(&messageID{batchIdx: 0, batchSize: 3})
tracker.add(&messageID{batchIdx: 2, batchSize: 3})
@@ -198,8 +207,8 @@ func TestTimedTrackerIsDuplicate(t *testing.T) {
func TestDuplicateAfterClose(t *testing.T) {
var acker mockAcker
- tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3,
MaxTime: 0},
- func(id MessageID) { acker.ack(id) }, func(id MessageID) {
acker.ackCumulative(id) })
+ tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3,
MaxTime: 0}, nil,
+ func(id MessageID) { acker.ackCumulative(id) }, func(ids
[]*pb.MessageIdData) { acker.ack(ids) })
tracker.add(&messageID{ledgerID: 1})
assert.True(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
@@ -207,3 +216,28 @@ func TestDuplicateAfterClose(t *testing.T) {
tracker.close()
assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 1}))
}
+
+func TestTrackerPendingAcks(t *testing.T) {
+ m := make(map[uint64][]int64)
+ tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3,
MaxTime: 0}, nil, nil,
+ func(ids []*pb.MessageIdData) {
+ for _, id := range ids {
+ m[*id.LedgerId] = id.AckSet
+ }
+ })
+ tracker.add(&messageID{ledgerID: 0, batchIdx: 0, batchSize: 30})
+ for i := 0; i < 10; i++ {
+ tracker.add(&messageID{ledgerID: 1, batchIdx: int32(i),
batchSize: 10})
+ }
+ assert.Equal(t, 0, len(m)) // the number of entries is 2, so it's not
flushed
+ tracker.flush()
+ assert.Equal(t, 2, len(m))
+
+ ackSet, found := m[0]
+ assert.True(t, found)
+ assert.Greater(t, len(ackSet), 0)
+
+ ackSet, found = m[1]
+ assert.True(t, found)
+ assert.Equal(t, 0, len(ackSet)) // all messages in the batch are
acknowledged
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a3dac19..18100a9 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -310,7 +310,8 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
pc.ackGroupingTracker =
newAckGroupingTracker(options.ackGroupingOptions,
func(id MessageID) { pc.sendIndividualAck(id) },
- func(id MessageID) { pc.sendCumulativeAck(id) })
+ func(id MessageID) { pc.sendCumulativeAck(id) },
+ func(ids []*pb.MessageIdData) { pc.eventsCh <- ids })
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
"name": pc.name,
@@ -837,6 +838,14 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
}
}
+func (pc *partitionConsumer) internalAckList(msgIDs []*pb.MessageIdData) {
+ pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
pb.BaseCommand_ACK, &pb.CommandAck{
+ AckType: pb.CommandAck_Individual.Enum(),
+ ConsumerId: proto.Uint64(pc.consumerID),
+ MessageId: msgIDs,
+ })
+}
+
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage,
headersAndPayload internal.Buffer) error {
pbMsgID := response.GetMessageId()
@@ -1364,6 +1373,8 @@ func (pc *partitionConsumer) runEventsLoop() {
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
+ case []*pb.MessageIdData:
+ pc.internalAckList(v)
case *redeliveryRequest:
pc.internalRedeliver(v)
case *unsubscribeRequest:
diff --git a/pulsar/consumer_partition_test.go
b/pulsar/consumer_partition_test.go
index 16c4399..21bc0e3 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -38,7 +38,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
decryptor: crypto.NewNoopDecryptor(),
}
pc.ackGroupingTracker =
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
- func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+ func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -76,7 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
decryptor: crypto.NewNoopDecryptor(),
}
pc.ackGroupingTracker =
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
- func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+ func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -111,7 +111,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
decryptor: crypto.NewNoopDecryptor(),
}
pc.ackGroupingTracker =
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
- func(id MessageID) { pc.sendIndividualAck(id) }, nil)
+ func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
diff --git a/pulsar/message.go b/pulsar/message.go
index c44957d..98190e9 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -212,11 +212,3 @@ func messageIDCompare(lhs MessageID, rhs MessageID) int {
}
return 0
}
-
-func messageIDHash(id MessageID) int64 {
- return id.LedgerID() + 31*id.EntryID()
-}
-
-func messageIDIsBatch(id MessageID) bool {
- return id.BatchIdx() >= 0 && id.BatchSize() > 0
-}