This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new b4fb8ff3 Improve block metadata reading performance (#407) b4fb8ff3 is described below commit b4fb8ff36ef6650ada8846c04a62735cdaa52391 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Mar 10 13:49:58 2024 +0800 Improve block metadata reading performance (#407) --- banyand/measure/block_metadata.go | 185 +++++++++++++++++++++----------------- banyand/measure/part.go | 10 +-- banyand/measure/part_iter.go | 14 +-- banyand/measure/part_iter_test.go | 4 +- banyand/measure/query.go | 5 +- banyand/measure/query_test.go | 4 +- banyand/measure/tstable.go | 4 +- banyand/measure/tstable_test.go | 4 +- banyand/stream/block_metadata.go | 145 +++++++++++++++++------------- banyand/stream/part.go | 4 +- banyand/stream/part_iter.go | 14 +-- banyand/stream/part_iter_test.go | 5 +- banyand/stream/query.go | 5 +- banyand/stream/query_test.go | 5 +- banyand/stream/tstable.go | 4 +- banyand/stream/tstable_test.go | 4 +- 16 files changed, 234 insertions(+), 182 deletions(-) diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go index 6202c9eb..91584760 100644 --- a/banyand/measure/block_metadata.go +++ b/banyand/measure/block_metadata.go @@ -34,34 +34,34 @@ type dataBlock struct { size uint64 } -func (h *dataBlock) reset() { - h.offset = 0 - h.size = 0 +func (b *dataBlock) reset() { + b.offset = 0 + b.size = 0 } -func (h *dataBlock) copyFrom(src *dataBlock) { - h.offset = src.offset - h.size = src.size +func (b *dataBlock) copyFrom(src *dataBlock) { + b.offset = src.offset + b.size = src.size } -func (h *dataBlock) marshal(dst []byte) []byte { - dst = encoding.VarUint64ToBytes(dst, h.offset) - dst = encoding.VarUint64ToBytes(dst, h.size) +func (b *dataBlock) marshal(dst []byte) []byte { + dst = encoding.VarUint64ToBytes(dst, b.offset) + dst = encoding.VarUint64ToBytes(dst, b.size) return dst } -func (h *dataBlock) unmarshal(src []byte) ([]byte, error) { +func (b *dataBlock) unmarshal(src []byte) ([]byte, error) { src, n, err := encoding.BytesToVarUint64(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal offset: %w", err) } - h.offset = n + b.offset = n src, n, err = encoding.BytesToVarUint64(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal size: %w", err) } - h.size = n + b.size = n return src, nil } @@ -75,84 +75,84 @@ type blockMetadata struct { count uint64 } -func (bh *blockMetadata) copyFrom(src *blockMetadata) { - bh.seriesID = src.seriesID - bh.uncompressedSizeBytes = src.uncompressedSizeBytes - bh.count = src.count - bh.timestamps.copyFrom(&src.timestamps) +func (bm *blockMetadata) copyFrom(src *blockMetadata) { + bm.seriesID = src.seriesID + bm.uncompressedSizeBytes = src.uncompressedSizeBytes + bm.count = src.count + bm.timestamps.copyFrom(&src.timestamps) for k, db := range src.tagFamilies { - if bh.tagFamilies == nil { - bh.tagFamilies = make(map[string]*dataBlock) + if bm.tagFamilies == nil { + bm.tagFamilies = make(map[string]*dataBlock) } - bh.tagFamilies[k] = &dataBlock{} - bh.tagFamilies[k].copyFrom(db) + bm.tagFamilies[k] = &dataBlock{} + bm.tagFamilies[k].copyFrom(db) } - bh.field.copyFrom(&src.field) + bm.field.copyFrom(&src.field) } -func (bh *blockMetadata) getTagFamilyMetadata(name string) *dataBlock { - if bh.tagFamilies == nil { - bh.tagFamilies = make(map[string]*dataBlock) +func (bm *blockMetadata) getTagFamilyMetadata(name string) *dataBlock { + if bm.tagFamilies == nil { + bm.tagFamilies = make(map[string]*dataBlock) } - tf, ok := bh.tagFamilies[name] + tf, ok := bm.tagFamilies[name] if !ok { tf = &dataBlock{} - bh.tagFamilies[name] = tf + bm.tagFamilies[name] = tf } return tf } -func (bh *blockMetadata) reset() { - bh.seriesID = 0 - bh.uncompressedSizeBytes = 0 - bh.count = 0 - bh.timestamps.reset() - bh.field.reset() - for k := range bh.tagFamilies { - bh.tagFamilies[k].reset() - delete(bh.tagFamilies, k) +func (bm *blockMetadata) reset() { + bm.seriesID = 0 + bm.uncompressedSizeBytes = 0 + bm.count = 0 + bm.timestamps.reset() + bm.field.reset() + for k := range bm.tagFamilies { + bm.tagFamilies[k].reset() + delete(bm.tagFamilies, k) } - bh.tagProjection = bh.tagProjection[:0] + bm.tagProjection = bm.tagProjection[:0] } -func (bh *blockMetadata) marshal(dst []byte) []byte { - dst = bh.seriesID.AppendToBytes(dst) - dst = encoding.VarUint64ToBytes(dst, bh.uncompressedSizeBytes) - dst = encoding.VarUint64ToBytes(dst, bh.count) - dst = bh.timestamps.marshal(dst) - dst = encoding.VarUint64ToBytes(dst, uint64(len(bh.tagFamilies))) +func (bm *blockMetadata) marshal(dst []byte) []byte { + dst = bm.seriesID.AppendToBytes(dst) + dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSizeBytes) + dst = encoding.VarUint64ToBytes(dst, bm.count) + dst = bm.timestamps.marshal(dst) + dst = encoding.VarUint64ToBytes(dst, uint64(len(bm.tagFamilies))) // make sure the order of tagFamilies is stable - keys := make([]string, 0, len(bh.tagFamilies)) - for k := range bh.tagFamilies { + keys := make([]string, 0, len(bm.tagFamilies)) + for k := range bm.tagFamilies { keys = append(keys, k) } sort.Strings(keys) for _, name := range keys { - cf := bh.tagFamilies[name] + cf := bm.tagFamilies[name] dst = encoding.EncodeBytes(dst, convert.StringToBytes(name)) dst = cf.marshal(dst) } - return bh.field.marshal(dst) + return bm.field.marshal(dst) } -func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) { +func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { if len(src) < 8 { return nil, errors.New("cannot unmarshal blockMetadata from less than 8 bytes") } - bh.seriesID = common.SeriesID(encoding.BytesToUint64(src)) + bm.seriesID = common.SeriesID(encoding.BytesToUint64(src)) src = src[8:] src, n, err := encoding.BytesToVarUint64(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: %w", err) } - bh.uncompressedSizeBytes = n + bm.uncompressedSizeBytes = n src, n, err = encoding.BytesToVarUint64(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal count: %w", err) } - bh.count = n - src, err = bh.timestamps.unmarshal(src) + bm.count = n + src, err = bm.timestamps.unmarshal(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal timestampsMetadata: %w", err) } @@ -161,8 +161,8 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) { return nil, fmt.Errorf("cannot unmarshal tagFamilies count: %w", err) } if n > 0 { - if bh.tagFamilies == nil { - bh.tagFamilies = make(map[string]*dataBlock, n) + if bm.tagFamilies == nil { + bm.tagFamilies = make(map[string]*dataBlock, n) } var nameBytes []byte for i := uint64(0); i < n; i++ { @@ -176,21 +176,21 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal tagFamily dataBlock: %w", err) } - bh.tagFamilies[convert.BytesToString(nameBytes)] = tf + bm.tagFamilies[convert.BytesToString(nameBytes)] = tf } } - src, err = bh.field.unmarshal(src) + src, err = bm.field.unmarshal(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal columnFamilyMetadata: %w", err) } return src, nil } -func (bh blockMetadata) less(other blockMetadata) bool { - if bh.seriesID == other.seriesID { - return bh.timestamps.min < other.timestamps.min +func (bm blockMetadata) less(other blockMetadata) bool { + if bm.seriesID == other.seriesID { + return bm.timestamps.min < other.timestamps.min } - return bh.seriesID < other.seriesID + return bm.seriesID < other.seriesID } func generateBlockMetadata() *blockMetadata { @@ -201,13 +201,32 @@ func generateBlockMetadata() *blockMetadata { return v.(*blockMetadata) } -func releaseBlockMetadata(bh *blockMetadata) { - bh.reset() - blockMetadataPool.Put(bh) +func releaseBlockMetadata(bm *blockMetadata) { + bm.reset() + blockMetadataPool.Put(bm) } var blockMetadataPool sync.Pool +type blockMetadataArray struct { + arr []blockMetadata +} + +var blockMetadataArrayPool sync.Pool + +func generateBlockMetadataArray() *blockMetadataArray { + v := blockMetadataArrayPool.Get() + if v == nil { + return &blockMetadataArray{} + } + return v.(*blockMetadataArray) +} + +func releaseBlockMetadataArray(bma *blockMetadataArray) { + bma.arr = bma.arr[:0] + blockMetadataArrayPool.Put(bma) +} + type timestampsMetadata struct { dataBlock min int64 @@ -215,38 +234,38 @@ type timestampsMetadata struct { encodeType encoding.EncodeType } -func (th *timestampsMetadata) reset() { - th.dataBlock.reset() - th.min = 0 - th.max = 0 - th.encodeType = 0 +func (tm *timestampsMetadata) reset() { + tm.dataBlock.reset() + tm.min = 0 + tm.max = 0 + tm.encodeType = 0 } -func (th *timestampsMetadata) copyFrom(src *timestampsMetadata) { - th.dataBlock.copyFrom(&src.dataBlock) - th.min = src.min - th.max = src.max - th.encodeType = src.encodeType +func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) { + tm.dataBlock.copyFrom(&src.dataBlock) + tm.min = src.min + tm.max = src.max + tm.encodeType = src.encodeType } -func (th *timestampsMetadata) marshal(dst []byte) []byte { - dst = th.dataBlock.marshal(dst) - dst = encoding.Uint64ToBytes(dst, uint64(th.min)) - dst = encoding.Uint64ToBytes(dst, uint64(th.max)) - dst = append(dst, byte(th.encodeType)) +func (tm *timestampsMetadata) marshal(dst []byte) []byte { + dst = tm.dataBlock.marshal(dst) + dst = encoding.Uint64ToBytes(dst, uint64(tm.min)) + dst = encoding.Uint64ToBytes(dst, uint64(tm.max)) + dst = append(dst, byte(tm.encodeType)) return dst } -func (th *timestampsMetadata) unmarshal(src []byte) ([]byte, error) { - src, err := th.dataBlock.unmarshal(src) +func (tm *timestampsMetadata) unmarshal(src []byte) ([]byte, error) { + src, err := tm.dataBlock.unmarshal(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal dataBlock: %w", err) } - th.min = int64(encoding.BytesToUint64(src)) + tm.min = int64(encoding.BytesToUint64(src)) src = src[8:] - th.max = int64(encoding.BytesToUint64(src)) + tm.max = int64(encoding.BytesToUint64(src)) src = src[8:] - th.encodeType = encoding.EncodeType(src[0]) + tm.encodeType = encoding.EncodeType(src[0]) return src[1:], nil } diff --git a/banyand/measure/part.go b/banyand/measure/part.go index c6019539..20b6a5a7 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -239,15 +239,11 @@ func (pw *partWrapper) decRef() { if n > 0 { return } - if pw.mp != nil { - releaseMemPart(pw.mp) - pw.mp = nil - pw.p = nil - return - } pw.p.close() if pw.removable.Load() && pw.p.fileSystem != nil { - pw.p.fileSystem.MustRMAll(pw.p.path) + go func(pw *partWrapper) { + pw.p.fileSystem.MustRMAll(pw.p.path) + }(pw) } } diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go index c44d114a..aa41f6d1 100644 --- a/banyand/measure/part_iter.go +++ b/banyand/measure/part_iter.go @@ -58,10 +58,11 @@ func (pi *partIter) reset() { pi.err = nil } -func (pi *partIter) init(p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { +func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { pi.reset() pi.p = p + pi.bms = bma.arr pi.sids = sids pi.minTimestamp = minTimestamp pi.maxTimestamp = maxTimestamp @@ -145,13 +146,13 @@ func (pi *partIter) loadNextBlockMetadata() bool { continue } - bm, err := pi.readPrimaryBlock(pbm) + var err error + pi.bms, err = pi.readPrimaryBlock(pi.bms[:0], pbm) if err != nil { pi.err = fmt.Errorf("cannot read primary block for part %q at offset %d with size %d: %w", &pi.p.partMetadata, pbm.offset, pbm.size, err) return false } - pi.bms = bm return true } pi.err = io.EOF @@ -177,7 +178,7 @@ func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBl return pbmIndex[n-1:] } -func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata, error) { +func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetadata) ([]blockMetadata, error) { pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, int(mr.size)) fs.MustReadData(pi.p.primary, int64(mr.offset), pi.compressedPrimaryBuf) @@ -186,12 +187,11 @@ func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata, if err != nil { return nil, fmt.Errorf("cannot decompress index block: %w", err) } - bm := make([]blockMetadata, 0) - bm, err = unmarshalBlockMetadata(bm, pi.primaryBuf) + bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf) if err != nil { return nil, fmt.Errorf("cannot unmarshal index block: %w", err) } - return bm, nil + return bms, nil } func (pi *partIter) findBlock() bool { diff --git a/banyand/measure/part_iter_test.go b/banyand/measure/part_iter_test.go index bc2a3301..2451c9b0 100644 --- a/banyand/measure/part_iter_test.go +++ b/banyand/measure/part_iter_test.go @@ -110,12 +110,14 @@ func Test_partIter_nextBlock(t *testing.T) { }, } + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { verifyPart := func(p *part) { defer p.close() pi := partIter{} - pi.init(p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp) + pi.init(bma, p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp) var got []blockMetadata for pi.nextBlock() { diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 66160ae8..bf1c2d7c 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -110,12 +110,15 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 } result.snapshots = append(result.snapshots, s) } + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) // TODO: cache tstIter var tstIter tstIter + defer tstIter.reset() originalSids := make([]common.SeriesID, len(sids)) copy(originalSids, sids) sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) - tstIter.init(parts, sids, qo.minTimestamp, qo.maxTimestamp) + tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) if tstIter.Error() != nil { return nil, fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index e88ed586..56111323 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -543,6 +543,8 @@ func TestQueryResult(t *testing.T) { }, } + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { verify := func(t *testing.T, tst *tsTable) { @@ -561,7 +563,7 @@ func TestQueryResult(t *testing.T) { return sids[i] < tt.sids[j] }) ti := &tstIter{} - ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp) + ti.init(bma, pp, sids, tt.minTimestamp, tt.maxTimestamp) var result queryResult // Query all tags diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index 96bcfaef..e65aef6e 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -299,7 +299,7 @@ func (ti *tstIter) reset() { ti.nextBlockNoop = false } -func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { +func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { ti.reset() ti.parts = parts @@ -308,7 +308,7 @@ func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, max } ti.piPool = ti.piPool[:len(ti.parts)] for i, p := range ti.parts { - ti.piPool[i].init(p, sids, minTimestamp, maxTimestamp) + ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp) } ti.piHeap = ti.piHeap[:0] diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index bbbae3a8..a37d1a9f 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -136,6 +136,8 @@ func Test_tstIter(t *testing.T) { minTimestamp int64 maxTimestamp int64 } + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) verify := func(t *testing.T, tt testCtx, tst *tsTable) uint64 { defer tst.Close() @@ -147,7 +149,7 @@ func Test_tstIter(t *testing.T) { pp, n := s.getParts(nil, tt.minTimestamp, tt.maxTimestamp) require.Equal(t, len(s.parts), n) ti := &tstIter{} - ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) + ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) var got []blockMetadata for ti.nextBlock() { if ti.piHeap[0].curBlock.seriesID == 0 { diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go index 3366ac1e..08968b7f 100644 --- a/banyand/stream/block_metadata.go +++ b/banyand/stream/block_metadata.go @@ -34,34 +34,34 @@ type dataBlock struct { size uint64 } -func (h *dataBlock) reset() { - h.offset = 0 - h.size = 0 +func (d *dataBlock) reset() { + d.offset = 0 + d.size = 0 } -func (h *dataBlock) copyFrom(src *dataBlock) { - h.offset = src.offset - h.size = src.size +func (d *dataBlock) copyFrom(src *dataBlock) { + d.offset = src.offset + d.size = src.size } -func (h *dataBlock) marshal(dst []byte) []byte { - dst = encoding.VarUint64ToBytes(dst, h.offset) - dst = encoding.VarUint64ToBytes(dst, h.size) +func (d *dataBlock) marshal(dst []byte) []byte { + dst = encoding.VarUint64ToBytes(dst, d.offset) + dst = encoding.VarUint64ToBytes(dst, d.size) return dst } -func (h *dataBlock) unmarshal(src []byte) ([]byte, error) { +func (d *dataBlock) unmarshal(src []byte) ([]byte, error) { src, n, err := encoding.BytesToVarUint64(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal offset: %w", err) } - h.offset = n + d.offset = n src, n, err = encoding.BytesToVarUint64(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal size: %w", err) } - h.size = n + d.size = n return src, nil } @@ -75,89 +75,89 @@ type blockMetadata struct { count uint64 } -func (bh *blockMetadata) copyFrom(src *blockMetadata) { - bh.seriesID = src.seriesID - bh.uncompressedSizeBytes = src.uncompressedSizeBytes - bh.count = src.count - bh.timestamps.copyFrom(&src.timestamps) - bh.elementIDs.copyFrom(&src.elementIDs) +func (bm *blockMetadata) copyFrom(src *blockMetadata) { + bm.seriesID = src.seriesID + bm.uncompressedSizeBytes = src.uncompressedSizeBytes + bm.count = src.count + bm.timestamps.copyFrom(&src.timestamps) + bm.elementIDs.copyFrom(&src.elementIDs) for k, db := range src.tagFamilies { - if bh.tagFamilies == nil { - bh.tagFamilies = make(map[string]*dataBlock) + if bm.tagFamilies == nil { + bm.tagFamilies = make(map[string]*dataBlock) } - bh.tagFamilies[k] = &dataBlock{} - bh.tagFamilies[k].copyFrom(db) + bm.tagFamilies[k] = &dataBlock{} + bm.tagFamilies[k].copyFrom(db) } } -func (bh *blockMetadata) getTagFamilyMetadata(name string) *dataBlock { - if bh.tagFamilies == nil { - bh.tagFamilies = make(map[string]*dataBlock) +func (bm *blockMetadata) getTagFamilyMetadata(name string) *dataBlock { + if bm.tagFamilies == nil { + bm.tagFamilies = make(map[string]*dataBlock) } - tf, ok := bh.tagFamilies[name] + tf, ok := bm.tagFamilies[name] if !ok { tf = &dataBlock{} - bh.tagFamilies[name] = tf + bm.tagFamilies[name] = tf } return tf } -func (bh *blockMetadata) reset() { - bh.seriesID = 0 - bh.uncompressedSizeBytes = 0 - bh.count = 0 - bh.timestamps.reset() - bh.elementIDs.reset() - for k := range bh.tagFamilies { - bh.tagFamilies[k].reset() - delete(bh.tagFamilies, k) +func (bm *blockMetadata) reset() { + bm.seriesID = 0 + bm.uncompressedSizeBytes = 0 + bm.count = 0 + bm.timestamps.reset() + bm.elementIDs.reset() + for k := range bm.tagFamilies { + bm.tagFamilies[k].reset() + delete(bm.tagFamilies, k) } - bh.tagProjection = bh.tagProjection[:0] + bm.tagProjection = bm.tagProjection[:0] } -func (bh *blockMetadata) marshal(dst []byte) []byte { - dst = bh.seriesID.AppendToBytes(dst) - dst = encoding.VarUint64ToBytes(dst, bh.uncompressedSizeBytes) - dst = encoding.VarUint64ToBytes(dst, bh.count) - dst = bh.timestamps.marshal(dst) - dst = bh.elementIDs.marshal(dst) - dst = encoding.VarUint64ToBytes(dst, uint64(len(bh.tagFamilies))) +func (bm *blockMetadata) marshal(dst []byte) []byte { + dst = bm.seriesID.AppendToBytes(dst) + dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSizeBytes) + dst = encoding.VarUint64ToBytes(dst, bm.count) + dst = bm.timestamps.marshal(dst) + dst = bm.elementIDs.marshal(dst) + dst = encoding.VarUint64ToBytes(dst, uint64(len(bm.tagFamilies))) // make sure the order of tagFamilies is stable - keys := make([]string, 0, len(bh.tagFamilies)) - for k := range bh.tagFamilies { + keys := make([]string, 0, len(bm.tagFamilies)) + for k := range bm.tagFamilies { keys = append(keys, k) } sort.Strings(keys) for _, name := range keys { - cf := bh.tagFamilies[name] + cf := bm.tagFamilies[name] dst = encoding.EncodeBytes(dst, convert.StringToBytes(name)) dst = cf.marshal(dst) } return dst } -func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) { +func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { if len(src) < 8 { return nil, errors.New("cannot unmarshal blockMetadata from less than 8 bytes") } - bh.seriesID = common.SeriesID(encoding.BytesToUint64(src)) + bm.seriesID = common.SeriesID(encoding.BytesToUint64(src)) src = src[8:] src, n, err := encoding.BytesToVarUint64(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: %w", err) } - bh.uncompressedSizeBytes = n + bm.uncompressedSizeBytes = n src, n, err = encoding.BytesToVarUint64(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal count: %w", err) } - bh.count = n - src, err = bh.timestamps.unmarshal(src) + bm.count = n + src, err = bm.timestamps.unmarshal(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal timestampsMetadata: %w", err) } - src, err = bh.elementIDs.unmarshal(src) + src, err = bm.elementIDs.unmarshal(src) if err != nil { return nil, fmt.Errorf("cannot unmarshal elementIDsMetadata: %w", err) } @@ -166,8 +166,8 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) { return nil, fmt.Errorf("cannot unmarshal tagFamilies count: %w", err) } if n > 0 { - if bh.tagFamilies == nil { - bh.tagFamilies = make(map[string]*dataBlock, n) + if bm.tagFamilies == nil { + bm.tagFamilies = make(map[string]*dataBlock, n) } var nameBytes []byte for i := uint64(0); i < n; i++ { @@ -181,7 +181,7 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal tagFamily dataBlock: %w", err) } - bh.tagFamilies[convert.BytesToString(nameBytes)] = tf + bm.tagFamilies[convert.BytesToString(nameBytes)] = tf } } if err != nil { @@ -190,11 +190,11 @@ func (bh *blockMetadata) unmarshal(src []byte) ([]byte, error) { return src, nil } -func (bh blockMetadata) less(other blockMetadata) bool { - if bh.seriesID == other.seriesID { - return bh.timestamps.min < other.timestamps.min +func (bm blockMetadata) less(other blockMetadata) bool { + if bm.seriesID == other.seriesID { + return bm.timestamps.min < other.timestamps.min } - return bh.seriesID < other.seriesID + return bm.seriesID < other.seriesID } func generateBlockMetadata() *blockMetadata { @@ -205,13 +205,32 @@ func generateBlockMetadata() *blockMetadata { return v.(*blockMetadata) } -func releaseBlockMetadata(bh *blockMetadata) { - bh.reset() - blockMetadataPool.Put(bh) +func releaseBlockMetadata(bm *blockMetadata) { + bm.reset() + blockMetadataPool.Put(bm) } var blockMetadataPool sync.Pool +type blockMetadataArray struct { + arr []blockMetadata +} + +var blockMetadataArrayPool sync.Pool + +func generateBlockMetadataArray() *blockMetadataArray { + v := blockMetadataArrayPool.Get() + if v == nil { + return &blockMetadataArray{} + } + return v.(*blockMetadataArray) +} + +func releaseBlockMetadataArray(bma *blockMetadataArray) { + bma.arr = bma.arr[:0] + blockMetadataArrayPool.Put(bma) +} + type timestampsMetadata struct { dataBlock min int64 diff --git a/banyand/stream/part.go b/banyand/stream/part.go index 95a12297..59af3462 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -410,7 +410,9 @@ func (pw *partWrapper) decRef() { } pw.p.close() if pw.removable.Load() && pw.p.fileSystem != nil { - pw.p.fileSystem.MustRMAll(pw.p.path) + go func(pw *partWrapper) { + pw.p.fileSystem.MustRMAll(pw.p.path) + }(pw) } } diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go index 9cd9df3a..1b53439a 100644 --- a/banyand/stream/part_iter.go +++ b/banyand/stream/part_iter.go @@ -58,10 +58,11 @@ func (pi *partIter) reset() { pi.err = nil } -func (pi *partIter) init(p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { +func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { pi.reset() pi.p = p + pi.bms = bma.arr pi.sids = sids pi.minTimestamp = minTimestamp pi.maxTimestamp = maxTimestamp @@ -145,13 +146,13 @@ func (pi *partIter) loadNextBlockMetadata() bool { continue } - bm, err := pi.readPrimaryBlock(pbm) + var err error + pi.bms, err = pi.readPrimaryBlock(pi.bms[:0], pbm) if err != nil { pi.err = fmt.Errorf("cannot read primary block for part %q at offset %d with size %d: %w", &pi.p.partMetadata, pbm.offset, pbm.size, err) return false } - pi.bms = bm return true } pi.err = io.EOF @@ -177,7 +178,7 @@ func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBl return pbmIndex[n-1:] } -func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata, error) { +func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetadata) ([]blockMetadata, error) { pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, int(mr.size)) fs.MustReadData(pi.p.primary, int64(mr.offset), pi.compressedPrimaryBuf) @@ -186,12 +187,11 @@ func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) ([]blockMetadata, if err != nil { return nil, fmt.Errorf("cannot decompress index block: %w", err) } - bm := make([]blockMetadata, 0) - bm, err = unmarshalBlockMetadata(bm, pi.primaryBuf) + bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf) if err != nil { return nil, fmt.Errorf("cannot unmarshal index block: %w", err) } - return bm, nil + return bms, nil } func (pi *partIter) findBlock() bool { diff --git a/banyand/stream/part_iter_test.go b/banyand/stream/part_iter_test.go index b0cc2174..8a600cc4 100644 --- a/banyand/stream/part_iter_test.go +++ b/banyand/stream/part_iter_test.go @@ -109,13 +109,14 @@ func Test_partIter_nextBlock(t *testing.T) { wantErr: nil, }, } - + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { verifyPart := func(p *part) { defer p.close() pi := partIter{} - pi.init(p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp) + pi.init(bma, p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp) var got []blockMetadata for pi.nextBlock() { diff --git a/banyand/stream/query.go b/banyand/stream/query.go index f8ae2167..92aca49b 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -546,12 +546,15 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S } result.snapshots = append(result.snapshots, s) } + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) // TODO: cache tstIter var tstIter tstIter + defer tstIter.reset() originalSids := make([]common.SeriesID, len(sids)) copy(originalSids, sids) sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) - tstIter.init(parts, sids, qo.minTimestamp, qo.maxTimestamp) + tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) if tstIter.Error() != nil { return nil, fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go index 4d4d2bc5..fa3e1c61 100644 --- a/banyand/stream/query_test.go +++ b/banyand/stream/query_test.go @@ -326,7 +326,8 @@ func TestQueryResult(t *testing.T) { }}, }, } - + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { verify := func(t *testing.T, tst *tsTable) { @@ -345,7 +346,7 @@ func TestQueryResult(t *testing.T) { return sids[i] < tt.sids[j] }) ti := &tstIter{} - ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp) + ti.init(bma, pp, sids, tt.minTimestamp, tt.maxTimestamp) var result queryResult for ti.nextBlock() { diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index 35610207..272c956c 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -327,7 +327,7 @@ func (ti *tstIter) reset() { ti.nextBlockNoop = false } -func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { +func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { ti.reset() ti.parts = parts @@ -336,7 +336,7 @@ func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, max } ti.piPool = ti.piPool[:len(ti.parts)] for i, p := range ti.parts { - ti.piPool[i].init(p, sids, minTimestamp, maxTimestamp) + ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp) } ti.piHeap = ti.piHeap[:0] diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go index cec9d389..de3bef3b 100644 --- a/banyand/stream/tstable_test.go +++ b/banyand/stream/tstable_test.go @@ -135,6 +135,8 @@ func Test_tstIter(t *testing.T) { minTimestamp int64 maxTimestamp int64 } + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) verify := func(t *testing.T, tt testCtx, tst *tsTable) uint64 { defer tst.Close() @@ -146,7 +148,7 @@ func Test_tstIter(t *testing.T) { pp, n := s.getParts(nil, tt.minTimestamp, tt.maxTimestamp) require.Equal(t, len(s.parts), n) ti := &tstIter{} - ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) + ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) var got []blockMetadata for ti.nextBlock() { if ti.piHeap[0].curBlock.seriesID == 0 {