This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch pool in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 974fcf32e853abbb03ae9ec58a750a412d00ad4f Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Tue Aug 6 14:40:52 2024 +0800 Add the tracked pool to fix leak issues Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- .github/workflows/e2e.storage.yml | 2 +- CHANGES.md | 1 + banyand/internal/storage/index_test.go | 10 +- banyand/measure/block.go | 14 +- banyand/measure/block_metadata.go | 11 +- banyand/measure/block_reader.go | 10 +- banyand/measure/block_reader_test.go | 15 +- banyand/measure/block_writer.go | 6 +- banyand/measure/column.go | 2 +- banyand/measure/column_metadata.go | 6 +- banyand/measure/introducer.go | 15 +- banyand/measure/part.go | 30 +- banyand/measure/part_iter.go | 10 +- banyand/measure/query.go | 5 +- banyand/measure/tstable.go | 16 + banyand/measure/tstable_test.go | 62 ++++ banyand/observability/meter_prom.go | 12 +- banyand/observability/service.go | 11 +- banyand/stream/block.go | 14 +- banyand/stream/block_metadata.go | 11 +- banyand/stream/block_reader.go | 10 +- banyand/stream/block_writer.go | 6 +- banyand/stream/introducer.go | 15 +- banyand/stream/part.go | 6 +- banyand/stream/part_iter.go | 10 +- banyand/stream/query.go | 7 +- banyand/stream/tag.go | 2 +- banyand/stream/tag_metadata.go | 6 +- banyand/stream/tstable.go | 16 + pkg/bytes/buffer.go | 12 +- pkg/encoding/bytes.go | 2 +- pkg/encoding/encoder.go | 292 --------------- pkg/encoding/encoder_test.go | 392 --------------------- pkg/encoding/int.go | 11 +- pkg/fs/local_file_system.go | 10 +- pkg/index/inverted/inverted.go | 6 +- pkg/pb/v1/series.go | 21 -- pkg/pool/pool.go | 71 ++++ pkg/test/gmatcher/gmatcher.go | 57 +++ test/cases/measure/data/data.go | 2 - .../distributed/query/query_suite_test.go | 3 + test/integration/etcd/client_test.go | 3 + test/integration/load/load_suite_test.go | 3 + .../standalone/cold_query/query_suite_test.go | 3 + test/integration/standalone/other/measure_test.go | 3 + test/integration/standalone/other/property_test.go | 3 + .../standalone/query/query_suite_test.go | 3 + .../query_ondisk/query_ondisk_suite_test.go | 3 + 48 files changed, 403 insertions(+), 838 deletions(-) diff --git a/.github/workflows/e2e.storage.yml b/.github/workflows/e2e.storage.yml index 69711ba8..a6c4ba1e 100644 --- a/.github/workflows/e2e.storage.yml +++ b/.github/workflows/e2e.storage.yml @@ -93,7 +93,7 @@ jobs: make docker.build || make docker.build docker image ls - name: ${{ matrix.test.name }} - uses: apache/skywalking-infra-e2e@1485ae03f0ad90496ad7626a5ae4a6a73a1f6296 + uses: apache/skywalking-infra-e2e@cf589b4a0b9f8e6f436f78e9cfd94a1ee5494180 with: e2e-file: $GITHUB_WORKSPACE/${{ matrix.test.config }} diff --git a/CHANGES.md b/CHANGES.md index 8355e291..55dbcfe9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -35,6 +35,7 @@ Release Notes. - Fix the bug that segment's reference count is increased twice when the controller try to create an existing segment. - Fix a bug where a distributed query would return an empty result if the "limit" was set much lower than the "offset". - Fix duplicated measure data in a single part. +- Fix several "sync.Pool" leak issues by adding a tracker to the pool. ### Documentation diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index 0e61b623..d73886a3 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -33,8 +33,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/test/flags" ) -var testSeriesPool pbv1.SeriesPool - func TestSeriesIndex_Primary(t *testing.T) { ctx := context.Background() path, fn := setUp(require.New(t)) @@ -46,7 +44,7 @@ func TestSeriesIndex_Primary(t *testing.T) { }() var docs index.Documents for i := 0; i < 100; i++ { - series := testSeriesPool.Generate() + var series pbv1.Series series.Subject = "service_instance_latency" series.EntityValues = []*modelv1.TagValue{ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d", i)}}}, @@ -64,7 +62,6 @@ func TestSeriesIndex_Primary(t *testing.T) { } copy(doc.EntityValues, series.Buffer) docs = append(docs, doc) - testSeriesPool.Release(series) } require.NoError(t, si.Write(docs)) // Restart the index @@ -155,11 +152,10 @@ func TestSeriesIndex_Primary(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var seriesQueries []*pbv1.Series for i := range tt.entityValues { - seriesQuery := testSeriesPool.Generate() - defer testSeriesPool.Release(seriesQuery) + var seriesQuery pbv1.Series seriesQuery.Subject = tt.subject seriesQuery.EntityValues = tt.entityValues[i] - seriesQueries = append(seriesQueries, seriesQuery) + seriesQueries = append(seriesQueries, &seriesQuery) } sl, _, err := si.searchPrimary(ctx, seriesQueries, nil) require.NoError(t, err) diff --git a/banyand/measure/block.go b/banyand/measure/block.go index 001940cc..ae5006e6 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -20,7 +20,6 @@ package measure import ( "slices" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -29,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -403,7 +403,7 @@ func generateBlock() *block { if v == nil { return &block{} } - return v.(*block) + return v } func releaseBlock(b *block) { @@ -411,7 +411,7 @@ func releaseBlock(b *block) { blockPool.Put(b) } -var blockPool sync.Pool +var blockPool = pool.Register[*block]("measure-block") type blockCursor struct { p *part @@ -705,14 +705,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { return true } -var blockCursorPool sync.Pool +var blockCursorPool = pool.Register[*blockCursor]("measure-blockCursor") func generateBlockCursor() *blockCursor { v := blockCursorPool.Get() if v == nil { return &blockCursor{} } - return v.(*blockCursor) + return v } func releaseBlockCursor(bc *blockCursor) { @@ -832,7 +832,7 @@ func generateBlockPointer() *blockPointer { if v == nil { return &blockPointer{} } - return v.(*blockPointer) + return v } func releaseBlockPointer(bi *blockPointer) { @@ -840,4 +840,4 @@ func releaseBlockPointer(bi *blockPointer) { blockPointerPool.Put(bi) } -var blockPointerPool sync.Pool +var blockPointerPool = pool.Register[*blockPointer]("measure-blockPointer") diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go index 41093282..ff23c4af 100644 --- a/banyand/measure/block_metadata.go +++ b/banyand/measure/block_metadata.go @@ -21,11 +21,11 @@ import ( "errors" "fmt" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" ) @@ -170,7 +170,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err) } - // TODO: cache dataBlock tf := &dataBlock{} src, err = tf.unmarshal(src) if err != nil { @@ -198,7 +197,7 @@ func generateBlockMetadata() *blockMetadata { if v == nil { return &blockMetadata{} } - return v.(*blockMetadata) + return v } func releaseBlockMetadata(bm *blockMetadata) { @@ -206,7 +205,7 @@ func releaseBlockMetadata(bm *blockMetadata) { blockMetadataPool.Put(bm) } -var blockMetadataPool sync.Pool +var blockMetadataPool = pool.Register[*blockMetadata]("measure-blockMetadata") type blockMetadataArray struct { arr []blockMetadata @@ -219,14 +218,14 @@ func (bma *blockMetadataArray) reset() { bma.arr = bma.arr[:0] } -var blockMetadataArrayPool sync.Pool +var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("measure-blockMetadataArray") func generateBlockMetadataArray() *blockMetadataArray { v := blockMetadataArrayPool.Get() if v == nil { return &blockMetadataArray{} } - return v.(*blockMetadataArray) + return v } func releaseBlockMetadataArray(bma *blockMetadataArray) { diff --git a/banyand/measure/block_reader.go b/banyand/measure/block_reader.go index eb221eef..238b6833 100644 --- a/banyand/measure/block_reader.go +++ b/banyand/measure/block_reader.go @@ -22,11 +22,11 @@ import ( "errors" "fmt" "io" - "sync" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type seqReader struct { @@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) { func generateSeqReader() *seqReader { if v := seqReaderPool.Get(); v != nil { - return v.(*seqReader) + return v } return &seqReader{} } @@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) { seqReaderPool.Put(sr) } -var seqReaderPool sync.Pool +var seqReaderPool = pool.Register[*seqReader]("measure-seqReader") type seqReaders struct { tagFamilyMetadata map[string]*seqReader @@ -219,11 +219,11 @@ func (br *blockReader) error() error { return br.err } -var blockReaderPool sync.Pool +var blockReaderPool = pool.Register[*blockReader]("measure-blockReader") func generateBlockReader() *blockReader { if v := blockReaderPool.Get(); v != nil { - return v.(*blockReader) + return v } return &blockReader{} } diff --git a/banyand/measure/block_reader_test.go b/banyand/measure/block_reader_test.go index 63f3184e..cd576b78 100644 --- a/banyand/measure/block_reader_test.go +++ b/banyand/measure/block_reader_test.go @@ -62,6 +62,13 @@ func Test_blockReader_nextBlock(t *testing.T) { {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, }, }, + { + name: "Test with a single part with same ts", + dpsList: []*dataPoints{duplicatedDps}, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSizeBytes: 24}, + }, + }, { name: "Test with multiple parts with same ts", dpsList: []*dataPoints{dpsTS1, dpsTS1}, @@ -77,7 +84,7 @@ func Test_blockReader_nextBlock(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - verify := func(pp []*part) { + verify := func(t *testing.T, pp []*part) { var pii []*partMergeIter for _, p := range pp { pmi := &partMergeIter{} @@ -116,7 +123,7 @@ func Test_blockReader_nextBlock(t *testing.T) { } } - t.Run("memory parts", func(_ *testing.T) { + t.Run("memory parts", func(t *testing.T) { var mpp []*memPart defer func() { for _, mp := range mpp { @@ -130,7 +137,7 @@ func Test_blockReader_nextBlock(t *testing.T) { mp.mustInitFromDataPoints(dps) pp = append(pp, openMemPart(mp)) } - verify(pp) + verify(t, pp) }) t.Run("file parts", func(t *testing.T) { @@ -158,7 +165,7 @@ func Test_blockReader_nextBlock(t *testing.T) { fpp = append(fpp, filePW) pp = append(pp, filePW.p) } - verify(pp) + verify(t, pp) }) }) } diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go index fd065f1a..4a07291c 100644 --- a/banyand/measure/block_writer.go +++ b/banyand/measure/block_writer.go @@ -19,12 +19,12 @@ package measure import ( "path/filepath" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type writer struct { @@ -285,7 +285,7 @@ func generateBlockWriter() *blockWriter { }, } } - return v.(*blockWriter) + return v } func releaseBlockWriter(bsw *blockWriter) { @@ -293,4 +293,4 @@ func releaseBlockWriter(bsw *blockWriter) { blockWriterPool.Put(bsw) } -var blockWriterPool sync.Pool +var blockWriterPool = pool.Register[*blockWriter]("measure-blockWriter") diff --git a/banyand/measure/column.go b/banyand/measure/column.go index eafa0134..35f34ab8 100644 --- a/banyand/measure/column.go +++ b/banyand/measure/column.go @@ -112,7 +112,7 @@ func (c *column) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader * } } -var bigValuePool bytes.BufferPool +var bigValuePool = bytes.NewBufferPool("measure-big-value") type columnFamily struct { name string diff --git a/banyand/measure/column_metadata.go b/banyand/measure/column_metadata.go index 3f1eb957..3f7102af 100644 --- a/banyand/measure/column_metadata.go +++ b/banyand/measure/column_metadata.go @@ -36,11 +36,11 @@ package measure import ( "fmt" - "sync" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type columnMetadata struct { @@ -148,7 +148,7 @@ func generateColumnFamilyMetadata() *columnFamilyMetadata { if v == nil { return &columnFamilyMetadata{} } - return v.(*columnFamilyMetadata) + return v } func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) { @@ -156,4 +156,4 @@ func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) { columnFamilyMetadataPool.Put(cfm) } -var columnFamilyMetadataPool sync.Pool +var columnFamilyMetadataPool = pool.Register[*columnFamilyMetadata]("measure-columnFamilyMetadata") diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go index 7fce11d1..0bfb2e23 100644 --- a/banyand/measure/introducer.go +++ b/banyand/measure/introducer.go @@ -18,8 +18,7 @@ package measure import ( - "sync" - + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -33,14 +32,14 @@ func (i *introduction) reset() { i.applied = nil } -var introductionPool = sync.Pool{} +var introductionPool = pool.Register[*introduction]("measure-introduction") func generateIntroduction() *introduction { v := introductionPool.Get() if v == nil { return &introduction{} } - i := v.(*introduction) + i := v i.reset() return i } @@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() { i.applied = nil } -var flusherIntroductionPool = sync.Pool{} +var flusherIntroductionPool = pool.Register[*flusherIntroduction]("measure-flusher-introduction") func generateFlusherIntroduction() *flusherIntroduction { v := flusherIntroductionPool.Get() @@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction { flushed: make(map[uint64]*partWrapper), } } - i := v.(*flusherIntroduction) + i := v i.reset() return i } @@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() { i.creator = 0 } -var mergerIntroductionPool = sync.Pool{} +var mergerIntroductionPool = pool.Register[*mergerIntroduction]("measure-merger-introduction") func generateMergerIntroduction() *mergerIntroduction { v := mergerIntroductionPool.Get() if v == nil { return &mergerIntroduction{} } - i := v.(*mergerIntroduction) + i := v i.reset() return i } diff --git a/banyand/measure/part.go b/banyand/measure/part.go index dd4c11ae..41baf29b 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -22,7 +22,6 @@ import ( "path" "path/filepath" "sort" - "sync" "sync/atomic" "time" @@ -30,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) const ( @@ -150,23 +150,29 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { var sidPrev common.SeriesID uncompressedBlockSizeBytes := uint64(0) var indexPrev int - var prevTS int64 + var tsPrev int64 for i := 0; i < len(dps.timestamps); i++ { sid := dps.seriesIDs[i] if sidPrev == 0 { sidPrev = sid } - if prevTS == dps.timestamps[i] { - dps.skip(i) - i-- - continue + + if sid == sidPrev { + if tsPrev == dps.timestamps[i] { + dps.skip(i) + i-- + continue + } + tsPrev = dps.timestamps[i] + } else { + tsPrev = 0 } - prevTS = dps.timestamps[i] if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || (i-indexPrev) > maxBlockLength || sid != sidPrev { bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i]) sidPrev = sid + tsPrev = 0 indexPrev = i uncompressedBlockSizeBytes = 0 } @@ -216,7 +222,7 @@ func generateMemPart() *memPart { if v == nil { return &memPart{} } - return v.(*memPart) + return v } func releaseMemPart(mp *memPart) { @@ -224,7 +230,7 @@ func releaseMemPart(mp *memPart) { memPartPool.Put(mp) } -var memPartPool sync.Pool +var memPartPool = pool.Register[*memPart]("measure-memPart") type partWrapper struct { mp *memPart @@ -246,6 +252,12 @@ func (pw *partWrapper) decRef() { if n > 0 { return } + if pw.mp != nil { + releaseMemPart(pw.mp) + pw.mp = nil + pw.p = nil + return + } pw.p.close() if pw.removable.Load() && pw.p.fileSystem != nil { go func(pw *partWrapper) { diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go index 0c916334..cf67d320 100644 --- a/banyand/measure/part_iter.go +++ b/banyand/measure/part_iter.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/bytes" @@ -30,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type partIter struct { @@ -327,7 +327,7 @@ func generatePartMergeIter() *partMergeIter { if v == nil { return &partMergeIter{} } - return v.(*partMergeIter) + return v } func releasePartMergeIter(pmi *partMergeIter) { @@ -335,7 +335,7 @@ func releasePartMergeIter(pmi *partMergeIter) { pmiPool.Put(pmi) } -var pmiPool sync.Pool +var pmiPool = pool.Register[*partMergeIter]("measure-partMergeIter") type partMergeIterHeap []*partMergeIter @@ -369,7 +369,7 @@ func generateColumnValuesDecoder() *encoding.BytesBlockDecoder { if v == nil { return &encoding.BytesBlockDecoder{} } - return v.(*encoding.BytesBlockDecoder) + return v } func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) { @@ -377,4 +377,4 @@ func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) { columnValuesDecoderPool.Put(d) } -var columnValuesDecoderPool sync.Pool +var columnValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("measure-columnValuesDecoder") diff --git a/banyand/measure/query.go b/banyand/measure/query.go index ae37521a..2bda84da 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -242,9 +242,8 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids [] defer releaseBlockMetadataArray(bma) defFn := startBlockScanSpan(ctx, len(sids), parts, result) defer defFn() - // TODO: cache tstIter - var tstIter tstIter - defer tstIter.reset() + tstIter := generateTstIter() + defer releaseTstIter(tstIter) originalSids := make([]common.SeriesID, len(sids)) copy(originalSids, sids) sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index d2a20b4a..033979a8 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -33,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" @@ -376,6 +377,21 @@ func (ti *tstIter) Error() error { return ti.err } +func generateTstIter() *tstIter { + v := tstIterPool.Get() + if v == nil { + return &tstIter{} + } + return v +} + +func releaseTstIter(ti *tstIter) { + ti.reset() + tstIterPool.Put(ti) +} + +var tstIterPool = pool.Register[*tstIter]("measure-tstIter") + type partIterHeap []*partIter func (pih *partIterHeap) Len() int { diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index 57ea15a4..386f57b1 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -214,6 +214,16 @@ func Test_tstIter(t *testing.T) { {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, }, }, + { + name: "Test with a single part with same ts", + dpsList: []*dataPoints{duplicatedDps}, + sids: []common.SeriesID{1}, + minTimestamp: 1, + maxTimestamp: 1, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSizeBytes: 24}, + }, + }, { name: "Test with multiple parts with same ts", dpsList: []*dataPoints{dpsTS1, dpsTS1}, @@ -568,6 +578,58 @@ var dpsTS2 = &dataPoints{ }, } +var duplicatedDps = &dataPoints{ + seriesIDs: []common.SeriesID{1, 1, 1}, + timestamps: []int64{1, 1, 1}, + versions: []int64{1, 2, 3}, + tagFamilies: [][]nameValues{ + { + { + name: "arrTag", values: []*nameValue{ + {name: "strArrTag", valueType: pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value1"), []byte("value2")}}, + {name: "intArrTag", valueType: pbv1.ValueTypeInt64Arr, value: nil, valueArr: [][]byte{convert.Int64ToBytes(25), convert.Int64ToBytes(30)}}, + }, + }, + { + name: "binaryTag", values: []*nameValue{ + {name: "binaryTag", valueType: pbv1.ValueTypeBinaryData, value: longText, valueArr: nil}, + }, + }, + { + name: "singleTag", values: []*nameValue{ + {name: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil}, + {name: "intTag", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(10), valueArr: nil}, + }, + }, + }, + { + { + name: "singleTag", values: []*nameValue{ + {name: "strTag1", valueType: pbv1.ValueTypeStr, value: []byte("tag1"), valueArr: nil}, + {name: "strTag2", valueType: pbv1.ValueTypeStr, value: []byte("tag2"), valueArr: nil}, + }, + }, + }, + {}, // empty tagFamilies for seriesID 3 + }, + fields: []nameValues{ + { + name: "skipped", values: []*nameValue{ + {name: "strField", valueType: pbv1.ValueTypeStr, value: []byte("field1"), valueArr: nil}, + {name: "intField", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil}, + {name: "floatField", valueType: pbv1.ValueTypeFloat64, value: convert.Float64ToBytes(1221233.343), valueArr: nil}, + {name: "binaryField", valueType: pbv1.ValueTypeBinaryData, value: longText, valueArr: nil}, + }, + }, + {}, // empty fields for seriesID 2 + { + name: "onlyFields", values: []*nameValue{ + {name: "intField", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil}, + }, + }, + }, +} + func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints { hugeDps := &dataPoints{ seriesIDs: []common.SeriesID{}, diff --git a/banyand/observability/meter_prom.go b/banyand/observability/meter_prom.go index 260b442f..ea5d2d8a 100644 --- a/banyand/observability/meter_prom.go +++ b/banyand/observability/meter_prom.go @@ -18,6 +18,7 @@ package observability import ( + "net/http" "sync" grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" @@ -42,10 +43,6 @@ var ( func init() { reg.MustRegister(collectors.NewGoCollector()) reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - metricsMux.Handle("/metrics", promhttp.HandlerFor( - reg, - promhttp.HandlerOpts{}, - )) } // NewMeterProvider returns a meter.Provider based on the given scope. @@ -53,6 +50,13 @@ func newPromMeterProvider() meter.Provider { return prom.NewProvider(SystemScope, reg) } +func registerMetricsEndpoint(metricsMux *http.ServeMux) { + metricsMux.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{}, + )) +} + // MetricsServerInterceptor returns a server interceptor for metrics. func promMetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) { once.Do(func() { diff --git a/banyand/observability/service.go b/banyand/observability/service.go index 68bd30e3..4170b09f 100644 --- a/banyand/observability/service.go +++ b/banyand/observability/service.go @@ -42,9 +42,8 @@ const ( ) var ( - _ run.Service = (*metricService)(nil) - _ run.Config = (*metricService)(nil) - metricsMux = http.NewServeMux() + _ run.Service = (*metricService)(nil) + _ run.Config = (*metricService)(nil) // MetricsServerInterceptor is the function to obtain grpc metrics interceptor. MetricsServerInterceptor func() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) = emptyMetricsServerInterceptor ) @@ -127,7 +126,6 @@ func (p *metricService) PreRun(ctx context.Context) error { NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata, nodeInfo) } initMetrics(p.modes) - metricsMux.HandleFunc("/_route", p.routeTableHandler) return nil } @@ -147,6 +145,11 @@ func (p *metricService) Serve() run.StopNotify { if err != nil { p.l.Fatal().Err(err).Msg("Failed to register metrics collector") } + metricsMux := http.NewServeMux() + metricsMux.HandleFunc("/_route", p.routeTableHandler) + if containsMode(p.modes, flagPromethusMode) { + registerMetricsEndpoint(metricsMux) + } if containsMode(p.modes, flagNativeMode) { err = p.scheduler.Register("native-metric-collection", cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool { NativeMetricCollection.FlushMetrics() diff --git a/banyand/stream/block.go b/banyand/stream/block.go index bd0db065..e6e7b964 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -19,7 +19,6 @@ package stream import ( "sort" - "sync" "golang.org/x/exp/slices" @@ -31,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -351,7 +351,7 @@ func generateBlock() *block { if v == nil { return &block{} } - return v.(*block) + return v } func releaseBlock(b *block) { @@ -359,7 +359,7 @@ func releaseBlock(b *block) { blockPool.Put(b) } -var blockPool sync.Pool +var blockPool = pool.Register[*block]("stream-block") type blockCursor struct { p *part @@ -559,14 +559,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { return true } -var blockCursorPool sync.Pool +var blockCursorPool = pool.Register[*blockCursor]("stream-blockCursor") func generateBlockCursor() *blockCursor { v := blockCursorPool.Get() if v == nil { return &blockCursor{} } - return v.(*blockCursor) + return v } func releaseBlockCursor(bc *blockCursor) { @@ -668,7 +668,7 @@ func generateBlockPointer() *blockPointer { if v == nil { return &blockPointer{} } - return v.(*blockPointer) + return v } func releaseBlockPointer(bi *blockPointer) { @@ -676,4 +676,4 @@ func releaseBlockPointer(bi *blockPointer) { blockPointerPool.Put(bi) } -var blockPointerPool sync.Pool +var blockPointerPool = pool.Register[*blockPointer]("stream-blockPointer") diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go index 9f0091ee..4f410cd5 100644 --- a/banyand/stream/block_metadata.go +++ b/banyand/stream/block_metadata.go @@ -21,11 +21,11 @@ import ( "errors" "fmt" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" ) @@ -175,7 +175,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err) } - // TODO: cache dataBlock tf := &dataBlock{} src, err = tf.unmarshal(src) if err != nil { @@ -202,7 +201,7 @@ func generateBlockMetadata() *blockMetadata { if v == nil { return &blockMetadata{} } - return v.(*blockMetadata) + return v } func releaseBlockMetadata(bm *blockMetadata) { @@ -210,7 +209,7 @@ func releaseBlockMetadata(bm *blockMetadata) { blockMetadataPool.Put(bm) } -var blockMetadataPool sync.Pool +var blockMetadataPool = pool.Register[*blockMetadata]("stream-blockMetadata") type blockMetadataArray struct { arr []blockMetadata @@ -223,14 +222,14 @@ func (bma *blockMetadataArray) reset() { bma.arr = bma.arr[:0] } -var blockMetadataArrayPool sync.Pool +var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("stream-blockMetadataArray") func generateBlockMetadataArray() *blockMetadataArray { v := blockMetadataArrayPool.Get() if v == nil { return &blockMetadataArray{} } - return v.(*blockMetadataArray) + return v } func releaseBlockMetadataArray(bma *blockMetadataArray) { diff --git a/banyand/stream/block_reader.go b/banyand/stream/block_reader.go index 60701515..1c8a987c 100644 --- a/banyand/stream/block_reader.go +++ b/banyand/stream/block_reader.go @@ -22,11 +22,11 @@ import ( "errors" "fmt" "io" - "sync" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type seqReader struct { @@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) { func generateSeqReader() *seqReader { if v := seqReaderPool.Get(); v != nil { - return v.(*seqReader) + return v } return &seqReader{} } @@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) { seqReaderPool.Put(sr) } -var seqReaderPool sync.Pool +var seqReaderPool = pool.Register[*seqReader]("stream-seqReader") type seqReaders struct { tagFamilyMetadata map[string]*seqReader @@ -216,11 +216,11 @@ func (br *blockReader) error() error { return br.err } -var blockReaderPool sync.Pool +var blockReaderPool = pool.Register[*blockReader]("stream-blockReader") func generateBlockReader() *blockReader { if v := blockReaderPool.Get(); v != nil { - return v.(*blockReader) + return v } return &blockReader{} } diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go index 5125bd85..f55f9ebf 100644 --- a/banyand/stream/block_writer.go +++ b/banyand/stream/block_writer.go @@ -19,12 +19,12 @@ package stream import ( "path/filepath" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type writer struct { @@ -279,7 +279,7 @@ func generateBlockWriter() *blockWriter { }, } } - return v.(*blockWriter) + return v } func releaseBlockWriter(bsw *blockWriter) { @@ -287,4 +287,4 @@ func releaseBlockWriter(bsw *blockWriter) { blockWriterPool.Put(bsw) } -var blockWriterPool sync.Pool +var blockWriterPool = pool.Register[*blockWriter]("stream-blockWriter") diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go index 76c6e659..072e8b39 100644 --- a/banyand/stream/introducer.go +++ b/banyand/stream/introducer.go @@ -18,8 +18,7 @@ package stream import ( - "sync" - + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -33,14 +32,14 @@ func (i *introduction) reset() { i.applied = nil } -var introductionPool = sync.Pool{} +var introductionPool = pool.Register[*introduction]("stream-introduction") func generateIntroduction() *introduction { v := introductionPool.Get() if v == nil { return &introduction{} } - intro := v.(*introduction) + intro := v intro.reset() return intro } @@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() { i.applied = nil } -var flusherIntroductionPool = sync.Pool{} +var flusherIntroductionPool = pool.Register[*flusherIntroduction]("stream-flusher-introduction") func generateFlusherIntroduction() *flusherIntroduction { v := flusherIntroductionPool.Get() @@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction { flushed: make(map[uint64]*partWrapper), } } - fi := v.(*flusherIntroduction) + fi := v fi.reset() return fi } @@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() { i.creator = 0 } -var mergerIntroductionPool = sync.Pool{} +var mergerIntroductionPool = pool.Register[*mergerIntroduction]("stream-merger-introduction") func generateMergerIntroduction() *mergerIntroduction { v := mergerIntroductionPool.Get() if v == nil { return &mergerIntroduction{} } - mi := v.(*mergerIntroduction) + mi := v mi.reset() return mi } diff --git a/banyand/stream/part.go b/banyand/stream/part.go index b6ad05c7..228e607c 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -22,7 +22,6 @@ import ( "path" "path/filepath" "sort" - "sync" "sync/atomic" "time" @@ -30,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) const ( @@ -198,7 +198,7 @@ func generateMemPart() *memPart { if v == nil { return &memPart{} } - return v.(*memPart) + return v } func releaseMemPart(mp *memPart) { @@ -206,7 +206,7 @@ func releaseMemPart(mp *memPart) { memPartPool.Put(mp) } -var memPartPool sync.Pool +var memPartPool = pool.Register[*memPart]("stream-memPart") type partWrapper struct { mp *memPart diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go index e9c24b98..f4277032 100644 --- a/banyand/stream/part_iter.go +++ b/banyand/stream/part_iter.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/bytes" @@ -30,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type partIter struct { @@ -324,7 +324,7 @@ func generatePartMergeIter() *partMergeIter { if v == nil { return &partMergeIter{} } - return v.(*partMergeIter) + return v } func releasePartMergeIter(pmi *partMergeIter) { @@ -332,7 +332,7 @@ func releasePartMergeIter(pmi *partMergeIter) { pmiPool.Put(pmi) } -var pmiPool sync.Pool +var pmiPool = pool.Register[*partMergeIter]("stream-partMergeIter") type partMergeIterHeap []*partMergeIter @@ -366,7 +366,7 @@ func generateColumnValuesDecoder() *encoding.BytesBlockDecoder { if v == nil { return &encoding.BytesBlockDecoder{} } - return v.(*encoding.BytesBlockDecoder) + return v } func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) { @@ -374,4 +374,4 @@ func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) { columnValuesDecoderPool.Put(d) } -var columnValuesDecoderPool sync.Pool +var columnValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("stream-columnValuesDecoder") diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 9dff205f..827c896d 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -190,9 +190,8 @@ func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error { defer releaseBlockMetadataArray(bma) defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr) defer defFn() - // TODO: cache tstIter - var ti tstIter - defer ti.reset() + ti := generateTstIter() + defer releaseTstIter(ti) sids := qo.sortedSids ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) if ti.Error() != nil { @@ -288,6 +287,7 @@ func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamR return blankCursorList[i] > blankCursorList[j] }) for _, index := range blankCursorList { + releaseBlockCursor(qr.data[index]) qr.data = append(qr.data[:index], qr.data[index+1:]...) } qr.loaded = true @@ -610,6 +610,7 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValu case pbv1.ValueTypeStrArr: var values []string bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) var err error for len(value) > 0 { bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], value) diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go index 7f5459c0..b7954810 100644 --- a/banyand/stream/tag.go +++ b/banyand/stream/tag.go @@ -112,7 +112,7 @@ func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *seq } } -var bigValuePool bytes.BufferPool +var bigValuePool = bytes.NewBufferPool("stream-big-value") type tagFamily struct { name string diff --git a/banyand/stream/tag_metadata.go b/banyand/stream/tag_metadata.go index d044f0ec..d470b6f9 100644 --- a/banyand/stream/tag_metadata.go +++ b/banyand/stream/tag_metadata.go @@ -19,11 +19,11 @@ package stream import ( "fmt" - "sync" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type tagMetadata struct { @@ -131,7 +131,7 @@ func generateTagFamilyMetadata() *tagFamilyMetadata { if v == nil { return &tagFamilyMetadata{} } - return v.(*tagFamilyMetadata) + return v } func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) { @@ -139,4 +139,4 @@ func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) { tagFamilyMetadataPool.Put(tfm) } -var tagFamilyMetadataPool sync.Pool +var tagFamilyMetadataPool = pool.Register[*tagFamilyMetadata]("stream-tagFamilyMetadata") diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index dff93b39..ea329ff1 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -34,6 +34,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" @@ -388,6 +389,21 @@ func (ti *tstIter) Error() error { return ti.err } +func generateTstIter() *tstIter { + v := tstIterPool.Get() + if v == nil { + return &tstIter{} + } + return v +} + +func releaseTstIter(ti *tstIter) { + ti.reset() + tstIterPool.Put(ti) +} + +var tstIterPool = pool.Register[*tstIter]("stream-tstIter") + type partIterHeap []*partIter func (pih *partIterHeap) Len() int { diff --git a/pkg/bytes/buffer.go b/pkg/bytes/buffer.go index 595ce54c..2086e147 100644 --- a/pkg/bytes/buffer.go +++ b/pkg/bytes/buffer.go @@ -21,9 +21,9 @@ package bytes import ( "fmt" "io" - "sync" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/pool" ) var ( @@ -107,9 +107,15 @@ func (r *reader) Close() error { return nil } +func NewBufferPool(name string) *BufferPool { + return &BufferPool{ + p: pool.Register[*Buffer](name), + } +} + // BufferPool is a pool of Buffer. type BufferPool struct { - p sync.Pool + p *pool.Synced[*Buffer] } // Generate generates a Buffer. @@ -118,7 +124,7 @@ func (bp *BufferPool) Generate() *Buffer { if bbv == nil { return &Buffer{} } - return bbv.(*Buffer) + return bbv } // Release releases a Buffer. diff --git a/pkg/encoding/bytes.go b/pkg/encoding/bytes.go index 6aab3f89..25ece6fa 100644 --- a/pkg/encoding/bytes.go +++ b/pkg/encoding/bytes.go @@ -297,4 +297,4 @@ func decompressBlock(dst, src []byte) ([]byte, []byte, error) { } } -var bbPool bytes.BufferPool +var bbPool = bytes.NewBufferPool("encoding.bytesBlock") diff --git a/pkg/encoding/encoder.go b/pkg/encoding/encoder.go deleted file mode 100644 index 78add92c..00000000 --- a/pkg/encoding/encoder.go +++ /dev/null @@ -1,292 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package encoding - -import ( - "bytes" - "encoding/binary" - "io" - "sync" - "time" - - "github.com/pkg/errors" - - "github.com/apache/skywalking-banyandb/pkg/convert" -) - -var ( - encoderPool = sync.Pool{ - New: newEncoder, - } - decoderPool = sync.Pool{ - New: func() interface{} { - return &decoder{} - }, - } - - errInvalidValue = errors.New("invalid encoded value") - errNoData = errors.New("there is no data") -) - -type encoderPoolDelegator struct { - pool *sync.Pool - fn ParseInterval - name string - size int -} - -// NewEncoderPool returns a SeriesEncoderPool which provides int-based xor encoders. -func NewEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool { - return &encoderPoolDelegator{ - name: name, - pool: &encoderPool, - size: size, - fn: fn, - } -} - -func (b *encoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) SeriesEncoder { - encoder := b.pool.Get().(*encoder) - encoder.name = b.name - encoder.size = b.size - encoder.fn = b.fn - encoder.Reset(metadata, buffer) - return encoder -} - -func (b *encoderPoolDelegator) Put(seriesEncoder SeriesEncoder) { - _, ok := seriesEncoder.(*encoder) - if ok { - b.pool.Put(seriesEncoder) - } -} - -type decoderPoolDelegator struct { - pool *sync.Pool - fn ParseInterval - name string - size int -} - -// NewDecoderPool returns a SeriesDecoderPool which provides int-based xor decoders. -func NewDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool { - return &decoderPoolDelegator{ - name: name, - pool: &decoderPool, - size: size, - fn: fn, - } -} - -func (b *decoderPoolDelegator) Get(_ []byte) SeriesDecoder { - decoder := b.pool.Get().(*decoder) - decoder.name = b.name - decoder.size = b.size - decoder.fn = b.fn - return decoder -} - -func (b *decoderPoolDelegator) Put(seriesDecoder SeriesDecoder) { - _, ok := seriesDecoder.(*decoder) - if ok { - b.pool.Put(seriesDecoder) - } -} - -var _ SeriesEncoder = (*encoder)(nil) - -// ParseInterval parses the interval rule from the key in a kv pair. -type ParseInterval = func(key []byte) time.Duration - -type encoder struct { - buff BufferWriter - bw *Writer - values *XOREncoder - fn ParseInterval - name string - interval time.Duration - startTime uint64 - prevTime uint64 - num int - size int -} - -func newEncoder() interface{} { - bw := NewWriter() - return &encoder{ - bw: bw, - values: NewXOREncoder(bw), - } -} - -func (ie *encoder) Append(ts uint64, value []byte) { - if len(value) > 8 { - return - } - if ie.startTime == 0 { - ie.startTime = ts - ie.prevTime = ts - } else if ie.startTime > ts { - ie.startTime = ts - } - gap := int(ie.prevTime) - int(ts) - if gap < 0 { - return - } - zeroNum := gap/int(ie.interval) - 1 - for i := 0; i < zeroNum; i++ { - ie.bw.WriteBool(false) - ie.num++ - } - ie.prevTime = ts - l := len(value) - ie.bw.WriteBool(l > 0) - ie.values.Write(convert.BytesToUint64(value)) - ie.num++ -} - -func (ie *encoder) IsFull() bool { - return ie.num >= ie.size -} - -func (ie *encoder) Reset(key []byte, buffer BufferWriter) { - ie.buff = buffer - ie.bw.Reset(buffer) - ie.interval = ie.fn(key) - ie.startTime = 0 - ie.prevTime = 0 - ie.num = 0 - ie.values = NewXOREncoder(ie.bw) -} - -func (ie *encoder) Encode() error { - ie.bw.Flush() - buffWriter := NewPacker(ie.buff) - buffWriter.PutUint64(ie.startTime) - buffWriter.PutUint16(uint16(ie.num)) - return nil -} - -func (ie *encoder) StartTime() uint64 { - return ie.startTime -} - -var _ SeriesDecoder = (*decoder)(nil) - -type decoder struct { - fn ParseInterval - name string - area []byte - size int - interval time.Duration - startTime uint64 - num int -} - -func (i *decoder) Decode(key, data []byte) error { - if len(data) < 10 { - return errInvalidValue - } - i.interval = i.fn(key) - i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2]) - i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:])) - i.area = data[:len(data)-10] - return nil -} - -func (i decoder) Len() int { - return i.num -} - -func (i decoder) IsFull() bool { - return i.num >= i.size -} - -func (i decoder) Get(ts uint64) ([]byte, error) { - for iter := i.Iterator(); iter.Next(); { - if iter.Time() == ts { - return iter.Val(), nil - } - } - return nil, errors.WithMessagef(errNoData, "ts:%d", ts) -} - -func (i decoder) Range() (start, end uint64) { - return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval) -} - -func (i decoder) Iterator() SeriesIterator { - br := NewReader(bytes.NewReader(i.area)) - return &intIterator{ - endTime: i.startTime + uint64(i.num*int(i.interval)), - interval: int(i.interval), - br: br, - values: NewXORDecoder(br), - size: i.num, - } -} - -var _ SeriesIterator = (*intIterator)(nil) - -type intIterator struct { - err error - br *Reader - values *XORDecoder - endTime uint64 - interval int - size int - currVal uint64 - currTime uint64 - index int -} - -func (i *intIterator) Next() bool { - if i.index >= i.size { - return false - } - var b bool - var err error - for !b { - b, err = i.br.ReadBool() - if errors.Is(err, io.EOF) { - return false - } - if err != nil { - i.err = err - return false - } - i.index++ - i.currTime = i.endTime - uint64(i.interval*i.index) - } - if i.values.Next() { - i.currVal = i.values.Value() - } - return true -} - -func (i *intIterator) Val() []byte { - return convert.Uint64ToBytes(i.currVal) -} - -func (i *intIterator) Time() uint64 { - return i.currTime -} - -func (i *intIterator) Error() error { - return i.err -} diff --git a/pkg/encoding/encoder_test.go b/pkg/encoding/encoder_test.go deleted file mode 100644 index 840a883f..00000000 --- a/pkg/encoding/encoder_test.go +++ /dev/null @@ -1,392 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package encoding - -import ( - "bytes" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/apache/skywalking-banyandb/pkg/convert" -) - -func TestNewEncoderAndDecoder(t *testing.T) { - type tsData struct { - ts []uint64 - data []any - start uint64 - end uint64 - } - tests := []struct { - name string - args tsData - want tsData - }{ - { - name: "int golden path", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "int more than the size", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9, 6}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "int less than the size", - args: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{7, 8, 7}, - }, - want: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{7, 8, 7}, - start: uint64(time.Minute), - end: uint64(3 * time.Minute), - }, - }, - { - name: "int empty slot in the middle", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)}, - data: []any{7, 9}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float64 golden path", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7.0, 8.0, 7.0, 9.0}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7.0, 8.0, 7.0, 9.0}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float64 more than the size", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)}, - data: []any{0.7, 0.8, 0.7, 0.9, 0.6}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{0.7, 0.8, 0.7, 0.9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float64 less than the size", - args: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{1.7, 1.8, 1.7}, - }, - want: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{1.7, 1.8, 1.7}, - start: uint64(time.Minute), - end: uint64(3 * time.Minute), - }, - }, - { - name: "float64 empty slot in the middle", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)}, - data: []any{0.700033, 0.988822}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)}, - data: []any{0.700033, 0.988822}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - } - key := []byte("foo") - fn := func(k []byte) time.Duration { - assert.Equal(t, key, k) - return 1 * time.Minute - } - encoderPool := NewEncoderPool("minute", 4, fn) - decoderPool := NewDecoderPool("minute", 4, fn) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - at := assert.New(t) - var buffer bytes.Buffer - encoder := encoderPool.Get(key, &buffer) - defer encoderPool.Put(encoder) - decoder := decoderPool.Get(key) - defer decoderPool.Put(decoder) - isFull := false - for i, v := range tt.args.ts { - encoder.Append(v, ToBytes(tt.args.data[i])) - if encoder.IsFull() { - isFull = true - break - } - } - err := encoder.Encode() - at.NoError(err) - - at.Equal(tt.want.start, encoder.StartTime()) - at.NoError(decoder.Decode(key, buffer.Bytes())) - start, end := decoder.Range() - at.Equal(tt.want.start, start) - at.Equal(tt.want.end, end) - if isFull { - at.True(decoder.IsFull()) - } - i := 0 - for iter := decoder.Iterator(); iter.Next(); i++ { - at.NoError(iter.Error()) - at.Equal(tt.want.ts[i], iter.Time()) - at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], iter.Val())) - v, err := decoder.Get(iter.Time()) - at.NoError(err) - at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], v)) - } - at.Equal(len(tt.want.ts), i) - }) - } -} - -func ToBytes(v any) []byte { - switch d := v.(type) { - case int: - return convert.Int64ToBytes(int64(d)) - case float64: - return convert.Float64ToBytes(d) - } - return nil -} - -func BytesTo(t any, b []byte) any { - switch t.(type) { - case int: - return int(convert.BytesToInt64(b)) - case float64: - return convert.BytesToFloat64(b) - } - return nil -} - -func TestNewDecoderGet(t *testing.T) { - type tsData struct { - ts []uint64 - data []any - } - type wantData struct { - ts []uint64 - data []any - wantErr []bool - start uint64 - end uint64 - } - tests := []struct { - name string - args tsData - want wantData - }{ - { - name: "int golden path", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "int more than the size", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9, 6}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0}, - data: []any{7, 8, 7, 9, nil}, - wantErr: []bool{false, false, false, false, true}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "int less than the size", - args: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{7, 8, 7}, - }, - want: wantData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{7, 8, 7}, - start: uint64(time.Minute), - end: uint64(3 * time.Minute), - }, - }, - { - name: "int empty slot in the middle", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)}, - data: []any{7, 9}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, nil, nil, 9}, - wantErr: []bool{false, true, true, false}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float golden path", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7.0, 8.0, 7.0, 9.0}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7.0, 8.0, 7.0, 9.0}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float more than the size", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)}, - data: []any{1.7, 1.8, 1.7, 1.9, 1.6}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0}, - data: []any{1.7, 1.8, 1.7, 1.9, nil}, - wantErr: []bool{false, false, false, false, true}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float less than the size", - args: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{0.71, 0.833, 0.709}, - }, - want: wantData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{0.71, 0.833, 0.709}, - start: uint64(time.Minute), - end: uint64(3 * time.Minute), - }, - }, - { - name: "float empty slot in the middle", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)}, - data: []any{1.7, 1.9}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{1.7, nil, nil, 1.9}, - wantErr: []bool{false, true, true, false}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - } - key := []byte("foo") - fn := func(k []byte) time.Duration { - assert.Equal(t, key, k) - return 1 * time.Minute - } - encoderPool := NewEncoderPool("minute", 4, fn) - decoderPool := NewDecoderPool("minute", 4, fn) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - at := assert.New(t) - var buffer bytes.Buffer - encoder := encoderPool.Get(key, &buffer) - defer encoderPool.Put(encoder) - decoder := decoderPool.Get(key) - defer decoderPool.Put(decoder) - isFull := false - for i, v := range tt.args.ts { - encoder.Append(v, ToBytes(tt.args.data[i])) - if encoder.IsFull() { - isFull = true - break - } - } - err := encoder.Encode() - at.NoError(err) - - at.Equal(tt.want.start, encoder.StartTime()) - at.NoError(decoder.Decode(key, buffer.Bytes())) - start, end := decoder.Range() - at.Equal(tt.want.start, start) - at.Equal(tt.want.end, end) - if isFull { - at.True(decoder.IsFull()) - } - for i, t := range tt.want.ts { - wantErr := false - if tt.want.wantErr != nil { - wantErr = tt.want.wantErr[i] - } - v, err := decoder.Get(t) - if wantErr { - at.ErrorIs(err, errNoData) - } else { - at.NoError(err) - at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], v)) - } - } - }) - } -} diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go index 21b77482..05a227ab 100644 --- a/pkg/encoding/int.go +++ b/pkg/encoding/int.go @@ -20,7 +20,8 @@ package encoding import ( "encoding/binary" "fmt" - "sync" + + "github.com/apache/skywalking-banyandb/pkg/pool" ) // Uint16ToBytes appends the bytes of the given uint16 to the given byte slice. @@ -224,7 +225,7 @@ func GenerateInt64List(size int) *Int64List { L: make([]int64, size), } } - is := v.(*Int64List) + is := v if n := size - cap(is.L); n > 0 { is.L = append(is.L[:cap(is.L)], make([]int64, n)...) } @@ -243,7 +244,7 @@ type Int64List struct { L []int64 } -var int64ListPool sync.Pool +var int64ListPool = pool.Register[*Int64List]("encoding-int64List") // GenerateUint64List generates a list of uint64 with the given size. // The returned list may be from a pool and should be released after use. @@ -254,7 +255,7 @@ func GenerateUint64List(size int) *Uint64List { L: make([]uint64, size), } } - is := v.(*Uint64List) + is := v if n := size - cap(is.L); n > 0 { is.L = append(is.L[:cap(is.L)], make([]uint64, n)...) } @@ -273,4 +274,4 @@ type Uint64List struct { L []uint64 } -var uint64ListPool sync.Pool +var uint64ListPool = pool.Register[*Uint64List]("encoding-uin64List") diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go index ff7dac5e..39fcb22a 100644 --- a/pkg/fs/local_file_system.go +++ b/pkg/fs/local_file_system.go @@ -25,12 +25,12 @@ import ( "io" "os" "path/filepath" - "sync" "time" "github.com/shirou/gopsutil/v3/disk" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) const defaultIOSize = 256 * 1024 @@ -465,7 +465,7 @@ func generateReader(f *os.File) *bufio.Reader { if v == nil { return bufio.NewReaderSize(f, defaultIOSize) } - br := v.(*bufio.Reader) + br := v br.Reset(f) return br } @@ -475,14 +475,14 @@ func releaseReader(br *bufio.Reader) { bufReaderPool.Put(br) } -var bufReaderPool sync.Pool +var bufReaderPool = pool.Register[*bufio.Reader]("fs-bufReader") func generateWriter(f *os.File) *bufio.Writer { v := bufWriterPool.Get() if v == nil { return bufio.NewWriterSize(f, defaultIOSize) } - bw := v.(*bufio.Writer) + bw := v bw.Reset(f) return bw } @@ -492,4 +492,4 @@ func releaseWriter(bw *bufio.Writer) { bufWriterPool.Put(bw) } -var bufWriterPool sync.Pool +var bufWriterPool = pool.Register[*bufio.Writer]("fs-bufWriter") diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 4d831fb2..0da32e02 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -25,7 +25,6 @@ import ( "io" "log" "math" - "sync" "time" "github.com/blugelabs/bluge" @@ -43,6 +42,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -87,14 +87,14 @@ type store struct { l *logger.Logger } -var batchPool sync.Pool +var batchPool = pool.Register[*blugeIndex.Batch]("index-bluge-batch") func generateBatch() *blugeIndex.Batch { b := batchPool.Get() if b == nil { return bluge.NewBatch() } - return b.(*blugeIndex.Batch) + return b } func releaseBatch(b *blugeIndex.Batch) { diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go index 754d155e..a0e76827 100644 --- a/pkg/pb/v1/series.go +++ b/pkg/pb/v1/series.go @@ -19,7 +19,6 @@ package v1 import ( "sort" - "sync" "github.com/pkg/errors" @@ -97,26 +96,6 @@ func (s *Series) reset() { s.Buffer = s.Buffer[:0] } -// SeriesPool is a pool of Series. -type SeriesPool struct { - pool sync.Pool -} - -// Generate creates a new Series or gets one from the pool. -func (sp *SeriesPool) Generate() *Series { - sv := sp.pool.Get() - if sv == nil { - return &Series{} - } - return sv.(*Series) -} - -// Release puts a Series back to the pool. -func (sp *SeriesPool) Release(s *Series) { - s.reset() - sp.pool.Put(s) -} - // SeriesList is a collection of Series. type SeriesList []*Series diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go new file mode 100644 index 00000000..8f44c8a2 --- /dev/null +++ b/pkg/pool/pool.go @@ -0,0 +1,71 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pool + +import ( + "fmt" + "sync" + "sync/atomic" +) + +var poolMap = sync.Map{} + +func Register[T any](name string) *Synced[T] { + p := new(Synced[T]) + if _, ok := poolMap.LoadOrStore(name, p); ok { + panic(fmt.Sprintf("duplicated pool: %s", name)) + } + return p +} + +func AllRefsCount() map[string]int { + result := make(map[string]int) + poolMap.Range(func(key, value any) bool { + result[key.(string)] = value.(Trackable).RefsCount() + return true + }) + return result +} + +type Trackable interface { + RefsCount() int +} + +type Synced[T any] struct { + sync.Pool + refs atomic.Int32 +} + +func (p *Synced[T]) Get() T { + v := p.Pool.Get() + p.refs.Add(1) + if v == nil { + var t T + return t + } + return v.(T) +} + +func (p *Synced[T]) Put(v T) { + p.Pool.Put(v) + p.refs.Add(-1) +} + +func (p *Synced[T]) RefsCount() int { + return int(p.refs.Load()) +} diff --git a/pkg/test/gmatcher/gmatcher.go b/pkg/test/gmatcher/gmatcher.go new file mode 100644 index 00000000..ae761529 --- /dev/null +++ b/pkg/test/gmatcher/gmatcher.go @@ -0,0 +1,57 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gmatcher + +import ( + "fmt" + + "github.com/onsi/gomega" +) + +// HaveZeroRef returns a matcher that checks if all pools have 0 references. +func HaveZeroRef() gomega.OmegaMatcher { + return &ZeroRefMatcher{} +} + +var _ gomega.OmegaMatcher = &ZeroRefMatcher{} + +type ZeroRefMatcher struct{} + +// FailureMessage implements types.GomegaMatcher. +func (p *ZeroRefMatcher) FailureMessage(actual interface{}) (message string) { + return fmt.Sprintf("expected all pools to have 0 references, got %v", actual) +} + +// Match implements types.GomegaMatcher. +func (p *ZeroRefMatcher) Match(actual interface{}) (success bool, err error) { + data, ok := actual.(map[string]int) + if !ok { + return false, fmt.Errorf("expected map[string]int, got %T", actual) + } + for pooName, refers := range data { + if refers > 0 { + return false, fmt.Errorf("pool %s has %d references", pooName, refers) + } + } + return true, nil +} + +// NegatedFailureMessage implements types.GomegaMatcher. +func (p *ZeroRefMatcher) NegatedFailureMessage(actual interface{}) (message string) { + return fmt.Sprintf("expected at least one pool to have references, got %v", actual) +} diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go index f50dc4e1..3077c510 100644 --- a/test/cases/measure/data/data.go +++ b/test/cases/measure/data/data.go @@ -108,14 +108,12 @@ func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClien content, err := dataFS.ReadFile("testdata/" + dataFile) gm.Expect(err).ShouldNot(gm.HaveOccurred()) gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred()) - nano := baseTime.UnixNano() for i, template := range templates { rawDataPointValue, errMarshal := json.Marshal(template) gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred()) dataPointValue := &measurev1.DataPointValue{} gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred()) dataPointValue.Timestamp = timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval)) - dataPointValue.Version = nano + int64(i) gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue, MessageId: uint64(time.Now().UnixNano())})). Should(gm.Succeed()) } diff --git a/test/integration/distributed/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go index 0c78a14f..3d8eaad3 100644 --- a/test/integration/distributed/query/query_suite_test.go +++ b/test/integration/distributed/query/query_suite_test.go @@ -35,8 +35,10 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" "github.com/apache/skywalking-banyandb/pkg/test/setup" @@ -132,4 +134,5 @@ var _ = SynchronizedAfterSuite(func() { }, func() { deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) diff --git a/test/integration/etcd/client_test.go b/test/integration/etcd/client_test.go index a78d8345..ddfb4797 100644 --- a/test/integration/etcd/client_test.go +++ b/test/integration/etcd/client_test.go @@ -36,8 +36,10 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" ) @@ -76,6 +78,7 @@ var _ = Describe("Client Test", func() { AfterEach(func() { dirSpaceDef() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) It("should be using user/password connect etcd server successfully", func() { diff --git a/test/integration/load/load_suite_test.go b/test/integration/load/load_suite_test.go index 1858e380..84dccc38 100644 --- a/test/integration/load/load_suite_test.go +++ b/test/integration/load/load_suite_test.go @@ -36,8 +36,10 @@ import ( streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data" @@ -157,6 +159,7 @@ var _ = Describe("Load Test Suit", func() { } deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) }) diff --git a/test/integration/standalone/cold_query/query_suite_test.go b/test/integration/standalone/cold_query/query_suite_test.go index 18e832ad..a28c145e 100644 --- a/test/integration/standalone/cold_query/query_suite_test.go +++ b/test/integration/standalone/cold_query/query_suite_test.go @@ -29,7 +29,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -89,4 +91,5 @@ var _ = SynchronizedAfterSuite(func() { }, func() { deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) diff --git a/test/integration/standalone/other/measure_test.go b/test/integration/standalone/other/measure_test.go index a2ab6e81..a0ced9ac 100644 --- a/test/integration/standalone/other/measure_test.go +++ b/test/integration/standalone/other/measure_test.go @@ -27,7 +27,9 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -57,6 +59,7 @@ var _ = g.Describe("Query service_cpm_minute", func() { gm.Expect(conn.Close()).To(gm.Succeed()) deferFn() gm.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + gm.Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) g.It("queries service_cpm_minute by id after updating", func() { casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", baseTime, interval) diff --git a/test/integration/standalone/other/property_test.go b/test/integration/standalone/other/property_test.go index ab217059..164b3ce3 100644 --- a/test/integration/standalone/other/property_test.go +++ b/test/integration/standalone/other/property_test.go @@ -32,7 +32,9 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/setup" ) @@ -57,6 +59,7 @@ var _ = Describe("Property application", func() { Expect(conn.Close()).To(Succeed()) deferFn() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) It("applies properties", func() { md := &propertyv1.Metadata{ diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go index a1e684b2..765cd893 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/standalone/query/query_suite_test.go @@ -29,7 +29,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -90,4 +92,5 @@ var _ = SynchronizedAfterSuite(func() { }, func() { deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) diff --git a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go index f6fcb14c..346b8918 100644 --- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go +++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go @@ -29,8 +29,10 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -103,4 +105,5 @@ var _ = SynchronizedAfterSuite(func() { }, func() { deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) })