This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new f6f1f6ce Improve sidx scan efficiency (#844)
f6f1f6ce is described below
commit f6f1f6cee1230d00ecc06a879ced2e8e140c5035
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Nov 14 09:34:23 2025 +0800
Improve sidx scan efficiency (#844)
---
banyand/internal/encoding/tag_encoder.go | 27 ++
banyand/internal/sidx/block.go | 157 ++++++---
banyand/internal/sidx/block_scanner.go | 146 +++++----
banyand/internal/sidx/block_test.go | 15 +-
banyand/internal/sidx/element.go | 40 ++-
banyand/internal/sidx/iter.go | 205 ++++++++----
banyand/internal/sidx/iter_test.go | 311 ++++++++++--------
banyand/internal/sidx/merge_test.go | 83 ++---
banyand/internal/sidx/part_key_iter.go | 489 ++++++++++++++++++++++++++++
banyand/internal/sidx/part_key_iter_test.go | 483 +++++++++++++++++++++++++++
banyand/internal/sidx/query.go | 49 ++-
banyand/internal/sidx/query_result.go | 49 ++-
banyand/internal/sidx/query_test.go | 41 ++-
banyand/internal/sidx/sidx.go | 276 ++++++++++------
banyand/internal/sidx/tag.go | 118 ++++---
banyand/internal/sidx/tag_filter_op.go | 4 +-
banyand/internal/sidx/tag_test.go | 98 ++++++
banyand/stream/block.go | 2 +-
banyand/stream/tag_filter.go | 4 +-
banyand/trace/block_writer.go | 4 +-
banyand/trace/bloom_filter.go | 4 +-
banyand/trace/query.go | 5 +-
banyand/trace/snapshot_test.go | 6 +-
banyand/trace/traces.go | 27 --
pkg/filter/bloom_filter.go | 41 ++-
pkg/filter/bloom_filter_test.go | 16 +
26 files changed, 2080 insertions(+), 620 deletions(-)
diff --git a/banyand/internal/encoding/tag_encoder.go
b/banyand/internal/encoding/tag_encoder.go
index 4800572d..2e375367 100644
--- a/banyand/internal/encoding/tag_encoder.go
+++ b/banyand/internal/encoding/tag_encoder.go
@@ -22,6 +22,7 @@ package encoding
import (
stdbytes "bytes"
+ "errors"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/convert"
@@ -113,6 +114,32 @@ func MarshalVarArray(dest, src []byte) []byte {
return dest
}
+// UnmarshalVarArray unmarshals a variable-length array into a byte slice.
+func UnmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
+ if len(src) == 0 {
+ return nil, nil, errors.New("empty entity value")
+ }
+ if src[0] == EntityDelimiter {
+ return dest, src[1:], nil
+ }
+ for len(src) > 0 {
+ switch {
+ case src[0] == Escape:
+ if len(src) < 2 {
+ return nil, nil, errors.New("invalid escape
character")
+ }
+ src = src[1:]
+ dest = append(dest, src[0])
+ case src[0] == EntityDelimiter:
+ return dest, src[1:], nil
+ default:
+ dest = append(dest, src[0])
+ }
+ src = src[1:]
+ }
+ return nil, nil, errors.New("invalid variable array")
+}
+
// EncodeTagValues encodes tag values based on the value type with optimal
compression.
// For int64: uses delta encoding with first value storage.
// For float64: converts to decimal integers with exponent, then delta
encoding.
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index bfd592b3..e5e77d2f 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
internalencoding
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -109,7 +110,7 @@ func (b *block) mustInitFromTags(elementTags [][]*tag) {
func (b *block) processTag(tagName string, elementTags [][]*tag) {
td := generateTagData()
td.name = tagName
- td.values = make([][]byte, len(b.userKeys))
+ td.values = make([]tagRow, len(b.userKeys))
var valueType pbv1.ValueType
// Collect values for this tag across all elements
@@ -117,43 +118,25 @@ func (b *block) processTag(tagName string, elementTags
[][]*tag) {
found := false
for _, tag := range tags {
if tag.name == tagName {
- td.values[i] = tag.marshal()
+ // Store structured tagRow instead of marshaled
bytes
+ if tag.valueArr != nil {
+ td.values[i].valueArr = tag.valueArr
+ } else {
+ td.values[i].value = tag.value
+ }
valueType = tag.valueType
found = true
break
}
}
if !found {
- td.values[i] = nil // Missing tag value
+ // Missing tag value - leave as zero value
+ td.values[i] = tagRow{}
}
}
td.valueType = valueType
- // Create bloom filter for indexed tags
- td.filter = generateBloomFilter(len(b.userKeys))
- for _, tags := range elementTags {
- for _, tag := range tags {
- if tag.name == tagName {
- if tag.valueArr != nil {
- for _, v := range tag.valueArr {
- if v != nil {
- td.filter.Add(v)
- }
- }
- } else if tag.value != nil {
- td.filter.Add(tag.value)
- }
- break
- }
- }
- }
-
- // Update min/max for int64 tags
- if valueType == pbv1.ValueTypeInt64 {
- td.updateMinMax()
- }
-
b.tags[tagName] = td
}
@@ -197,9 +180,13 @@ func (b *block) uncompressedSizeBytes() uint64 {
// Add tag data sizes
for tagName, tagData := range b.tags {
nameSize := uint64(len(tagName))
- for _, value := range tagData.values {
- if value != nil {
- size += nameSize + uint64(len(value))
+ for _, row := range tagData.values {
+ if row.valueArr != nil {
+ for _, v := range row.valueArr {
+ size += nameSize + uint64(len(v))
+ }
+ } else if row.value != nil {
+ size += nameSize + uint64(len(row.value))
}
}
}
@@ -267,6 +254,16 @@ func (b *block) mustWriteTag(tagName string, td *tagData,
bm *blockMetadata, ww
tm.name = tagName
tm.valueType = td.valueType
+ // Marshal tagRow values to tmpBytes buffer for encoding
+ if cap(td.tmpBytes) < len(td.values) {
+ td.tmpBytes = make([][]byte, len(td.values))
+ } else {
+ td.tmpBytes = td.tmpBytes[:len(td.values)]
+ }
+ for i := range td.values {
+ td.tmpBytes[i] = marshalTagRow(&td.values[i], td.valueType)
+ }
+
// Write tag values to data file
bb := bigValuePool.Get()
if bb == nil {
@@ -278,7 +275,7 @@ func (b *block) mustWriteTag(tagName string, td *tagData,
bm *blockMetadata, ww
}()
// Encode tag values using the encoding module
- err := internalencoding.EncodeTagValues(bb, td.values, td.valueType)
+ err := internalencoding.EncodeTagValues(bb, td.tmpBytes, td.valueType)
if err != nil {
panic(fmt.Sprintf("failed to encode tag values: %v", err))
}
@@ -288,18 +285,82 @@ func (b *block) mustWriteTag(tagName string, td *tagData,
bm *blockMetadata, ww
tm.dataBlock.size = uint64(len(bb.Buf))
tdw.MustWrite(bb.Buf)
- // Write bloom filter
- if td.filter != nil {
- filterData := encodeBloomFilter(nil, td.filter)
- tm.filterBlock.offset = tfw.bytesWritten
- tm.filterBlock.size = uint64(len(filterData))
- tfw.MustWrite(filterData)
+ // Create and write bloom filter at write time using unique values
+ uniqueValues := td.uniqueValues
+ if uniqueValues == nil {
+ uniqueValues = make(map[string]struct{})
+ td.uniqueValues = uniqueValues
+ } else {
+ for k := range uniqueValues {
+ delete(uniqueValues, k)
+ }
}
- // Set min/max for int64 tags
- if td.valueType == pbv1.ValueTypeInt64 {
- tm.min = td.min
- tm.max = td.max
+ var (
+ minVal int64
+ maxVal int64
+ hasMinMax bool
+ )
+
+ updateMinMax := func(v []byte) {
+ if td.valueType != pbv1.ValueTypeInt64 {
+ return
+ }
+ if len(v) != 8 {
+ return
+ }
+ val := encoding.BytesToInt64(v)
+ if !hasMinMax {
+ minVal = val
+ maxVal = val
+ hasMinMax = true
+ return
+ }
+ if val < minVal {
+ minVal = val
+ }
+ if val > maxVal {
+ maxVal = val
+ }
+ }
+
+ addUnique := func(v []byte) {
+ if v == nil {
+ return
+ }
+ updateMinMax(v)
+ key := convert.BytesToString(v)
+ if _, exists := uniqueValues[key]; exists {
+ return
+ }
+ uniqueValues[key] = struct{}{}
+ }
+
+ for i := range td.values {
+ if td.values[i].valueArr != nil {
+ for _, v := range td.values[i].valueArr {
+ addUnique(v)
+ }
+ continue
+ }
+ addUnique(td.values[i].value)
+ }
+
+ bf := generateBloomFilter(len(uniqueValues))
+ for v := range uniqueValues {
+ bf.Add(convert.StringToBytes(v))
+ }
+
+ bb.Buf = encodeBloomFilter(bb.Buf[:0], bf)
+ tm.filterBlock.offset = tfw.bytesWritten
+ tm.filterBlock.size = uint64(len(bb.Buf))
+ tfw.MustWrite(bb.Buf)
+ releaseBloomFilter(bf)
+
+ // Compute min/max for int64 tags during unique value iteration
+ if td.valueType == pbv1.ValueTypeInt64 && hasMinMax {
+ tm.min = encoding.Int64ToBytes(nil, minVal)
+ tm.max = encoding.Int64ToBytes(nil, maxVal)
}
// Marshal and write tag metadata
@@ -432,7 +493,7 @@ func fullTagAppend(bi, b *blockPointer, offset int) {
for _, t := range b.tags {
newTagData := tagData{name: t.name, valueType:
t.valueType}
for j := 0; j < existDataSize; j++ {
- newTagData.values = append(newTagData.values,
nil)
+ newTagData.values = append(newTagData.values,
tagRow{})
}
assertIdxAndOffset(t.name, len(t.values), b.idx, offset)
newTagData.values = append(newTagData.values,
t.values[b.idx:offset]...)
@@ -448,7 +509,7 @@ func fullTagAppend(bi, b *blockPointer, offset int) {
} else {
newTagData := tagData{name: t.name, valueType:
t.valueType}
for j := 0; j < existDataSize; j++ {
- newTagData.values = append(newTagData.values,
nil)
+ newTagData.values = append(newTagData.values,
tagRow{})
}
assertIdxAndOffset(t.name, len(t.values), b.idx, offset)
newTagData.values = append(newTagData.values,
t.values[b.idx:offset]...)
@@ -465,7 +526,7 @@ func fullTagAppend(bi, b *blockPointer, offset int) {
for _, t := range bi.tags {
if _, exists := sourceTags[t.name]; !exists {
for j := 0; j < emptySize; j++ {
- bi.tags[t.name].values =
append(bi.tags[t.name].values, nil)
+ bi.tags[t.name].values =
append(bi.tags[t.name].values, tagRow{})
}
}
}
@@ -608,11 +669,13 @@ func (b *block) readSingleTag(decoder
*encoding.BytesBlockDecoder, sr *seqReader
td := generateTagData()
td.name = tagName
td.valueType = tm.valueType
- td.values, err = internalencoding.DecodeTagValues(td.values[:0],
decoder, bb, tm.valueType, count)
- if err != nil {
+
+ // Decode and convert tag values using common helper
+ if err := decodeAndConvertTagValues(td, decoder, bb, tm.valueType,
count); err != nil {
releaseTagData(td)
- return fmt.Errorf("cannot decode tag values: %w", err)
+ return err
}
+
b.tags[tagName] = td
return nil
}
diff --git a/banyand/internal/sidx/block_scanner.go
b/banyand/internal/sidx/block_scanner.go
index 5cedd7fe..a7b39daa 100644
--- a/banyand/internal/sidx/block_scanner.go
+++ b/banyand/internal/sidx/block_scanner.go
@@ -84,6 +84,7 @@ type blockScanner struct {
minKey int64
maxKey int64
asc bool
+ batchSize int
}
func (bsn *blockScanner) scan(ctx context.Context, blockCh chan
*blockScanResultBatch) {
@@ -91,85 +92,59 @@ func (bsn *blockScanner) scan(ctx context.Context, blockCh
chan *blockScanResult
return
}
- // Check for context cancellation before starting expensive operations
- select {
- case <-ctx.Done():
+ if !bsn.checkContext(ctx) {
return
- default:
}
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
-
it := generateIter()
defer releaseIter(it)
- it.init(bma, bsn.parts, bsn.seriesIDs, bsn.minKey, bsn.maxKey,
bsn.filter)
+ it.init(bsn.parts, bsn.seriesIDs, bsn.minKey, bsn.maxKey, bsn.filter,
bsn.asc)
batch := generateBlockScanResultBatch()
if it.Error() != nil {
batch.err = fmt.Errorf("cannot init iter: %w", it.Error())
- select {
- case blockCh <- batch:
- case <-ctx.Done():
- releaseBlockScanResultBatch(batch)
- }
+ bsn.sendBatch(ctx, blockCh, batch)
return
}
+ batchThreshold := bsn.batchSize
+ if batchThreshold <= 0 {
+ batchThreshold = blockScannerBatchSize
+ }
+
var totalBlockBytes uint64
for it.nextBlock() {
- // Check for context cancellation during iteration
- select {
- case <-ctx.Done():
+ if !bsn.checkContext(ctx) {
releaseBlockScanResultBatch(batch)
return
- default:
}
- p := it.piHeap[0]
+ bm, p := it.current()
+ if err := bsn.validateBlockMetadata(bm, p, it); err != nil {
+ batch.err = err
+ bsn.sendBatch(ctx, blockCh, batch)
+ return
+ }
- // Get block size before adding to batch
- blockSize := p.curBlock.uncompressedSize
+ blockSize := bm.uncompressedSize
// Check if adding this block would exceed quota
- quota := bsn.pm.AvailableBytes()
- if quota >= 0 && totalBlockBytes+blockSize > uint64(quota) {
- if len(batch.bss) > 0 {
- // Send current batch without error
- select {
- case blockCh <- batch:
- case <-ctx.Done():
- releaseBlockScanResultBatch(batch)
- }
- return
- }
- // Batch is empty, send error
- err := fmt.Errorf("sidx block scan quota exceeded:
block size %s, quota is %s", humanize.Bytes(blockSize),
humanize.Bytes(uint64(quota)))
- batch.err = err
- select {
- case blockCh <- batch:
- case <-ctx.Done():
- releaseBlockScanResultBatch(batch)
- return
+ if exceeded, err := bsn.checkQuotaExceeded(totalBlockBytes,
blockSize, batch); exceeded {
+ if err != nil {
+ batch.err = err
}
+ bsn.sendBatch(ctx, blockCh, batch)
return
}
// Quota OK, add block to batch
- batch.bss = append(batch.bss, blockScanResult{
- p: p.p,
- })
- bs := &batch.bss[len(batch.bss)-1]
- bs.bm.copyFrom(p.curBlock)
+ bsn.addBlockToBatch(batch, bm, p)
totalBlockBytes += blockSize
// Check if batch is full
- if len(batch.bss) >= cap(batch.bss) {
- select {
- case blockCh <- batch:
- case <-ctx.Done():
- releaseBlockScanResultBatch(batch)
+ if len(batch.bss) >= batchThreshold || len(batch.bss) >=
cap(batch.bss) {
+ if !bsn.sendBatch(ctx, blockCh, batch) {
if dl := bsn.l.Debug(); dl.Enabled() {
dl.Int("batch.len",
len(batch.bss)).Msg("context canceled while sending block")
}
@@ -181,26 +156,79 @@ func (bsn *blockScanner) scan(ctx context.Context,
blockCh chan *blockScanResult
if it.Error() != nil {
batch.err = fmt.Errorf("cannot iterate iter: %w", it.Error())
- select {
- case blockCh <- batch:
- case <-ctx.Done():
- releaseBlockScanResultBatch(batch)
- }
+ bsn.sendBatch(ctx, blockCh, batch)
return
}
if len(batch.bss) > 0 {
- select {
- case blockCh <- batch:
- case <-ctx.Done():
- releaseBlockScanResultBatch(batch)
- }
+ bsn.sendBatch(ctx, blockCh, batch)
return
}
releaseBlockScanResultBatch(batch)
}
+// checkContext returns false if context is canceled.
+func (bsn *blockScanner) checkContext(ctx context.Context) bool {
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ return true
+ }
+}
+
+// sendBatch sends a batch to the channel, handling context cancellation.
+// Returns false if context was canceled, true otherwise.
+func (bsn *blockScanner) sendBatch(ctx context.Context, blockCh chan
*blockScanResultBatch, batch *blockScanResultBatch) bool {
+ select {
+ case blockCh <- batch:
+ return true
+ case <-ctx.Done():
+ releaseBlockScanResultBatch(batch)
+ return false
+ }
+}
+
+// validateBlockMetadata checks if block metadata and part are valid.
+func (bsn *blockScanner) validateBlockMetadata(bm *blockMetadata, p *part, it
*iter) error {
+ if bm == nil {
+ it.err = fmt.Errorf("sidx iterator returned nil block")
+ return it.err
+ }
+ if p == nil {
+ it.err = fmt.Errorf("block missing part reference")
+ return it.err
+ }
+ return nil
+}
+
+// checkQuotaExceeded checks if adding a block would exceed the memory quota.
+// Returns (exceeded, error) where exceeded is true if quota would be exceeded.
+func (bsn *blockScanner) checkQuotaExceeded(totalBlockBytes, blockSize uint64,
batch *blockScanResultBatch) (bool, error) {
+ quota := bsn.pm.AvailableBytes()
+ if quota < 0 || totalBlockBytes+blockSize <= uint64(quota) {
+ return false, nil
+ }
+
+ // Quota would be exceeded
+ if len(batch.bss) > 0 {
+ // Send current batch without error
+ return true, nil
+ }
+
+ // Batch is empty, return error
+ return true, fmt.Errorf("sidx block scan quota exceeded: block size %s,
quota is %s",
+ humanize.Bytes(blockSize), humanize.Bytes(uint64(quota)))
+}
+
+// addBlockToBatch adds a block to the batch.
+func (bsn *blockScanner) addBlockToBatch(batch *blockScanResultBatch, bm
*blockMetadata, p *part) {
+ batch.bss = append(batch.bss, blockScanResult{p: p})
+ bs := &batch.bss[len(batch.bss)-1]
+ bs.bm.copyFrom(bm)
+}
+
func (bsn *blockScanner) close() {
for i := range bsn.finalizers {
bsn.finalizers[i]()
diff --git a/banyand/internal/sidx/block_test.go
b/banyand/internal/sidx/block_test.go
index 64627ef4..7e96305f 100644
--- a/banyand/internal/sidx/block_test.go
+++ b/banyand/internal/sidx/block_test.go
@@ -158,9 +158,14 @@ func TestBlock_ProcessTag_WithArrValues(t *testing.T) {
b.processTag("arr_tag", elementTags)
- assert.Equal(t, "a|b|", string(b.tags["arr_tag"].values[0]))
- assert.Equal(t, "c", string(b.tags["arr_tag"].values[1]))
- assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("a")))
- assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("b")))
- assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("c")))
+ // Check the first element has valueArr
+ assert.NotNil(t, b.tags["arr_tag"].values[0].valueArr)
+ assert.Equal(t, 2, len(b.tags["arr_tag"].values[0].valueArr))
+ assert.Equal(t, "a", string(b.tags["arr_tag"].values[0].valueArr[0]))
+ assert.Equal(t, "b", string(b.tags["arr_tag"].values[0].valueArr[1]))
+
+ // Check the second element has value
+ assert.Equal(t, "c", string(b.tags["arr_tag"].values[1].value))
+
+ // Note: bloom filter is now created at write time, not during
processTag
}
diff --git a/banyand/internal/sidx/element.go b/banyand/internal/sidx/element.go
index d9987e7e..470174af 100644
--- a/banyand/internal/sidx/element.go
+++ b/banyand/internal/sidx/element.go
@@ -23,6 +23,7 @@ package sidx
import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)
@@ -51,20 +52,37 @@ func (t *tag) reset() {
t.valueType = pbv1.ValueTypeUnknown
}
-// marshal marshals the tag value to a byte slice.
-func (t *tag) marshal() []byte {
- if t.valueArr != nil {
- var dst []byte
- for i := range t.valueArr {
- if t.valueType == pbv1.ValueTypeInt64Arr {
- dst = append(dst, t.valueArr[i]...)
- continue
+func unmarshalTag(dest [][]byte, src []byte, valueType pbv1.ValueType)
([][]byte, error) {
+ if valueType == pbv1.ValueTypeInt64Arr {
+ for i := 0; i < len(src); i += 8 {
+ dest = append(dest, src[i:i+8])
+ }
+ return dest, nil
+ }
+ if valueType == pbv1.ValueTypeStrArr {
+ bb := bigValuePool.Get()
+ if bb == nil {
+ bb = &bytes.Buffer{}
+ }
+ defer func() {
+ bb.Buf = bb.Buf[:0]
+ bigValuePool.Put(bb)
+ }()
+ var err error
+ for len(src) > 0 {
+ bb.Buf, src, err =
encoding.UnmarshalVarArray(bb.Buf[:0], src)
+ if err != nil {
+ return nil, err
}
- dst = encoding.MarshalVarArray(dst, t.valueArr[i])
+ // Make a copy since bb.Buf will be reused
+ valueCopy := make([]byte, len(bb.Buf))
+ copy(valueCopy, bb.Buf)
+ dest = append(dest, valueCopy)
}
- return dst
+ return dest, nil
}
- return t.value
+ dest = append(dest, src)
+ return dest, nil
}
// reset elements collection for pooling.
diff --git a/banyand/internal/sidx/iter.go b/banyand/internal/sidx/iter.go
index dba836f2..e48bd4a6 100644
--- a/banyand/internal/sidx/iter.go
+++ b/banyand/internal/sidx/iter.go
@@ -29,11 +29,21 @@ import (
)
type iter struct {
- err error
- parts []*part
- piPool []partIter
- piHeap partIterHeap
- nextBlockNoop bool
+ err error
+ curBlock *blockMetadata
+ curPart *part
+ parts []*part
+ partIters []*partKeyIter
+ heap partKeyIterHeap
+ asc bool
+}
+
+func (it *iter) releaseCurBlock() {
+ if it.curBlock != nil {
+ releaseBlockMetadata(it.curBlock)
+ it.curBlock = nil
+ }
+ it.curPart = nil
}
func (it *iter) reset() {
@@ -42,88 +52,132 @@ func (it *iter) reset() {
}
it.parts = it.parts[:0]
- for i := range it.piPool {
- it.piPool[i].reset()
+ for i := range it.partIters {
+ if it.partIters[i] != nil {
+ releasePartKeyIter(it.partIters[i])
+ it.partIters[i] = nil
+ }
}
- it.piPool = it.piPool[:0]
+ it.partIters = it.partIters[:0]
- for i := range it.piHeap {
- it.piHeap[i] = nil
+ for i := range it.heap {
+ it.heap[i] = nil
}
- it.piHeap = it.piHeap[:0]
+ it.heap = it.heap[:0]
+
+ it.releaseCurBlock()
it.err = nil
- it.nextBlockNoop = false
+ it.asc = false
}
-func (it *iter) init(bma *blockMetadataArray, parts []*part, sids
[]common.SeriesID, minKey, maxKey int64, blockFilter index.Filter) {
+func (it *iter) init(parts []*part, sids []common.SeriesID, minKey, maxKey
int64, blockFilter index.Filter, asc bool) {
it.reset()
- it.parts = parts
+ it.parts = append(it.parts[:0], parts...)
+ it.asc = asc
- if n := len(it.parts) - cap(it.piPool); n > 0 {
- it.piPool = append(it.piPool[:cap(it.piPool)], make([]partIter,
n)...)
- }
- it.piPool = it.piPool[:len(it.parts)]
- for i, p := range it.parts {
- it.piPool[i].init(bma, p, sids, minKey, maxKey, blockFilter)
+ if cap(it.partIters) < len(parts) {
+ it.partIters = make([]*partKeyIter, len(parts))
+ } else {
+ it.partIters = it.partIters[:len(parts)]
+ for i := range it.partIters {
+ if it.partIters[i] != nil {
+ releasePartKeyIter(it.partIters[i])
+ it.partIters[i] = nil
+ }
+ }
}
- it.piHeap = it.piHeap[:0]
- for i := range it.piPool {
- ps := &it.piPool[i]
- if !ps.nextBlock() {
- if err := ps.error(); err != nil {
+ it.heap = it.heap[:0]
+
+ for i, p := range parts {
+ pki := generatePartKeyIter()
+ it.partIters[i] = pki
+
+ pki.init(p, sids, minKey, maxKey, blockFilter, asc)
+ if err := pki.error(); err != nil {
+ if !errors.Is(err, io.EOF) {
+ releasePartKeyIter(pki)
+ it.partIters[i] = nil
it.err = fmt.Errorf("cannot initialize sidx
iteration: %w", err)
return
}
+ releasePartKeyIter(pki)
+ it.partIters[i] = nil
continue
}
- it.piHeap = append(it.piHeap, ps)
+
+ if !pki.nextBlock() {
+ if err := pki.error(); err != nil && !errors.Is(err,
io.EOF) {
+ releasePartKeyIter(pki)
+ it.partIters[i] = nil
+ it.err = fmt.Errorf("cannot initialize sidx
iteration: %w", err)
+ return
+ }
+ releasePartKeyIter(pki)
+ it.partIters[i] = nil
+ continue
+ }
+
+ bm, _ := pki.current()
+ if bm == nil {
+ releasePartKeyIter(pki)
+ it.partIters[i] = nil
+ continue
+ }
+
+ it.heap = append(it.heap, pki)
}
- if len(it.piHeap) == 0 {
+
+ if len(it.heap) == 0 {
it.err = io.EOF
return
}
- heap.Init(&it.piHeap)
- it.nextBlockNoop = true
+
+ heap.Init(&it.heap)
}
func (it *iter) nextBlock() bool {
if it.err != nil {
return false
}
- if it.nextBlockNoop {
- it.nextBlockNoop = false
- return true
- }
- it.err = it.next()
- if it.err != nil {
- if errors.Is(it.err, io.EOF) {
- it.err = fmt.Errorf("cannot obtain the next block to
search in the partition: %w", it.err)
- }
+ it.releaseCurBlock()
+
+ if len(it.heap) == 0 {
+ it.err = io.EOF
return false
}
- return true
-}
-func (it *iter) next() error {
- psMin := it.piHeap[0]
- if psMin.nextBlock() {
- heap.Fix(&it.piHeap, 0)
- return nil
+ pki := heap.Pop(&it.heap).(*partKeyIter)
+ bm, p := pki.current()
+ if bm == nil {
+ it.releasePartIter(pki)
+ it.err = fmt.Errorf("partKeyIter has no current block")
+ return false
}
- if err := psMin.error(); err != nil {
- return err
+ it.curBlock = generateBlockMetadata()
+ it.curBlock.copyFrom(bm)
+ it.curPart = p
+
+ if !pki.nextBlock() {
+ err := pki.error()
+ it.releasePartIter(pki)
+ if err != nil && !errors.Is(err, io.EOF) {
+ it.releaseCurBlock()
+ it.err = err
+ return false
+ }
+ } else {
+ heap.Push(&it.heap, pki)
}
- heap.Pop(&it.piHeap)
+ return true
+}
- if len(it.piHeap) == 0 {
- return io.EOF
- }
- return nil
+func (it *iter) current() (*blockMetadata, *part) {
+ return it.curBlock, it.curPart
}
func (it *iter) Error() error {
@@ -148,29 +202,60 @@ func releaseIter(it *iter) {
var iterPool = pool.Register[*iter]("sidx-iter")
-type partIterHeap []*partIter
+type partKeyIterHeap []*partKeyIter
-func (pih *partIterHeap) Len() int {
+func (pih *partKeyIterHeap) Len() int {
return len(*pih)
}
-func (pih *partIterHeap) Less(i, j int) bool {
+func (pih *partKeyIterHeap) Less(i, j int) bool {
x := *pih
- return x[i].curBlock.less(x[j].curBlock)
+ bmi, _ := x[i].current()
+ bmj, _ := x[j].current()
+ if bmi == nil {
+ return false
+ }
+ if bmj == nil {
+ return true
+ }
+ asc := true
+ if x[i] != nil {
+ asc = x[i].asc
+ } else if x[j] != nil {
+ asc = x[j].asc
+ }
+ if asc {
+ return bmi.lessByKey(bmj)
+ }
+ return bmj.lessByKey(bmi)
}
-func (pih *partIterHeap) Swap(i, j int) {
+func (pih *partKeyIterHeap) Swap(i, j int) {
x := *pih
x[i], x[j] = x[j], x[i]
}
-func (pih *partIterHeap) Push(x any) {
- *pih = append(*pih, x.(*partIter))
+func (pih *partKeyIterHeap) Push(x any) {
+ *pih = append(*pih, x.(*partKeyIter))
}
-func (pih *partIterHeap) Pop() any {
+func (pih *partKeyIterHeap) Pop() any {
a := *pih
v := a[len(a)-1]
*pih = a[:len(a)-1]
return v
}
+
+func (it *iter) releasePartIter(p *partKeyIter) {
+ if p == nil {
+ return
+ }
+ for i := range it.partIters {
+ if it.partIters[i] == p {
+ releasePartKeyIter(p)
+ it.partIters[i] = nil
+ return
+ }
+ }
+ releasePartKeyIter(p)
+}
diff --git a/banyand/internal/sidx/iter_test.go
b/banyand/internal/sidx/iter_test.go
index 24d61235..fe4388ff 100644
--- a/banyand/internal/sidx/iter_test.go
+++ b/banyand/internal/sidx/iter_test.go
@@ -37,16 +37,7 @@ func TestIterComprehensive(t *testing.T) {
tempDir := t.TempDir()
// Test cases for comprehensive iterator testing
- testCases := []struct {
- blockFilter index.Filter
- name string
- parts [][]testElement
- querySids []common.SeriesID
- expectOrder []blockExpectation
- minKey int64
- maxKey int64
- expectBlocks int
- }{
+ testCases := []iterTestCase{
{
name: "single_part_single_block",
parts: [][]testElement{
@@ -215,8 +206,10 @@ func TestIterComprehensive(t *testing.T) {
// Test both file-based and memory-based parts
for _, partType := range []string{"file_based", "memory_based"} {
+ partType := partType
t.Run(partType, func(t *testing.T) {
for _, tc := range testCases {
+ tc := tc
t.Run(tc.name, func(t *testing.T) {
var parts []*part
@@ -241,8 +234,29 @@ func TestIterComprehensive(t *testing.T) {
parts = append(parts, testPart)
}
- // Test the iterator
- runIteratorTest(t, tc, parts)
+ ascBlocks := runIteratorPass(t, tc,
parts, true)
+ descBlocks := runIteratorPass(t, tc,
parts, false)
+
+ assertSameBlocksIgnoreOrder(t,
ascBlocks, descBlocks)
+
+ require.True(t,
sort.SliceIsSorted(ascBlocks, func(i, j int) bool {
+ if ascBlocks[i].minKey ==
ascBlocks[j].minKey {
+ return
ascBlocks[i].seriesID <= ascBlocks[j].seriesID
+ }
+ return ascBlocks[i].minKey <=
ascBlocks[j].minKey
+ }), "ascending pass should be ordered
by non-decreasing minKey")
+
+ require.True(t,
sort.SliceIsSorted(descBlocks, func(i, j int) bool {
+ if descBlocks[i].minKey ==
descBlocks[j].minKey {
+ return
descBlocks[i].seriesID >= descBlocks[j].seriesID
+ }
+ return descBlocks[i].minKey >=
descBlocks[j].minKey
+ }), "descending pass should be ordered
by non-increasing minKey")
+
+ if len(tc.expectOrder) > 0 {
+ require.Equal(t,
tc.expectOrder, ascBlocks, "ascending pass order should match expectation")
+ require.Equal(t,
reverseExpectations(tc.expectOrder), descBlocks, "descending pass should be
reverse of expectation")
+ }
})
}
})
@@ -254,15 +268,13 @@ func TestIterEdgeCases(t *testing.T) {
tempDir := t.TempDir()
t.Run("empty_parts_list", func(t *testing.T) {
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
-
- it := generateIter()
- defer releaseIter(it)
-
- it.init(bma, nil, []common.SeriesID{1, 2, 3}, 100, 200, nil)
- assert.False(t, it.nextBlock())
- assert.Nil(t, it.Error())
+ for _, asc := range []bool{true, false} {
+ it := generateIter()
+ it.init(nil, []common.SeriesID{1, 2, 3}, 100, 200, nil,
asc)
+ assert.False(t, it.nextBlock())
+ assert.Nil(t, it.Error())
+ releaseIter(it)
+ }
})
t.Run("empty_series_list", func(t *testing.T) {
@@ -281,15 +293,13 @@ func TestIterEdgeCases(t *testing.T) {
testPart := mustOpenPart(1, partDir, testFS)
defer testPart.close()
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
-
- it := generateIter()
- defer releaseIter(it)
-
- it.init(bma, []*part{testPart}, []common.SeriesID{}, 0, 1000,
nil)
- assert.False(t, it.nextBlock())
- assert.Nil(t, it.Error())
+ for _, asc := range []bool{true, false} {
+ it := generateIter()
+ it.init([]*part{testPart}, []common.SeriesID{}, 0,
1000, nil, asc)
+ assert.False(t, it.nextBlock())
+ assert.Nil(t, it.Error())
+ releaseIter(it)
+ }
})
t.Run("no_matching_key_range", func(t *testing.T) {
@@ -322,16 +332,14 @@ func TestIterEdgeCases(t *testing.T) {
testPart2 := mustOpenPart(2, partDir2, testFS)
defer testPart2.close()
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
-
- it := generateIter()
- defer releaseIter(it)
-
- // Query range that doesn't overlap with any blocks
- it.init(bma, []*part{testPart1, testPart2},
[]common.SeriesID{1, 2}, 400, 500, nil)
- assert.False(t, it.nextBlock())
- assert.Nil(t, it.Error())
+ for _, asc := range []bool{true, false} {
+ it := generateIter()
+ // Query range that doesn't overlap with any blocks
+ it.init([]*part{testPart1, testPart2},
[]common.SeriesID{1, 2}, 400, 500, nil, asc)
+ assert.False(t, it.nextBlock())
+ assert.Nil(t, it.Error())
+ releaseIter(it)
+ }
})
t.Run("single_part_single_block", func(t *testing.T) {
@@ -349,17 +357,15 @@ func TestIterEdgeCases(t *testing.T) {
testPart := mustOpenPart(1, partDir, testFS)
defer testPart.close()
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
-
- it := generateIter()
- defer releaseIter(it)
-
- it.init(bma, []*part{testPart}, []common.SeriesID{1}, 50, 150,
nil)
+ for _, asc := range []bool{true, false} {
+ it := generateIter()
+ it.init([]*part{testPart}, []common.SeriesID{1}, 50,
150, nil, asc)
- assert.True(t, it.nextBlock())
- assert.False(t, it.nextBlock()) // Should be only one block
- assert.Nil(t, it.Error())
+ assert.True(t, it.nextBlock())
+ assert.False(t, it.nextBlock()) // Should be only one
block
+ assert.Nil(t, it.Error())
+ releaseIter(it)
+ }
})
t.Run("block_filter_error", func(t *testing.T) {
@@ -375,20 +381,18 @@ func TestIterEdgeCases(t *testing.T) {
testPart := openMemPart(mp)
defer testPart.close()
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
-
- it := generateIter()
- defer releaseIter(it)
-
expectedErr := fmt.Errorf("test filter error")
mockFilter := &mockBlockFilter{err: expectedErr}
- it.init(bma, []*part{testPart}, []common.SeriesID{1}, 0, 200,
mockFilter)
+ for _, asc := range []bool{true, false} {
+ it := generateIter()
+ it.init([]*part{testPart}, []common.SeriesID{1}, 0,
200, mockFilter, asc)
- assert.False(t, it.nextBlock())
- assert.Error(t, it.Error())
- assert.Contains(t, it.Error().Error(), "cannot initialize sidx
iteration")
+ assert.False(t, it.nextBlock())
+ assert.Error(t, it.Error())
+ assert.Contains(t, it.Error().Error(), "cannot
initialize sidx iteration")
+ releaseIter(it)
+ }
})
}
@@ -432,37 +436,41 @@ func TestIterOrdering(t *testing.T) {
testPart2 := mustOpenPart(2, partDir2, testFS)
defer testPart2.close()
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
-
- it := generateIter()
- defer releaseIter(it)
-
- it.init(bma, []*part{testPart1, testPart2},
[]common.SeriesID{1, 2, 3, 4, 5, 6}, 0, 1000, nil)
-
- // Blocks should come in series ID order: 1, 2, 3, 4, 5, 6
- var foundSeries []common.SeriesID
- for it.nextBlock() {
- // Access the current block from the heap - need to be
careful about heap structure
- if len(it.piHeap) > 0 {
- foundSeries = append(foundSeries,
it.piHeap[0].curBlock.seriesID)
- }
+ tc := iterTestCase{
+ name: "interleaved_series_ordering",
+ parts: nil, // not used by helper during
verification
+ querySids: []common.SeriesID{1, 2, 3, 4, 5, 6},
+ minKey: 0,
+ maxKey: 1000,
+ blockFilter: nil,
+ expectBlocks: 6,
+ expectOrder: []blockExpectation{
+ {seriesID: 1, minKey: 100, maxKey: 100},
+ {seriesID: 2, minKey: 200, maxKey: 200},
+ {seriesID: 3, minKey: 300, maxKey: 300},
+ {seriesID: 4, minKey: 400, maxKey: 400},
+ {seriesID: 5, minKey: 500, maxKey: 500},
+ {seriesID: 6, minKey: 600, maxKey: 600},
+ },
}
- assert.NoError(t, it.Error())
+ ascBlocks := runIteratorPass(t, tc, []*part{testPart1,
testPart2}, true)
+ descBlocks := runIteratorPass(t, tc, []*part{testPart1,
testPart2}, false)
+
+ assertSameBlocksIgnoreOrder(t, ascBlocks, descBlocks)
+ require.Len(t, ascBlocks, 6)
- // We expect to find all 6 series
- assert.Equal(t, 6, len(foundSeries))
+ require.True(t, sort.SliceIsSorted(ascBlocks, func(i, j int)
bool {
+ return ascBlocks[i].seriesID <= ascBlocks[j].seriesID
+ }), "ascending iteration should retain increasing series order")
- // Verify ordering
- expectedOrder := []common.SeriesID{1, 2, 3, 4, 5, 6}
- assert.True(t, sort.SliceIsSorted(foundSeries, func(i, j int)
bool {
- return foundSeries[i] < foundSeries[j]
- }), "found series should be in ascending order: %v",
foundSeries)
+ require.True(t, sort.SliceIsSorted(descBlocks, func(i, j int)
bool {
+ return descBlocks[i].seriesID >= descBlocks[j].seriesID
+ }), "descending iteration should retain decreasing series
order")
- // All expected series should be found
- for _, expectedSeries := range expectedOrder {
- assert.Contains(t, foundSeries, expectedSeries, "should
find series %d", expectedSeries)
+ for _, expected := range tc.expectOrder {
+ assert.Contains(t, ascBlocks, expected)
+ assert.Contains(t, descBlocks, expected)
}
})
}
@@ -491,15 +499,13 @@ func TestIterPoolOperations(t *testing.T) {
// Set some state
it.err = fmt.Errorf("test error")
- it.nextBlockNoop = true
// Reset should clear everything
it.reset()
assert.Nil(t, it.err)
assert.Equal(t, 0, len(it.parts))
- assert.Equal(t, 0, len(it.piPool))
- assert.Equal(t, 0, len(it.piHeap))
- assert.False(t, it.nextBlockNoop)
+ assert.Equal(t, 0, len(it.partIters))
+ assert.Equal(t, 0, len(it.heap))
})
}
@@ -525,18 +531,60 @@ func TestBlockMetadataLess(t *testing.T) {
})
}
-// Helper types and functions.
+func TestIterOverlappingBlockGroups(t *testing.T) {
+ elementsPart1 := createTestElements([]testElement{
+ {seriesID: 1, userKey: 100, data: []byte("p1-a")},
+ {seriesID: 1, userKey: 200, data: []byte("p1-b")},
+ })
+ defer releaseElements(elementsPart1)
+
+ mp1 := GenerateMemPart()
+ defer ReleaseMemPart(mp1)
+ mp1.mustInitFromElements(elementsPart1)
+ part1 := openMemPart(mp1)
+ defer part1.close()
+
+ elementsPart2 := createTestElements([]testElement{
+ {seriesID: 2, userKey: 150, data: []byte("p2-a")},
+ {seriesID: 2, userKey: 180, data: []byte("p2-b")},
+ })
+ defer releaseElements(elementsPart2)
+
+ mp2 := GenerateMemPart()
+ defer ReleaseMemPart(mp2)
+ mp2.mustInitFromElements(elementsPart2)
+ part2 := openMemPart(mp2)
+ defer part2.close()
+
+ for _, asc := range []bool{true, false} {
+ it := generateIter()
+ it.init([]*part{part1, part2}, []common.SeriesID{1, 2}, 0, 500,
nil, asc)
+
+ // Now we iterate individual blocks from both parts
+ partsSeen := make(map[*part]struct{})
+ seriesSeen := make(map[common.SeriesID]struct{})
+ totalBlocks := 0
+
+ for it.nextBlock() {
+ bm, p := it.current()
+ require.NotNil(t, bm, "current block should not be nil")
+ require.NotNil(t, p, "current part should not be nil")
+ partsSeen[p] = struct{}{}
+ seriesSeen[bm.seriesID] = struct{}{}
+ totalBlocks++
+ }
-type blockExpectation struct {
- seriesID common.SeriesID
- minKey int64
- maxKey int64
+ require.Len(t, partsSeen, 2, "expected contributions from both
parts")
+ require.Len(t, seriesSeen, 2, "expected blocks from both
series")
+ require.GreaterOrEqual(t, totalBlocks, 2, "should iterate at
least 2 blocks (one per part)")
+ assert.NoError(t, it.Error())
+ releaseIter(it)
+ }
}
-// mockBlockFilter is already defined in part_iter_test.go.
+// Helper types and functions.
-// runIteratorTest runs the iterator test with the given test case and parts.
-func runIteratorTest(t *testing.T, tc struct {
+type iterTestCase struct {
blockFilter index.Filter
name string
parts [][]testElement
@@ -545,28 +593,24 @@ func runIteratorTest(t *testing.T, tc struct {
minKey int64
maxKey int64
expectBlocks int
-}, parts []*part,
-) {
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
+}
+
+// mockBlockFilter is already defined in part_iter_test.go.
+
+func runIteratorPass(t *testing.T, tc iterTestCase, parts []*part, asc bool)
[]blockExpectation {
+ t.Helper()
it := generateIter()
defer releaseIter(it)
- it.init(bma, parts, tc.querySids, tc.minKey, tc.maxKey, tc.blockFilter)
+ it.init(parts, tc.querySids, tc.minKey, tc.maxKey, tc.blockFilter, asc)
var foundBlocks []blockExpectation
- blockCount := 0
for it.nextBlock() {
- blockCount++
- // Get the minimum block from heap (the one currently being
processed)
- require.True(t, len(it.piHeap) > 0, "heap should not be empty
when nextBlock returns true")
+ curBlock, _ := it.current()
+ require.NotNil(t, curBlock, "current block should not be nil
when nextBlock returns true (order=%s)", orderName(asc))
- curBlock := it.piHeap[0].curBlock
- t.Logf("Found block for seriesID %d, key range [%d, %d]",
curBlock.seriesID, curBlock.minKey, curBlock.maxKey)
-
- // Verify the block overlaps with query range
overlaps := curBlock.maxKey >= tc.minKey && curBlock.minKey <=
tc.maxKey
assert.True(t, overlaps, "block should overlap with query range
[%d, %d], but got block range [%d, %d]",
tc.minKey, tc.maxKey, curBlock.minKey, curBlock.maxKey)
@@ -585,23 +629,34 @@ func runIteratorTest(t *testing.T, tc struct {
// Verify the number of blocks found
assert.Equal(t, tc.expectBlocks, len(foundBlocks), "should find
expected number of blocks")
- // Verify ordering - blocks should come out in sorted order by
(seriesID, minKey)
- assert.True(t, sort.SliceIsSorted(foundBlocks, func(i, j int) bool {
- if foundBlocks[i].seriesID == foundBlocks[j].seriesID {
- return foundBlocks[i].minKey < foundBlocks[j].minKey
- }
- return foundBlocks[i].seriesID < foundBlocks[j].seriesID
- }), "blocks should be in sorted order by (seriesID, minKey)")
-
- // If specific order is expected, verify it
- if len(tc.expectOrder) > 0 {
- require.Equal(t, len(tc.expectOrder), len(foundBlocks), "number
of found blocks should match expected")
- for i, expected := range tc.expectOrder {
- assert.Equal(t, expected.seriesID,
foundBlocks[i].seriesID, "block %d seriesID should match", i)
- assert.Equal(t, expected.minKey, foundBlocks[i].minKey,
"block %d minKey should match", i)
- assert.Equal(t, expected.maxKey, foundBlocks[i].maxKey,
"block %d maxKey should match", i)
- }
+ return foundBlocks
+}
+
+func reverseExpectations(src []blockExpectation) []blockExpectation {
+ if len(src) == 0 {
+ return nil
}
+ out := make([]blockExpectation, len(src))
+ for i := range src {
+ out[i] = src[len(src)-1-i]
+ }
+ return out
+}
- t.Logf("Test %s completed: found %d blocks, expected %d", tc.name,
blockCount, tc.expectBlocks)
+func assertSameBlocksIgnoreOrder(t *testing.T, left, right []blockExpectation)
{
+ t.Helper()
+
+ require.Equal(t, len(left), len(right), "block counts should match
across orders")
+
+ counts := make(map[blockExpectation]int, len(left))
+ for _, block := range left {
+ counts[block]++
+ }
+ for _, block := range right {
+ counts[block]--
+ require.GreaterOrEqual(t, counts[block], 0, "unexpected block
encountered %+v", block)
+ }
+ for block, count := range counts {
+ require.Equal(t, 0, count, "missing block %+v in comparison",
block)
+ }
}
diff --git a/banyand/internal/sidx/merge_test.go
b/banyand/internal/sidx/merge_test.go
index 3bdaa78e..0d63b5f4 100644
--- a/banyand/internal/sidx/merge_test.go
+++ b/banyand/internal/sidx/merge_test.go
@@ -18,7 +18,6 @@
package sidx
import (
- "encoding/binary"
"errors"
"path/filepath"
"reflect"
@@ -35,19 +34,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test"
)
-func marshalStrArr(strArr [][]byte) []byte {
- if len(strArr) == 0 {
- return []byte{}
- }
- var result []byte
- result = binary.LittleEndian.AppendUint32(result, uint32(len(strArr)))
- for _, str := range strArr {
- result = binary.LittleEndian.AppendUint32(result,
uint32(len(str)))
- result = append(result, str...)
- }
- return result
-}
-
var conventionalBlock = block{
userKeys: []int64{1, 2},
data: [][]byte{[]byte("data1"), []byte("data2")},
@@ -55,7 +41,10 @@ var conventionalBlock = block{
"service": {
name: "service",
valueType: pbv1.ValueTypeStr,
- values: [][]byte{[]byte("service1"),
[]byte("service2")},
+ values: []tagRow{
+ {value: []byte("service1")},
+ {value: []byte("service2")},
+ },
},
},
}
@@ -67,11 +56,11 @@ var mergedBlock = block{
"arrTag": {
name: "arrTag",
valueType: pbv1.ValueTypeStrArr,
- values: [][]byte{
- marshalStrArr([][]byte{[]byte("value1"),
[]byte("value2")}),
- marshalStrArr([][]byte{[]byte("value3"),
[]byte("value4")}),
- marshalStrArr([][]byte{[]byte("value5"),
[]byte("value6")}),
- marshalStrArr([][]byte{[]byte("value7"),
[]byte("value8")}),
+ values: []tagRow{
+ {valueArr: [][]byte{[]byte("value1"),
[]byte("value2")}},
+ {valueArr: [][]byte{[]byte("value3"),
[]byte("value4")}},
+ {valueArr: [][]byte{[]byte("value5"),
[]byte("value6")}},
+ {valueArr: [][]byte{[]byte("value7"),
[]byte("value8")}},
},
},
},
@@ -84,13 +73,13 @@ var duplicatedMergedBlock = block{
"arrTag": {
name: "arrTag",
valueType: pbv1.ValueTypeStrArr,
- values: [][]byte{
- marshalStrArr([][]byte{[]byte("value1"),
[]byte("value2")}),
- marshalStrArr([][]byte{[]byte("duplicated1")}),
- marshalStrArr([][]byte{[]byte("value3"),
[]byte("value4")}),
- marshalStrArr([][]byte{[]byte("value5"),
[]byte("value6")}),
- marshalStrArr([][]byte{[]byte("duplicated2")}),
- marshalStrArr([][]byte{[]byte("value7"),
[]byte("value8")}),
+ values: []tagRow{
+ {valueArr: [][]byte{[]byte("value1"),
[]byte("value2")}},
+ {valueArr: [][]byte{[]byte("duplicated1")}},
+ {valueArr: [][]byte{[]byte("value3"),
[]byte("value4")}},
+ {valueArr: [][]byte{[]byte("value5"),
[]byte("value6")}},
+ {valueArr: [][]byte{[]byte("duplicated2")}},
+ {valueArr: [][]byte{[]byte("value7"),
[]byte("value8")}},
},
},
},
@@ -131,9 +120,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
"arrTag": {
name: "arrTag",
valueType:
pbv1.ValueTypeStrArr,
- values: [][]byte{
-
marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
-
marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
+ values: []tagRow{
+ {valueArr:
[][]byte{[]byte("value1"), []byte("value2")}},
+ {valueArr:
[][]byte{[]byte("value3"), []byte("value4")}},
},
},
},
@@ -147,9 +136,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
"arrTag": {
name: "arrTag",
valueType:
pbv1.ValueTypeStrArr,
- values: [][]byte{
-
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
-
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ values: []tagRow{
+ {valueArr:
[][]byte{[]byte("value5"), []byte("value6")}},
+ {valueArr:
[][]byte{[]byte("value7"), []byte("value8")}},
},
},
},
@@ -167,9 +156,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
"arrTag": {
name: "arrTag",
valueType:
pbv1.ValueTypeStrArr,
- values: [][]byte{
-
marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
-
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+ values: []tagRow{
+ {valueArr:
[][]byte{[]byte("value1"), []byte("value2")}},
+ {valueArr:
[][]byte{[]byte("value5"), []byte("value6")}},
},
},
},
@@ -183,9 +172,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
"arrTag": {
name: "arrTag",
valueType:
pbv1.ValueTypeStrArr,
- values: [][]byte{
-
marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
-
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ values: []tagRow{
+ {valueArr:
[][]byte{[]byte("value3"), []byte("value4")}},
+ {valueArr:
[][]byte{[]byte("value7"), []byte("value8")}},
},
},
},
@@ -203,10 +192,10 @@ func Test_mergeTwoBlocks(t *testing.T) {
"arrTag": {
name: "arrTag",
valueType:
pbv1.ValueTypeStrArr,
- values: [][]byte{
-
marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
-
marshalStrArr([][]byte{[]byte("duplicated1")}),
-
marshalStrArr([][]byte{[]byte("duplicated2")}),
+ values: []tagRow{
+ {valueArr:
[][]byte{[]byte("value1"), []byte("value2")}},
+ {valueArr:
[][]byte{[]byte("duplicated1")}},
+ {valueArr:
[][]byte{[]byte("duplicated2")}},
},
},
},
@@ -220,10 +209,10 @@ func Test_mergeTwoBlocks(t *testing.T) {
"arrTag": {
name: "arrTag",
valueType:
pbv1.ValueTypeStrArr,
- values: [][]byte{
-
marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
-
marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
-
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+ values: []tagRow{
+ {valueArr:
[][]byte{[]byte("value3"), []byte("value4")}},
+ {valueArr:
[][]byte{[]byte("value5"), []byte("value6")}},
+ {valueArr:
[][]byte{[]byte("value7"), []byte("value8")}},
},
},
},
diff --git a/banyand/internal/sidx/part_key_iter.go
b/banyand/internal/sidx/part_key_iter.go
new file mode 100644
index 00000000..edb7de9d
--- /dev/null
+++ b/banyand/internal/sidx/part_key_iter.go
@@ -0,0 +1,489 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+ "container/heap"
+ "errors"
+ "fmt"
+ "io"
+ "sort"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
+)
+
+// lessByKey compares two blockMetadata by key range, then by seriesID and
finally by data block offset.
+func (bm *blockMetadata) lessByKey(other *blockMetadata) bool {
+ if bm.minKey != other.minKey {
+ return bm.minKey < other.minKey
+ }
+ if bm.maxKey != other.maxKey {
+ return bm.maxKey < other.maxKey
+ }
+ if bm.seriesID != other.seriesID {
+ return bm.seriesID < other.seriesID
+ }
+ return bm.dataBlock.offset < other.dataBlock.offset
+}
+
+type blockRef struct {
+ primaryIdx int
+ blockIdx int
+ seriesID common.SeriesID
+ minKey int64
+ maxKey int64
+}
+
+type seriesCursor struct {
+ iter *partKeyIter
+ refs []blockRef
+ curBlock blockMetadata
+ seriesID common.SeriesID
+ refIdx int
+ curLoaded bool
+}
+
+func (sc *seriesCursor) less(other *seriesCursor, asc bool) bool {
+ cur := sc.current()
+ otherCur := other.current()
+ if cur == nil {
+ return false
+ }
+ if otherCur == nil {
+ return true
+ }
+ if asc {
+ return cur.lessByKey(otherCur)
+ }
+ return otherCur.lessByKey(cur)
+}
+
+func (sc *seriesCursor) init(iter *partKeyIter, sid common.SeriesID, refs
[]blockRef) {
+ sc.reset()
+ sc.iter = iter
+ sc.seriesID = sid
+ // Reuse underlying slice when possible
+ if cap(sc.refs) < len(refs) {
+ sc.refs = make([]blockRef, len(refs))
+ copy(sc.refs, refs)
+ } else {
+ sc.refs = sc.refs[:len(refs)]
+ copy(sc.refs, refs)
+ }
+ if iter != nil && !iter.asc && len(sc.refs) > 1 {
+ for i, j := 0, len(sc.refs)-1; i < j; i, j = i+1, j-1 {
+ sc.refs[i], sc.refs[j] = sc.refs[j], sc.refs[i]
+ }
+ }
+}
+
+func (sc *seriesCursor) reset() {
+ sc.iter = nil
+ sc.seriesID = 0
+ sc.refIdx = 0
+ if sc.curLoaded {
+ sc.curBlock.reset()
+ }
+ sc.curLoaded = false
+ sc.refs = sc.refs[:0]
+}
+
+func (sc *seriesCursor) current() *blockMetadata {
+ if !sc.curLoaded {
+ return nil
+ }
+ return &sc.curBlock
+}
+
+func (sc *seriesCursor) advance() (bool, error) {
+ if sc.iter == nil {
+ return false, nil
+ }
+ for sc.refIdx < len(sc.refs) {
+ ref := sc.refs[sc.refIdx]
+ sc.refIdx++
+ bma, err := sc.iter.ensurePrimaryBlocks(ref.primaryIdx)
+ if err != nil {
+ return false, err
+ }
+ if ref.blockIdx >= len(bma.arr) {
+ return false, fmt.Errorf("block index %d out of range
for primary %d", ref.blockIdx, ref.primaryIdx)
+ }
+ bm := &bma.arr[ref.blockIdx]
+ if bm.maxKey < sc.iter.minKey || bm.minKey > sc.iter.maxKey {
+ continue
+ }
+ if sc.iter.blockFilter != nil {
+ shouldSkip, err := sc.iter.shouldSkipBlock(bm)
+ if err != nil {
+ return false, err
+ }
+ if shouldSkip {
+ continue
+ }
+ }
+ sc.curBlock.copyFrom(bm)
+ sc.curLoaded = true
+ return true, nil
+ }
+ sc.curLoaded = false
+ return false, nil
+}
+
+type seriesCursorHeap []*seriesCursor
+
+func (sch *seriesCursorHeap) Len() int {
+ return len(*sch)
+}
+
+func (sch *seriesCursorHeap) Less(i, j int) bool {
+ x := *sch
+ asc := true
+ if x[i] != nil && x[i].iter != nil {
+ asc = x[i].iter.asc
+ } else if x[j] != nil && x[j].iter != nil {
+ asc = x[j].iter.asc
+ }
+ return x[i].less(x[j], asc)
+}
+
+func (sch *seriesCursorHeap) Swap(i, j int) {
+ x := *sch
+ x[i], x[j] = x[j], x[i]
+}
+
+func (sch *seriesCursorHeap) Push(x any) {
+ *sch = append(*sch, x.(*seriesCursor))
+}
+
+func (sch *seriesCursorHeap) Pop() any {
+ a := *sch
+ v := a[len(a)-1]
+ *sch = a[:len(a)-1]
+ return v
+}
+
+type partKeyIter struct {
+ err error
+ blockFilter index.Filter
+ sidSet map[common.SeriesID]struct{}
+ p *part
+ primaryCache map[int]*blockMetadataArray
+ curBlock *blockMetadata
+ cursorPool []seriesCursor
+ cursorHeap seriesCursorHeap
+ sids []common.SeriesID
+ primaryBuf []byte
+ compressedPrimaryBuf []byte
+ minKey int64
+ maxKey int64
+ asc bool
+}
+
+func (pki *partKeyIter) releaseCurBlock() {
+ if pki.curBlock != nil {
+ releaseBlockMetadata(pki.curBlock)
+ pki.curBlock = nil
+ }
+}
+
+func (pki *partKeyIter) reset() {
+ pki.err = nil
+ pki.p = nil
+ pki.minKey = 0
+ pki.maxKey = 0
+ pki.blockFilter = nil
+
+ pki.releaseCurBlock()
+
+ for i := range pki.cursorHeap {
+ pki.cursorHeap[i].reset()
+ }
+ pki.cursorHeap = pki.cursorHeap[:0]
+
+ for i := range pki.cursorPool {
+ pki.cursorPool[i].reset()
+ }
+ pki.cursorPool = pki.cursorPool[:0]
+
+ for idx, cache := range pki.primaryCache {
+ if cache != nil {
+ releaseBlockMetadataArray(cache)
+ pki.primaryCache[idx] = nil
+ }
+ }
+ if pki.primaryCache != nil {
+ clear(pki.primaryCache)
+ }
+
+ if pki.sidSet != nil {
+ clear(pki.sidSet)
+ }
+ pki.sids = pki.sids[:0]
+
+ pki.compressedPrimaryBuf = pki.compressedPrimaryBuf[:0]
+ pki.primaryBuf = pki.primaryBuf[:0]
+}
+
+func (pki *partKeyIter) init(p *part, sids []common.SeriesID, minKey, maxKey
int64, blockFilter index.Filter, asc bool) {
+ pki.reset()
+ pki.p = p
+ pki.minKey = minKey
+ pki.maxKey = maxKey
+ pki.blockFilter = blockFilter
+ pki.asc = asc
+
+ if len(sids) == 0 {
+ pki.err = io.EOF
+ return
+ }
+
+ pki.sids = append(pki.sids[:0], sids...)
+ sort.Slice(pki.sids, func(i, j int) bool {
+ return pki.sids[i] < pki.sids[j]
+ })
+
+ if pki.sidSet == nil {
+ pki.sidSet = make(map[common.SeriesID]struct{}, len(pki.sids))
+ } else {
+ clear(pki.sidSet)
+ }
+ for _, sid := range pki.sids {
+ pki.sidSet[sid] = struct{}{}
+ }
+
+ maxSID := pki.sids[len(pki.sids)-1]
+ minSID := pki.sids[0]
+
+ seriesRefs := make(map[common.SeriesID][]blockRef, len(pki.sids))
+
+ for idx := range p.primaryBlockMetadata {
+ pbm := &p.primaryBlockMetadata[idx]
+
+ if pbm.seriesID > maxSID {
+ break
+ }
+
+ if pbm.maxKey < pki.minKey || pbm.minKey > pki.maxKey {
+ continue
+ }
+
+ bma, err := pki.ensurePrimaryBlocks(idx)
+ if err != nil {
+ pki.err = fmt.Errorf("cannot load primary block
metadata: %w", err)
+ return
+ }
+ if len(bma.arr) == 0 {
+ continue
+ }
+
+ if bma.arr[len(bma.arr)-1].seriesID < minSID {
+ continue
+ }
+
+ lastSeries := bma.arr[len(bma.arr)-1].seriesID
+ for _, sid := range pki.sids {
+ if sid < pbm.seriesID {
+ continue
+ }
+ if sid > lastSeries {
+ continue
+ }
+
+ start := sort.Search(len(bma.arr), func(i int) bool {
+ return bma.arr[i].seriesID >= sid
+ })
+ if start == len(bma.arr) || bma.arr[start].seriesID !=
sid {
+ continue
+ }
+
+ for i := start; i < len(bma.arr) && bma.arr[i].seriesID
== sid; i++ {
+ bm := &bma.arr[i]
+ if bm.maxKey < pki.minKey || bm.minKey >
pki.maxKey {
+ continue
+ }
+ if _, ok := pki.sidSet[bm.seriesID]; !ok {
+ continue
+ }
+ seriesRefs[sid] = append(seriesRefs[sid],
blockRef{
+ primaryIdx: idx,
+ blockIdx: i,
+ seriesID: sid,
+ minKey: bm.minKey,
+ maxKey: bm.maxKey,
+ })
+ }
+ }
+ }
+
+ activeSeries := 0
+ for _, sid := range pki.sids {
+ if refs := seriesRefs[sid]; len(refs) > 0 {
+ activeSeries++
+ }
+ }
+
+ if activeSeries == 0 {
+ pki.err = io.EOF
+ return
+ }
+
+ if n := activeSeries - cap(pki.cursorPool); n > 0 {
+ pki.cursorPool = append(pki.cursorPool[:cap(pki.cursorPool)],
make([]seriesCursor, n)...)
+ }
+ pki.cursorPool = pki.cursorPool[:activeSeries]
+
+ pki.cursorHeap = pki.cursorHeap[:0]
+ cursorIdx := 0
+ for _, sid := range pki.sids {
+ refs := seriesRefs[sid]
+ if len(refs) == 0 {
+ continue
+ }
+ cursor := &pki.cursorPool[cursorIdx]
+ cursorIdx++
+ cursor.init(pki, sid, refs)
+ ok, err := cursor.advance()
+ if err != nil {
+ pki.err = fmt.Errorf("cannot initialize cursor for
series %d: %w", sid, err)
+ return
+ }
+ if !ok {
+ cursor.reset()
+ continue
+ }
+ pki.cursorHeap = append(pki.cursorHeap, cursor)
+ }
+ pki.cursorPool = pki.cursorPool[:cursorIdx]
+
+ if len(pki.cursorHeap) == 0 {
+ pki.err = io.EOF
+ return
+ }
+ heap.Init(&pki.cursorHeap)
+}
+
+func (pki *partKeyIter) nextBlock() bool {
+ if pki.err != nil {
+ return false
+ }
+
+ pki.releaseCurBlock()
+
+ if len(pki.cursorHeap) == 0 {
+ pki.err = io.EOF
+ return false
+ }
+
+ cursor := heap.Pop(&pki.cursorHeap).(*seriesCursor)
+ current := cursor.current()
+ if current == nil {
+ cursor.reset()
+ pki.err = fmt.Errorf("series cursor %d has no current block",
cursor.seriesID)
+ return false
+ }
+
+ pki.curBlock = generateBlockMetadata()
+ pki.curBlock.copyFrom(current)
+
+ ok, err := cursor.advance()
+ if err != nil {
+ cursor.reset()
+ pki.releaseCurBlock()
+ pki.err = err
+ return false
+ }
+ if ok {
+ heap.Push(&pki.cursorHeap, cursor)
+ } else {
+ cursor.reset()
+ }
+
+ return true
+}
+
+func (pki *partKeyIter) error() error {
+ if errors.Is(pki.err, io.EOF) {
+ return nil
+ }
+ return pki.err
+}
+
+func (pki *partKeyIter) ensurePrimaryBlocks(primaryIdx int)
(*blockMetadataArray, error) {
+ if pki.primaryCache == nil {
+ pki.primaryCache = make(map[int]*blockMetadataArray)
+ }
+ if bma, ok := pki.primaryCache[primaryIdx]; ok && bma != nil {
+ return bma, nil
+ }
+
+ pbm := &pki.p.primaryBlockMetadata[primaryIdx]
+ bma := generateBlockMetadataArray()
+
+ pki.compressedPrimaryBuf = bytes.ResizeOver(pki.compressedPrimaryBuf,
int(pbm.size))
+ fs.MustReadData(pki.p.primary, int64(pbm.offset),
pki.compressedPrimaryBuf)
+
+ var err error
+ pki.primaryBuf, err = zstd.Decompress(pki.primaryBuf[:0],
pki.compressedPrimaryBuf)
+ if err != nil {
+ releaseBlockMetadataArray(bma)
+ return nil, fmt.Errorf("cannot decompress primary block: %w",
err)
+ }
+
+ bma.arr, err = unmarshalBlockMetadata(bma.arr[:0], pki.primaryBuf)
+ if err != nil {
+ releaseBlockMetadataArray(bma)
+ return nil, fmt.Errorf("cannot unmarshal primary block
metadata: %w", err)
+ }
+
+ pki.primaryCache[primaryIdx] = bma
+ return bma, nil
+}
+
+func (pki *partKeyIter) shouldSkipBlock(bm *blockMetadata) (bool, error) {
+ tfo := generateTagFilterOp(bm, pki.p)
+ defer releaseTagFilterOp(tfo)
+ return pki.blockFilter.ShouldSkip(tfo)
+}
+
+func (pki *partKeyIter) current() (*blockMetadata, *part) {
+ return pki.curBlock, pki.p
+}
+
+func generatePartKeyIter() *partKeyIter {
+ v := partKeyIterPool.Get()
+ if v == nil {
+ return &partKeyIter{}
+ }
+ return v
+}
+
+func releasePartKeyIter(pki *partKeyIter) {
+ if pki == nil {
+ return
+ }
+ pki.reset()
+ partKeyIterPool.Put(pki)
+}
+
+var partKeyIterPool = pool.Register[*partKeyIter]("sidx-partKeyIter")
diff --git a/banyand/internal/sidx/part_key_iter_test.go
b/banyand/internal/sidx/part_key_iter_test.go
new file mode 100644
index 00000000..f31d7500
--- /dev/null
+++ b/banyand/internal/sidx/part_key_iter_test.go
@@ -0,0 +1,483 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+ "errors"
+ "sort"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+func setupPartForKeyIter(t *testing.T, elems []testElement) (*part, func()) {
+ t.Helper()
+
+ elements := createTestElements(elems)
+
+ mp := GenerateMemPart()
+ mp.mustInitFromElements(elements)
+
+ part := openMemPart(mp)
+
+ cleanup := func() {
+ part.close()
+ ReleaseMemPart(mp)
+ releaseElements(elements)
+ }
+ return part, cleanup
+}
+
+type blockExpectation struct {
+ seriesID common.SeriesID
+ minKey int64
+ maxKey int64
+}
+
+func runPartKeyIterPass(t *testing.T, part *part, sids []common.SeriesID,
minKey, maxKey int64, blockFilter index.Filter, asc bool) ([]blockExpectation,
error) {
+ iter := generatePartKeyIter()
+ defer releasePartKeyIter(iter)
+
+ iter.init(part, sids, minKey, maxKey, blockFilter, asc)
+
+ var results []blockExpectation
+ for iter.nextBlock() {
+ block, _ := iter.current()
+ require.NotNil(t, block)
+ results = append(results, blockExpectation{
+ seriesID: block.seriesID,
+ minKey: block.minKey,
+ maxKey: block.maxKey,
+ })
+ }
+
+ return results, iter.error()
+}
+
+func runPartKeyIterDoublePass(t *testing.T, part *part, sids
[]common.SeriesID, minKey, maxKey int64,
+ blockFilter index.Filter,
+) (ascBlocks, descBlocks []blockExpectation, ascErr, descErr error) {
+ ascBlocks, ascErr = runPartKeyIterPass(t, part, sids, minKey, maxKey,
blockFilter, true)
+ descBlocks, descErr = runPartKeyIterPass(t, part, sids, minKey, maxKey,
blockFilter, false)
+ return
+}
+
+func orderName(asc bool) string {
+ if asc {
+ return "asc"
+ }
+ return "desc"
+}
+
+func verifyDescendingOrder(t *testing.T, blocks []blockExpectation) {
+ require.True(t, sort.SliceIsSorted(blocks, func(i, j int) bool {
+ if blocks[i].minKey == blocks[j].minKey {
+ return blocks[i].seriesID > blocks[j].seriesID
+ }
+ return blocks[i].minKey >= blocks[j].minKey
+ }), "blocks should be in non-increasing order by minKey")
+}
+
+func verifyAscendingOrder(t *testing.T, blocks []blockExpectation) {
+ require.True(t, sort.SliceIsSorted(blocks, func(i, j int) bool {
+ if blocks[i].minKey == blocks[j].minKey {
+ return blocks[i].seriesID < blocks[j].seriesID
+ }
+ return blocks[i].minKey <= blocks[j].minKey
+ }), "blocks should be in non-decreasing order by minKey")
+}
+
+func TestPartKeyIterOrdersBlocksByMinKey(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ {seriesID: 2, userKey: 50, data: []byte("s2")},
+ {seriesID: 3, userKey: 150, data: []byte("s3")},
+ })
+ defer cleanup()
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1, 2, 3}, 0, 200, nil)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+
+ require.Len(t, asc, 3, "expected three blocks in ascending order")
+ verifyAscendingOrder(t, asc)
+ assert.Equal(t, []common.SeriesID{2, 1, 3},
[]common.SeriesID{asc[0].seriesID, asc[1].seriesID, asc[2].seriesID})
+
+ require.Len(t, desc, 3, "expected three blocks in descending order")
+ verifyDescendingOrder(t, desc)
+ assert.Equal(t, []common.SeriesID{3, 1, 2},
[]common.SeriesID{desc[0].seriesID, desc[1].seriesID, desc[2].seriesID})
+}
+
+func TestPartKeyIterFiltersSeriesIDs(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ {seriesID: 2, userKey: 50, data: []byte("s2")},
+ {seriesID: 3, userKey: 150, data: []byte("s3")},
+ })
+ defer cleanup()
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1, 3}, 0, 200, nil)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ require.Len(t, asc, 2)
+ require.Len(t, desc, 2)
+ require.Equal(t, []common.SeriesID{1, 3},
[]common.SeriesID{asc[0].seriesID, asc[1].seriesID})
+ require.Equal(t, []common.SeriesID{3, 1},
[]common.SeriesID{desc[0].seriesID, desc[1].seriesID})
+}
+
+func TestPartKeyIterAppliesKeyRange(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ {seriesID: 2, userKey: 50, data: []byte("s2")},
+ {seriesID: 3, userKey: 150, data: []byte("s3")},
+ })
+ defer cleanup()
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1, 2, 3}, 120, 200, nil)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ require.Len(t, asc, 1)
+ require.Len(t, desc, 1)
+ assert.Equal(t, common.SeriesID(3), asc[0].seriesID)
+ assert.Equal(t, common.SeriesID(3), desc[0].seriesID)
+}
+
+func TestPartKeyIterNoBlocksInRange(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ {seriesID: 2, userKey: 50, data: []byte("s2")},
+ })
+ defer cleanup()
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1, 2}, 1000, 2000, nil)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ require.Empty(t, asc)
+ require.Empty(t, desc)
+}
+
+func TestPartKeyIterHandlesEmptySeries(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ })
+ defer cleanup()
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{}, 0, 200, nil)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ require.Len(t, asc, 0)
+ require.Len(t, desc, 0)
+}
+
+func TestPartKeyIterHonorsBlockFilter(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ {seriesID: 2, userKey: 150, data: []byte("s2")},
+ })
+ defer cleanup()
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1, 2}, 0, 200, &mockBlockFilter{shouldSkip: true})
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ require.Empty(t, asc)
+ require.Empty(t, desc)
+}
+
+func TestPartKeyIterPropagatesFilterError(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ })
+ defer cleanup()
+
+ expectedErr := errors.New("filter failure")
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1}, 0, 200, &mockBlockFilter{err: expectedErr})
+ require.ErrorIs(t, ascErr, expectedErr)
+ require.ErrorIs(t, descErr, expectedErr)
+ require.Empty(t, asc)
+ require.Empty(t, desc)
+}
+
+func TestPartKeyIterBreaksTiesBySeriesID(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ {seriesID: 2, userKey: 100, data: []byte("s2")},
+ })
+ defer cleanup()
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1, 2}, 0, 200, nil)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ require.Len(t, asc, 2)
+ require.Len(t, desc, 2)
+ assert.Equal(t, []common.SeriesID{1, 2},
[]common.SeriesID{asc[0].seriesID, asc[1].seriesID})
+ assert.Equal(t, []common.SeriesID{2, 1},
[]common.SeriesID{desc[0].seriesID, desc[1].seriesID})
+}
+
+func TestPartKeyIterGroupsOverlappingRanges(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1a")},
+ {seriesID: 1, userKey: 180, data: []byte("s1b")},
+ {seriesID: 2, userKey: 120, data: []byte("s2a")},
+ {seriesID: 2, userKey: 220, data: []byte("s2b")},
+ {seriesID: 3, userKey: 400, data: []byte("s3")},
+ })
+ defer cleanup()
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1, 2, 3}, 0, 500, nil)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ require.Len(t, asc, 3)
+ require.Len(t, desc, 3)
+
+ expectedAsc := []blockExpectation{
+ {seriesID: 1, minKey: 100, maxKey: 180},
+ {seriesID: 2, minKey: 120, maxKey: 220},
+ {seriesID: 3, minKey: 400, maxKey: 400},
+ }
+ assert.Equal(t, expectedAsc, asc)
+ for i := range desc {
+ assert.Equal(t, expectedAsc[len(expectedAsc)-1-i], desc[i])
+ }
+
+ iter := generatePartKeyIter()
+ defer releasePartKeyIter(iter)
+ iter.init(part, []common.SeriesID{1, 2, 3}, 0, 500, nil, true)
+ var ids []common.SeriesID
+ for iter.nextBlock() {
+ block, _ := iter.current()
+ require.NotNil(t, block)
+ ids = append(ids, block.seriesID)
+ }
+ require.NoError(t, iter.error())
+ require.GreaterOrEqual(t, len(ids), 3, "expected at least three blocks")
+ // Verify we get blocks from all three series
+ require.Contains(t, ids, common.SeriesID(1))
+ require.Contains(t, ids, common.SeriesID(2))
+ require.Contains(t, ids, common.SeriesID(3))
+}
+
+func TestPartKeyIterSelectiveFilterAllowsLaterBlocks(t *testing.T) {
+ const elementsPerBatch = maxBlockLength + 10
+
+ var elems []testElement
+ for i := 0; i < elementsPerBatch; i++ {
+ elems = append(elems, testElement{
+ seriesID: 1,
+ userKey: int64(i),
+ data: []byte("pending"),
+ tags: []tag{
+ {
+ name: "status",
+ value: []byte("pending"),
+ valueType: pbv1.ValueTypeStr,
+ },
+ },
+ })
+ }
+ for i := 0; i < elementsPerBatch; i++ {
+ elems = append(elems, testElement{
+ seriesID: 1,
+ userKey: int64(20000 + i),
+ data: []byte("success"),
+ tags: []tag{
+ {
+ name: "status",
+ value: []byte("success"),
+ valueType: pbv1.ValueTypeStr,
+ },
+ },
+ })
+ }
+ elems = append(elems, testElement{
+ seriesID: 2,
+ userKey: 5000,
+ data: []byte("series2"),
+ tags: []tag{
+ {
+ name: "status",
+ value: []byte("other"),
+ valueType: pbv1.ValueTypeStr,
+ },
+ },
+ })
+
+ part, cleanup := setupPartForKeyIter(t, elems)
+ defer cleanup()
+
+ filter := &selectiveMockBlockFilter{
+ tagName: "status",
+ skipValue: "pending",
+ }
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1, 2}, 0, 50000, filter)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ assert.Greater(t, filter.skipCallCount, 0, "filter should have been
invoked")
+
+ require.NotEmpty(t, asc)
+ require.NotEmpty(t, desc)
+
+ seriesTwoCountAsc := 0
+ for _, block := range asc {
+ if block.seriesID == 1 {
+ require.GreaterOrEqual(t, block.minKey, int64(20000),
"pending block should be skipped in ascending order")
+ }
+ if block.seriesID == 2 {
+ seriesTwoCountAsc++
+ }
+ }
+ require.Equal(t, 1, seriesTwoCountAsc, "series 2 block should appear
once in ascending results")
+
+ seriesTwoCountDesc := 0
+ for _, block := range desc {
+ if block.seriesID == 1 {
+ require.GreaterOrEqual(t, block.minKey, int64(20000),
"pending block should be skipped in descending order")
+ }
+ if block.seriesID == 2 {
+ seriesTwoCountDesc++
+ }
+ }
+ require.Equal(t, 1, seriesTwoCountDesc, "series 2 block should appear
once in descending results")
+}
+
+func TestPartKeyIterExhaustion(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ {seriesID: 2, userKey: 120, data: []byte("s2")},
+ })
+ defer cleanup()
+
+ for _, asc := range []bool{true, false} {
+ t.Run(orderName(asc), func(t *testing.T) {
+ iter := generatePartKeyIter()
+ defer releasePartKeyIter(iter)
+
+ iter.init(part, []common.SeriesID{1, 2}, 0, 200, nil,
asc)
+
+ blockCount := 0
+ for iter.nextBlock() {
+ block, _ := iter.current()
+ require.NotNil(t, block)
+ blockCount++
+ }
+ require.NoError(t, iter.error())
+ require.Greater(t, blockCount, 0, "iterator should
yield at least one block")
+
+ assert.False(t, iter.nextBlock(), "iterator should
report exhaustion")
+ assert.NoError(t, iter.error())
+ })
+ }
+}
+
+func TestPartKeyIterSkipsPrimaryBeyondMaxSID(t *testing.T) {
+ part, cleanup := setupPartForKeyIter(t, []testElement{
+ {seriesID: 1, userKey: 100, data: []byte("s1")},
+ {seriesID: 50, userKey: 200, data: []byte("s50")},
+ })
+
+ defer cleanup()
+
+ require.NotEmpty(t, part.primaryBlockMetadata, "part should have at
least one primary block")
+
+ first := part.primaryBlockMetadata[0]
+ part.primaryBlockMetadata = append(part.primaryBlockMetadata,
primaryBlockMetadata{
+ seriesID: 50,
+ minKey: 300,
+ maxKey: 300,
+ dataBlock: dataBlock{
+ offset: first.offset + first.size + 1,
+ size: 1,
+ },
+ })
+
+ asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part,
[]common.SeriesID{1}, 0, 500, nil)
+ require.NoError(t, ascErr)
+ require.NoError(t, descErr)
+ require.NotEmpty(t, asc)
+ require.NotEmpty(t, desc)
+ for _, block := range asc {
+ assert.Equal(t, common.SeriesID(1), block.seriesID)
+ }
+ for _, block := range desc {
+ assert.Equal(t, common.SeriesID(1), block.seriesID)
+ }
+}
+
+func TestPartKeyIterRequeuesOnGapBetweenBlocks(t *testing.T) {
+ const elementsPerBatch = maxBlockLength + 10
+
+ var elems []testElement
+ for i := 0; i < elementsPerBatch; i++ {
+ elems = append(elems, testElement{
+ seriesID: 1,
+ userKey: int64(i),
+ data: []byte("batch1"),
+ })
+ }
+ for i := 0; i < 16; i++ {
+ elems = append(elems, testElement{
+ seriesID: 1,
+ userKey: int64(50000 + i),
+ data: []byte("batch2"),
+ })
+ }
+
+ part, cleanup := setupPartForKeyIter(t, elems)
+ defer cleanup()
+
+ for _, asc := range []bool{true, false} {
+ t.Run(orderName(asc), func(t *testing.T) {
+ iter := generatePartKeyIter()
+ defer releasePartKeyIter(iter)
+
+ iter.init(part, []common.SeriesID{1}, 0, 100000, nil,
asc)
+
+ var blocks []struct {
+ min int64
+ max int64
+ }
+ for iter.nextBlock() {
+ block, _ := iter.current()
+ require.NotNil(t, block)
+ blocks = append(blocks, struct {
+ min int64
+ max int64
+ }{min: block.minKey, max: block.maxKey})
+ }
+
+ require.NoError(t, iter.error())
+ require.GreaterOrEqual(t, len(blocks), 2, "expected at
least two blocks for the same series")
+
+ // Verify blocks are in proper order
+ for i := 1; i < len(blocks); i++ {
+ prev := blocks[i-1]
+ curr := blocks[i]
+ if asc {
+ assert.LessOrEqual(t, prev.min,
curr.min, "ascending iteration should maintain order")
+ } else {
+ assert.GreaterOrEqual(t, prev.max,
curr.max, "descending iteration should maintain order")
+ }
+ }
+ })
+ }
+}
diff --git a/banyand/internal/sidx/query.go b/banyand/internal/sidx/query.go
index c9618f58..36d2f80e 100644
--- a/banyand/internal/sidx/query.go
+++ b/banyand/internal/sidx/query.go
@@ -215,9 +215,10 @@ func (s *sidx) prepareStreamingResources(
minKey: minKey,
maxKey: maxKey,
asc: asc,
+ batchSize: req.MaxBatchSize,
}
- blockCh := make(chan *blockScanResultBatch, 1)
+ blockCh := make(chan *blockScanResultBatch)
go func() {
bs.scan(ctx, blockCh)
close(blockCh)
@@ -259,38 +260,27 @@ func (s *sidx) processStreamingLoop(
}
scannerBatchCount := 0
- for {
- select {
- case <-ctx.Done():
+ for batch := range resources.blockCh {
+ scannerBatchCount++
+ if err := s.handleStreamingBatch(ctx, batch, resources, req,
resultsCh, metrics); err != nil {
if loopSpan != nil {
- loopSpan.Tag("termination_reason",
"context_canceled")
- loopSpan.Tagf("scanner_batches_before_cancel",
"%d", scannerBatchCount)
- }
- return ctx.Err()
- case batch, ok := <-resources.blockCh:
- if !ok {
- if loopSpan != nil {
- loopSpan.Tag("termination_reason",
"channel_closed")
- loopSpan.Tagf("total_scanner_batches",
"%d", scannerBatchCount)
+ if errors.Is(err, context.Canceled) {
+ loopSpan.Tag("termination_reason",
"context_canceled")
+
loopSpan.Tagf("scanner_batches_before_cancel", "%d", scannerBatchCount)
+ } else {
+ loopSpan.Tag("termination_reason",
"batch_error")
+
loopSpan.Tagf("scanner_batches_before_error", "%d", scannerBatchCount)
+ loopSpan.Error(err)
}
- return resources.heap.merge(ctx,
req.MaxBatchSize, resultsCh, metrics)
- }
- scannerBatchCount++
- if err := s.handleStreamingBatch(ctx, batch, resources,
req, resultsCh, metrics); err != nil {
- if loopSpan != nil {
- if errors.Is(err, context.Canceled) {
-
loopSpan.Tag("termination_reason", "context_canceled")
-
loopSpan.Tagf("scanner_batches_before_cancel", "%d", scannerBatchCount)
- } else {
-
loopSpan.Tag("termination_reason", "batch_error")
-
loopSpan.Tagf("scanner_batches_before_error", "%d", scannerBatchCount)
- loopSpan.Error(err)
- }
- }
- return err
}
+ return err
}
}
+ if loopSpan != nil {
+ loopSpan.Tag("termination_reason", "channel_closed")
+ loopSpan.Tagf("total_scanner_batches", "%d", scannerBatchCount)
+ }
+ return resources.heap.merge(ctx, req.MaxBatchSize, resultsCh, metrics)
}
func (s *sidx) handleStreamingBatch(
@@ -301,9 +291,9 @@ func (s *sidx) handleStreamingBatch(
resultsCh chan<- *QueryResponse,
metrics *batchMetrics,
) error {
+ defer releaseBlockScanResultBatch(batch)
if batch.err != nil {
err := batch.err
- releaseBlockScanResultBatch(batch)
return err
}
@@ -314,7 +304,6 @@ func (s *sidx) handleStreamingBatch(
}
cursors, cursorsErr := s.buildCursorsForBatch(ctx, batch,
resources.tagsToLoad, req, resources.asc, metrics)
- releaseBlockScanResultBatch(batch)
if cursorsErr != nil {
return cursorsErr
}
diff --git a/banyand/internal/sidx/query_result.go
b/banyand/internal/sidx/query_result.go
index ab45ee88..3669a291 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -21,13 +21,11 @@ import (
"container/heap"
"github.com/apache/skywalking-banyandb/api/common"
- internalencoding
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
// queryResult is used internally for processing logic only.
@@ -178,28 +176,13 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p
*part, tagName string, tag
// Create tag data structure and populate block
td := generateTagData()
- // Decode tag values directly (no compression)
- td.values, err = internalencoding.DecodeTagValues(td.values[:0],
decoder, bb2, tm.valueType, count)
- if err != nil {
- logger.Panicf("cannot decode tag values: %v", err)
- return false
- }
-
td.name = tagName
td.valueType = tm.valueType
- // Set min/max for int64 tags
- if tm.valueType == pbv1.ValueTypeInt64 {
- td.min = tm.min
- td.max = tm.max
- }
-
- // Create bloom filter for indexed tags
- td.filter = generateBloomFilter(count)
- for _, value := range td.values {
- if value != nil {
- td.filter.Add(value)
- }
+ // Decode and convert tag values using common helper
+ if err := decodeAndConvertTagValues(td, decoder, bb2, tm.valueType,
count); err != nil {
+ logger.Panicf("cannot decode tag values: %v", err)
+ return false
}
tmpBlock.tags[tagName] = td
@@ -247,11 +230,17 @@ func (qr *queryResult) extractElementTags(block *block,
elemIndex int) []Tag {
for _, proj := range qr.request.TagProjection {
for _, tagName := range proj.Names {
if tagData, exists := block.tags[tagName];
exists && elemIndex < len(tagData.values) {
- elementTags = append(elementTags, Tag{
+ row := &tagData.values[elemIndex]
+ tag := Tag{
Name: tagName,
- Value:
tagData.values[elemIndex],
ValueType: tagData.valueType,
- })
+ }
+ if len(row.valueArr) > 0 {
+ tag.ValueArr = row.valueArr
+ } else if len(row.value) > 0 {
+ tag.Value = row.value
+ }
+ elementTags = append(elementTags, tag)
}
}
}
@@ -260,11 +249,17 @@ func (qr *queryResult) extractElementTags(block *block,
elemIndex int) []Tag {
elementTags = make([]Tag, 0, len(block.tags))
for tagName, tagData := range block.tags {
if elemIndex < len(tagData.values) {
- elementTags = append(elementTags, Tag{
+ row := &tagData.values[elemIndex]
+ tag := Tag{
Name: tagName,
- Value: tagData.values[elemIndex],
ValueType: tagData.valueType,
- })
+ }
+ if len(row.valueArr) > 0 {
+ tag.ValueArr = row.valueArr
+ } else if len(row.value) > 0 {
+ tag.Value = row.value
+ }
+ elementTags = append(elementTags, tag)
}
}
}
diff --git a/banyand/internal/sidx/query_test.go
b/banyand/internal/sidx/query_test.go
index 2c841250..225f2587 100644
--- a/banyand/internal/sidx/query_test.go
+++ b/banyand/internal/sidx/query_test.go
@@ -235,14 +235,37 @@ func TestSIDX_Query_Ordering(t *testing.T) {
}
if len(allKeys) > 1 {
- isSorted := sort.SliceIsSorted(allKeys, func(i,
j int) bool {
- if tt.ascending {
- return allKeys[i] < allKeys[j]
+ if len(tt.seriesIDs) == 1 {
+ // For single series, verify global
sorting
+ isSorted := sort.SliceIsSorted(allKeys,
func(i, j int) bool {
+ if tt.ascending {
+ return allKeys[i] <
allKeys[j]
+ }
+ return allKeys[i] > allKeys[j]
+ })
+ assert.True(t, isSorted, "Keys should
be sorted in %s order. Keys: %v",
+ map[bool]string{true:
"ascending", false: "descending"}[tt.ascending], allKeys)
+ } else {
+ // For multiple series, verify sorting
within each series group
+ seriesGroups :=
make(map[common.SeriesID][]int64)
+ for i, sid := range allSIDs {
+ if i < len(allKeys) {
+ seriesGroups[sid] =
append(seriesGroups[sid], allKeys[i])
+ }
+ }
+ for sid, keys := range seriesGroups {
+ if len(keys) > 1 {
+ isSorted :=
sort.SliceIsSorted(keys, func(i, j int) bool {
+ if tt.ascending
{
+ return
keys[i] < keys[j]
+ }
+ return keys[i]
> keys[j]
+ })
+ assert.True(t,
isSorted, "Keys for series %d should be sorted in %s order. Keys: %v",
+ sid,
map[bool]string{true: "ascending", false: "descending"}[tt.ascending], keys)
+ }
}
- return allKeys[i] > allKeys[j]
- })
- assert.True(t, isSorted, "Keys should be sorted
in %s order. Keys: %v",
- map[bool]string{true: "ascending",
false: "descending"}[tt.ascending], allKeys)
+ }
}
})
}
@@ -298,7 +321,9 @@ func TestSIDX_Query_WithArrValues(t *testing.T) {
for i := 0; i < len(keys); i++ {
if keys[i] == 100 {
assert.Equal(t, "arr_tag", tags[i][0].Name)
- assert.Equal(t, "a|b|", string(tags[i][0].Value))
+ assert.Equal(t, 2, len(tags[i][0].ValueArr))
+ assert.Equal(t, "a", string(tags[i][0].ValueArr[0]))
+ assert.Equal(t, "b", string(tags[i][0].ValueArr[1]))
}
}
}
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index f2cc93e9..370e4204 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)
@@ -251,6 +252,147 @@ type blockCursor struct {
idx int
}
+type blockCursorBuilder struct {
+ bc *blockCursor
+ block *block
+ metrics *batchMetrics
+ seen map[uint64][][]byte
+ minKey int64
+ maxKey int64
+ hasMin bool
+ hasMax bool
+}
+
+func (b *blockCursorBuilder) processWithFilter(req QueryRequest, log
*logger.Logger) error {
+ if req.TagFilter == nil {
+ return nil
+ }
+
+ tags := make([]*modelv1.Tag, 0, len(b.block.tags))
+ decoder := req.TagFilter.GetDecoder()
+
+ for i := 0; i < len(b.block.userKeys); i++ {
+ dataBytes := b.block.data[i]
+ hash, duplicate := b.checkDuplicate(dataBytes, true)
+ if duplicate {
+ continue
+ }
+
+ tags = b.collectTagsForFilter(tags, decoder, i)
+
+ matched, err := req.TagFilter.Match(tags)
+ if err != nil {
+ log.Error().Err(err).Msg("tag filter match error")
+ return err
+ }
+ if !matched {
+ continue
+ }
+
+ b.appendElement(i, hash, dataBytes)
+ }
+
+ return nil
+}
+
+func (b *blockCursorBuilder) processWithoutFilter() {
+ for i := 0; i < len(b.block.userKeys); i++ {
+ dataBytes := b.block.data[i]
+ hash, duplicate := b.checkDuplicate(dataBytes, false)
+ if duplicate {
+ continue
+ }
+
+ b.appendElement(i, hash, dataBytes)
+ }
+}
+
+func (b *blockCursorBuilder) collectTagsForFilter(buf []*modelv1.Tag, decoder
func(pbv1.ValueType, []byte) *modelv1.TagValue, index int) []*modelv1.Tag {
+ buf = buf[:0]
+ for tagName, tagData := range b.block.tags {
+ if index >= len(tagData.values) {
+ continue
+ }
+
+ row := &tagData.values[index]
+ var marshaledValue []byte
+ if row.valueArr != nil || row.value != nil {
+ marshaledValue = marshalTagRow(row, tagData.valueType)
+ }
+ if marshaledValue == nil {
+ continue
+ }
+
+ tagValue := decoder(tagData.valueType, marshaledValue)
+ if tagValue != nil {
+ buf = append(buf, &modelv1.Tag{
+ Key: tagName,
+ Value: tagValue,
+ })
+ }
+ }
+ return buf
+}
+
+func (b *blockCursorBuilder) appendElement(index int, hash uint64, dataBytes
[]byte) {
+ key := b.block.userKeys[index]
+ if !b.keyInRange(key) {
+ return
+ }
+
+ b.markSeen(hash, dataBytes)
+
+ b.bc.userKeys = append(b.bc.userKeys, key)
+
+ dataCopy := make([]byte, len(dataBytes))
+ copy(dataCopy, dataBytes)
+ b.bc.data = append(b.bc.data, dataCopy)
+
+ for tagName, tagData := range b.block.tags {
+ if index < len(tagData.values) {
+ row := &tagData.values[index]
+ tag := Tag{
+ Name: tagName,
+ ValueType: tagData.valueType,
+ }
+ if len(row.valueArr) > 0 {
+ tag.ValueArr = row.valueArr
+ } else if len(row.value) > 0 {
+ tag.Value = row.value
+ }
+ b.bc.tags[tagName] = append(b.bc.tags[tagName], tag)
+ }
+ }
+}
+
+func (b *blockCursorBuilder) checkDuplicate(dataBytes []byte, recordMetric
bool) (uint64, bool) {
+ hash := convert.Hash(dataBytes)
+ bucket := b.seen[hash]
+ for _, existing := range bucket {
+ if bytes.Equal(existing, dataBytes) {
+ if recordMetric && b.metrics != nil {
+ b.metrics.elementsDeduplicated.Add(1)
+ }
+ return hash, true
+ }
+ }
+ return hash, false
+}
+
+func (b *blockCursorBuilder) markSeen(hash uint64, dataBytes []byte) {
+ b.seen[hash] = append(b.seen[hash], dataBytes)
+}
+
+func (b *blockCursorBuilder) keyInRange(key int64) bool {
+ if b.hasMin && key < b.minKey {
+ return false
+ }
+ if b.hasMax && key > b.maxKey {
+ return false
+ }
+ return true
+}
+
// init initializes the block cursor.
func (bc *blockCursor) init(p *part, bm *blockMetadata, req QueryRequest) {
bc.p = p
@@ -288,6 +430,19 @@ func (s *sidx) loadBlockCursor(bc *blockCursor, tmpBlock
*block, bs blockScanRes
return false
}
+ var (
+ minKey int64
+ maxKey int64
+ hasMin = req.MinKey != nil
+ hasMax = req.MaxKey != nil
+ )
+ if hasMin {
+ minKey = *req.MinKey
+ }
+ if hasMax {
+ maxKey = *req.MaxKey
+ }
+
// Pre-allocate slices for filtered data (optimize for common case
where most elements match)
bc.userKeys = make([]int64, 0, totalElements)
bc.data = make([][]byte, 0, totalElements)
@@ -299,118 +454,23 @@ func (s *sidx) loadBlockCursor(bc *blockCursor, tmpBlock
*block, bs blockScanRes
}
// Track seen data for deduplication using hash buckets with collision
checks
- seenData := make(map[uint64][][]byte)
+ builder := &blockCursorBuilder{
+ bc: bc,
+ block: tmpBlock,
+ hasMin: hasMin,
+ minKey: minKey,
+ hasMax: hasMax,
+ maxKey: maxKey,
+ metrics: metrics,
+ seen: make(map[uint64][][]byte),
+ }
- // Single loop: filter and copy data in one pass
if req.TagFilter != nil {
- tags := make([]*modelv1.Tag, 0, len(tmpBlock.tags))
- decoder := req.TagFilter.GetDecoder()
-
- for i := 0; i < totalElements; i++ {
- // Check for duplicate data before processing via hash
+ bytes.Equal on collisions
- dataBytes := tmpBlock.data[i]
- h := convert.Hash(dataBytes)
- bucket := seenData[h]
- duplicate := false
- for _, b := range bucket {
- if bytes.Equal(b, dataBytes) {
- duplicate = true
- break
- }
- }
- if duplicate {
- if metrics != nil {
- metrics.elementsDeduplicated.Add(1)
- }
- continue
- }
-
- // Build tags slice for this element
- tags = tags[:0]
- for tagName, tagData := range tmpBlock.tags {
- if i < len(tagData.values) && tagData.values[i]
!= nil {
- // Decode []byte to *modelv1.TagValue
using the provided decoder
- tagValue := decoder(tagData.valueType,
tagData.values[i])
- if tagValue != nil {
- tags = append(tags,
&modelv1.Tag{
- Key: tagName,
- Value: tagValue,
- })
- }
- }
- }
-
- // Apply filter
- matched, err := req.TagFilter.Match(tags)
- if err != nil {
- s.l.Error().Err(err).Msg("tag filter match
error")
- return false
- }
-
- if matched {
- // Mark data as seen
- seenData[h] = append(bucket, dataBytes)
-
- // Copy userKey
- bc.userKeys = append(bc.userKeys,
tmpBlock.userKeys[i])
-
- // Copy data
- dataCopy := make([]byte, len(tmpBlock.data[i]))
- copy(dataCopy, tmpBlock.data[i])
- bc.data = append(bc.data, dataCopy)
-
- // Copy tags
- for tagName, tagData := range tmpBlock.tags {
- if i < len(tagData.values) {
- bc.tags[tagName] =
append(bc.tags[tagName], Tag{
- Name: tagName,
- Value:
tagData.values[i],
- ValueType:
tagData.valueType,
- })
- }
- }
- }
+ if err := builder.processWithFilter(req, s.l); err != nil {
+ return false
}
} else {
- // No filter - copy all elements but skip duplicates
- for i := 0; i < totalElements; i++ {
- // Check for duplicate data via hash + bytes.Equal on
collisions
- dataBytes := tmpBlock.data[i]
- h := convert.Hash(dataBytes)
- bucket := seenData[h]
- duplicate := false
- for _, b := range bucket {
- if bytes.Equal(b, dataBytes) {
- duplicate = true
- break
- }
- }
- if duplicate {
- continue
- }
-
- // Mark data as seen
- seenData[h] = append(bucket, dataBytes)
-
- // Copy userKey
- bc.userKeys = append(bc.userKeys, tmpBlock.userKeys[i])
-
- // Copy data
- dataCopy := make([]byte, len(tmpBlock.data[i]))
- copy(dataCopy, tmpBlock.data[i])
- bc.data = append(bc.data, dataCopy)
-
- // Copy tags
- for tagName, tagData := range tmpBlock.tags {
- if i < len(tagData.values) {
- bc.tags[tagName] =
append(bc.tags[tagName], Tag{
- Name: tagName,
- Value: tagData.values[i],
- ValueType: tagData.valueType,
- })
- }
- }
- }
+ builder.processWithoutFilter()
}
if metrics != nil {
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index d02ebf24..0314500f 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -20,6 +20,8 @@ package sidx
import (
"fmt"
+ internalencoding
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/filter"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -44,12 +46,21 @@ type tagMetadata struct {
// tagData represents the runtime data for a tag with filtering capabilities.
type tagData struct {
- values [][]byte
- filter *filter.BloomFilter // For indexed tags
- name string
- min []byte // For int64 tags
- max []byte // For int64 tags
- valueType pbv1.ValueType
+ uniqueValues map[string]struct{}
+ name string
+ values []tagRow
+ tmpBytes [][]byte
+ valueType pbv1.ValueType
+}
+
+type tagRow struct {
+ value []byte
+ valueArr [][]byte
+}
+
+func (tr *tagRow) reset() {
+ tr.value = nil
+ tr.valueArr = tr.valueArr[:0]
}
var (
@@ -101,19 +112,20 @@ func (td *tagData) reset() {
// Reset values slice
for i := range td.values {
- td.values[i] = nil
+ td.values[i].reset()
}
td.values = td.values[:0]
- // Reset filter
- if td.filter != nil {
- releaseBloomFilter(td.filter)
- td.filter = nil
+ // Reset tmpBytes slice
+ for i := range td.tmpBytes {
+ td.tmpBytes[i] = nil
}
+ td.tmpBytes = td.tmpBytes[:0]
- // Reset min/max
- td.min = nil
- td.max = nil
+ // Reset uniqueValues map for reuse
+ for k := range td.uniqueValues {
+ delete(td.uniqueValues, k)
+ }
}
// reset clears tagMetadata for reuse in object pool.
@@ -134,8 +146,7 @@ func generateBloomFilter(expectedElements int)
*filter.BloomFilter {
}
// Reset and resize for new expected elements
v.SetN(expectedElements)
- m := expectedElements * filter.B
- v.ResizeBits((m + 63) / 64)
+ v.ResizeBits(filter.OptimalBitsSize(expectedElements))
return v
}
@@ -166,11 +177,9 @@ func decodeBloomFilter(src []byte) (*filter.BloomFilter,
error) {
n := pkgencoding.BytesToInt64(src)
bf := generateBloomFilter(int(n))
+ bitsLen := len(bf.Bits())
- m := n * filter.B
- bits := make([]uint64, 0, (m+63)/64)
- var err error
- bits, _, err = pkgencoding.DecodeUint64Block(bits, src[8:],
uint64((m+63)/64))
+ bits, _, err := pkgencoding.DecodeUint64Block(bf.Bits()[:0], src[8:],
uint64(bitsLen))
if err != nil {
releaseBloomFilter(bf)
return nil, fmt.Errorf("failed to decode bloom filter bits:
%w", err)
@@ -180,40 +189,59 @@ func decodeBloomFilter(src []byte) (*filter.BloomFilter,
error) {
return bf, nil
}
-// updateMinMax updates min/max values for int64 tags.
-func (td *tagData) updateMinMax() {
- if td.valueType != pbv1.ValueTypeInt64 || len(td.values) == 0 {
- return
+// marshalTagRow marshals the tagRow value to a byte slice.
+func marshalTagRow(tr *tagRow, valueType pbv1.ValueType) []byte {
+ if tr.valueArr != nil {
+ var dst []byte
+ for i := range tr.valueArr {
+ if valueType == pbv1.ValueTypeInt64Arr {
+ dst = append(dst, tr.valueArr[i]...)
+ continue
+ }
+ dst = internalencoding.MarshalVarArray(dst,
tr.valueArr[i])
+ }
+ return dst
+ }
+ return tr.value
+}
+
+// decodeAndConvertTagValues decodes encoded tag values and converts them to
tagRow format.
+// This is a common operation shared between reading from disk and querying.
+func decodeAndConvertTagValues(td *tagData, decoder
*pkgencoding.BytesBlockDecoder, encodedData *bytes.Buffer, valueType
pbv1.ValueType, count int) error {
+ var err error
+
+ // Decode to tmpBytes buffer, reusing the existing slice to avoid
allocations
+ td.tmpBytes, err = internalencoding.DecodeTagValues(td.tmpBytes[:0],
decoder, encodedData, valueType, count)
+ if err != nil {
+ return fmt.Errorf("cannot decode tag values: %w", err)
}
- var minVal, maxVal int64
- first := true
+ // Convert [][]byte to []tagRow based on valueType
+ if cap(td.values) < len(td.tmpBytes) {
+ td.values = make([]tagRow, len(td.tmpBytes))
+ } else {
+ td.values = td.values[:len(td.tmpBytes)]
+ }
- for _, value := range td.values {
- if len(value) != 8 {
- continue // Skip invalid int64 values
+ for i, encodedValue := range td.tmpBytes {
+ if encodedValue == nil {
+ td.values[i] = tagRow{}
+ continue
}
- val := pkgencoding.BytesToInt64(value)
-
- if first {
- minVal = val
- maxVal = val
- first = false
- } else {
- if val < minVal {
- minVal = val
- }
- if val > maxVal {
- maxVal = val
+ if valueType == pbv1.ValueTypeStrArr || valueType ==
pbv1.ValueTypeInt64Arr {
+ // For array types, unmarshal to valueArr
+ td.values[i].valueArr, err =
unmarshalTag(td.values[i].valueArr[:0], encodedValue, valueType)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal tag array:
%w", err)
}
+ } else {
+ // For scalar types, set value directly
+ td.values[i].value = encodedValue
}
}
- if !first {
- td.min = pkgencoding.Int64ToBytes(nil, minVal)
- td.max = pkgencoding.Int64ToBytes(nil, maxVal)
- }
+ return nil
}
// marshal serializes tag metadata to bytes using encoding package.
diff --git a/banyand/internal/sidx/tag_filter_op.go
b/banyand/internal/sidx/tag_filter_op.go
index 620f8aff..2f3318cc 100644
--- a/banyand/internal/sidx/tag_filter_op.go
+++ b/banyand/internal/sidx/tag_filter_op.go
@@ -246,9 +246,9 @@ func decodeBloomFilterFromBytes(src []byte, bf
*filter.BloomFilter) *filter.Bloo
n := encoding.BytesToInt64(src)
bf.SetN(int(n))
- m := n * filter.B
+ // With B=16, use optimized bit shift calculation
bits := make([]uint64, 0)
- bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:],
uint64((m+63)/64))
+ bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:],
uint64(filter.OptimalBitsSize(int(n))))
if err != nil {
logger.Panicf("failed to decode Bloom filter: %v", err)
}
diff --git a/banyand/internal/sidx/tag_test.go
b/banyand/internal/sidx/tag_test.go
index 2f96ff2e..e5eb113b 100644
--- a/banyand/internal/sidx/tag_test.go
+++ b/banyand/internal/sidx/tag_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
@@ -115,3 +116,100 @@ func TestTagInWriteRequest(t *testing.T) {
assert.Equal(t, "service", req.Tags[0].Name)
assert.Equal(t, "environment", req.Tags[1].Name)
}
+
+func TestEncodeDecodeBloomFilter_RoundTrip(t *testing.T) {
+ // Test round-trip encoding and decoding
+ testCases := []struct {
+ name string
+ itemsToAdd [][]byte
+ itemsToCheck [][]byte
+ shouldContain []bool
+ expectedItems int
+ }{
+ {
+ name: "small filter",
+ expectedItems: 5,
+ itemsToAdd: [][]byte{
+ []byte("item1"),
+ []byte("item2"),
+ []byte("item3"),
+ },
+ itemsToCheck: [][]byte{
+ []byte("item1"),
+ []byte("item2"),
+ []byte("item3"),
+ []byte("not-added"),
+ },
+ shouldContain: []bool{true, true, true, false},
+ },
+ {
+ name: "medium filter",
+ expectedItems: 100,
+ itemsToAdd: [][]byte{
+ []byte("service1"),
+ []byte("service2"),
+ []byte("service3"),
+ },
+ itemsToCheck: [][]byte{
+ []byte("service1"),
+ []byte("service2"),
+ []byte("service3"),
+ []byte("not-added"),
+ },
+ shouldContain: []bool{true, true, true, false},
+ },
+ {
+ name: "empty filter",
+ expectedItems: 0,
+ itemsToAdd: [][]byte{},
+ itemsToCheck: [][]byte{
+ []byte("any-item"),
+ },
+ shouldContain: []bool{false},
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Create and populate original filter
+ original := generateBloomFilter(tc.expectedItems)
+ defer releaseBloomFilter(original)
+
+ for _, item := range tc.itemsToAdd {
+ original.Add(item)
+ }
+
+ // Encode
+ dst := make([]byte, 0)
+ encoded := encodeBloomFilter(dst, original)
+ require.Greater(t, len(encoded), 0, "encoded data
should not be empty")
+
+ // Decode
+ decoded, err := decodeBloomFilter(encoded)
+ require.NoError(t, err, "decoding should succeed")
+ require.NotNil(t, decoded, "decoded filter should not
be nil")
+ defer releaseBloomFilter(decoded)
+
+ // Verify N matches
+ assert.Equal(t, original.N(), decoded.N(), "N should
match")
+
+ // Verify bits match
+ originalBits := original.Bits()
+ decodedBits := decoded.Bits()
+ assert.Equal(t, len(originalBits), len(decodedBits),
"bits length should match")
+ for i := range originalBits {
+ assert.Equal(t, originalBits[i],
decodedBits[i], "bits[%d] should match", i)
+ }
+
+ // Verify filter behavior matches
+ for i, item := range tc.itemsToCheck {
+ originalResult := original.MightContain(item)
+ decodedResult := decoded.MightContain(item)
+ assert.Equal(t, originalResult, decodedResult,
"MightContain for item %d should match", i)
+ if len(tc.shouldContain) > i {
+ assert.Equal(t, tc.shouldContain[i],
decodedResult, "MightContain result should match expected")
+ }
+ }
+ })
+ }
+}
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 3cf8c5c3..ddaa6ec7 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -113,7 +113,7 @@ func (b *block) processTags(tf tagValues, tagFamilyIdx, i
int, elementsLen int)
tags[j].filter = filter
}
tags[j].filter.SetN(elementsLen)
- tags[j].filter.ResizeBits((elementsLen*filter.B + 63) / 64)
+ tags[j].filter.ResizeBits(filter.OptimalBitsSize(elementsLen))
tags[j].filter.Add(t.value)
if t.valueType == pbv1.ValueTypeInt64 {
if len(tags[j].min) == 0 {
diff --git a/banyand/stream/tag_filter.go b/banyand/stream/tag_filter.go
index 1f7925be..3a8748a5 100644
--- a/banyand/stream/tag_filter.go
+++ b/banyand/stream/tag_filter.go
@@ -41,9 +41,9 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter)
*filter.BloomFilter {
n := encoding.BytesToInt64(src)
bf.SetN(int(n))
- m := n * filter.B
+ // With B=16, use optimized bit shift calculation
bits := make([]uint64, 0)
- bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:],
uint64((m+63)/64))
+ bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:],
uint64(filter.OptimalBitsSize(int(n))))
if err != nil {
logger.Panicf("failed to decode Bloom filter: %v", err)
}
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index a3993dde..46aef8c4 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -192,7 +192,7 @@ func (bw *blockWriter) MustInitForMemPart(mp *memPart,
traceSize int) {
filter: generateTraceIDBloomFilter(),
}
bw.traceIDFilter.filter.SetN(traceSize)
- bw.traceIDFilter.filter.ResizeBits((traceSize*filter.B + 63) /
64)
+
bw.traceIDFilter.filter.ResizeBits(filter.OptimalBitsSize(traceSize))
}
}
@@ -214,7 +214,7 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem
fs.FileSystem, path string
filter: generateTraceIDBloomFilter(),
}
bw.traceIDFilter.filter.SetN(traceSize)
- bw.traceIDFilter.filter.ResizeBits((traceSize*filter.B + 63) /
64)
+
bw.traceIDFilter.filter.ResizeBits(filter.OptimalBitsSize(traceSize))
}
}
diff --git a/banyand/trace/bloom_filter.go b/banyand/trace/bloom_filter.go
index 8bbbe26c..fb4da2ce 100644
--- a/banyand/trace/bloom_filter.go
+++ b/banyand/trace/bloom_filter.go
@@ -34,9 +34,9 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter)
*filter.BloomFilter {
n := encoding.BytesToInt64(src)
bf.SetN(int(n))
- m := n * filter.B
+ // With B=16, the required uint64 count is computed by filter.OptimalBitsSize.
bits := make([]uint64, 0)
- bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:],
uint64((m+63)/64))
+ bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:],
uint64(filter.OptimalBitsSize(int(n))))
if err != nil {
logger.Panicf("failed to decode Bloom filter: %v", err)
}
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 2b0d9ded..6fa20fae 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/banyand/internal/sidx"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
@@ -522,9 +523,9 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value
[]byte) *modelv1.TagValu
defer bigValuePool.Release(bb)
var err error
for len(value) > 0 {
- bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0],
value)
+ bb.Buf, value, err =
encoding.UnmarshalVarArray(bb.Buf[:0], value)
if err != nil {
- logger.Panicf("unmarshalVarArray failed: %v",
err)
+ logger.Panicf("UnmarshalVarArray failed: %v",
err)
}
values = append(values, string(bb.Buf))
}
diff --git a/banyand/trace/snapshot_test.go b/banyand/trace/snapshot_test.go
index f028f66e..81f8c916 100644
--- a/banyand/trace/snapshot_test.go
+++ b/banyand/trace/snapshot_test.go
@@ -125,13 +125,13 @@ func TestSnapshotGetParts(t *testing.T) {
snapshot: func() *snapshot {
bf1 := filter.NewBloomFilter(0)
bf1.SetN(2)
- bf1.ResizeBits((2*filter.B + 63) / 64)
+ bf1.ResizeBits(filter.OptimalBitsSize(2))
bf1.Add(convert.StringToBytes("trace1"))
bf1.Add(convert.StringToBytes("trace2"))
bf2 := filter.NewBloomFilter(0)
bf2.SetN(1)
- bf2.ResizeBits((1*filter.B + 63) / 64)
+ bf2.ResizeBits(filter.OptimalBitsSize(1))
bf2.Add(convert.StringToBytes("trace3"))
return &snapshot{
@@ -182,7 +182,7 @@ func TestSnapshotGetParts(t *testing.T) {
snapshot: func() *snapshot {
bf := filter.NewBloomFilter(0)
bf.SetN(2)
- bf.ResizeBits((2*filter.B + 63) / 64)
+ bf.ResizeBits(filter.OptimalBitsSize(2))
bf.Add(convert.StringToBytes("trace1"))
bf.Add(convert.StringToBytes("trace2"))
diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go
index 1fc5b5ab..5b057af1 100644
--- a/banyand/trace/traces.go
+++ b/banyand/trace/traces.go
@@ -18,8 +18,6 @@
package trace
import (
- "github.com/pkg/errors"
-
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/banyand/internal/sidx"
@@ -74,31 +72,6 @@ func releaseTagValue(v *tagValue) {
var tagValuePool = pool.Register[*tagValue]("trace-tagValue")
-func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
- if len(src) == 0 {
- return nil, nil, errors.New("empty entity value")
- }
- if src[0] == encoding.EntityDelimiter {
- return dest, src[1:], nil
- }
- for len(src) > 0 {
- switch {
- case src[0] == encoding.Escape:
- if len(src) < 2 {
- return nil, nil, errors.New("invalid escape
character")
- }
- src = src[1:]
- dest = append(dest, src[0])
- case src[0] == encoding.EntityDelimiter:
- return dest, src[1:], nil
- default:
- dest = append(dest, src[0])
- }
- src = src[1:]
- }
- return nil, nil, errors.New("invalid variable array")
-}
-
type traces struct {
traceIDs []string
timestamps []int64
diff --git a/pkg/filter/bloom_filter.go b/pkg/filter/bloom_filter.go
index 98ff47c4..ad53692d 100644
--- a/pkg/filter/bloom_filter.go
+++ b/pkg/filter/bloom_filter.go
@@ -26,11 +26,25 @@ import (
)
const (
- k = 6
+ k = 10
// B specifies the number of bits allocated for each item.
- B = 15
+ // Using B=16 (power of 2) maintains memory alignment and enables
shift-based math.
+ // With 8k items per block: memory = 8192 * 16 / 8 = 16KB per block.
+ // FPR with k=10, B=16: ~0.042%.
+ B = 16
+ // _bMustBePowerOf2 ensures B is a power of 2 at compile time.
+ // A number is a power of 2 if and only if B > 0 and B & (B - 1) == 0.
+ // This check uses: 1 / (1 - (B & (B - 1))).
+ // - For powers of 2: B & (B - 1) == 0, so 1 / (1 - 0) = 1 (valid).
+ // - For non-powers of 2 where B & (B - 1) == 1: 1 / (1 - 1) = 1 / 0
(compile error).
+ // - For other non-powers of 2: may compile but provides basic
validation.
+ // Note: This is a best-effort compile-time check. For complete
validation,
+ // ensure B is explicitly set to a power of 2 (1, 2, 4, 8, 16, 32, 64,
...).
+ _bMustBePowerOf2 = 1 / (1 - (B & (B - 1)))
)
+var _ = _bMustBePowerOf2 // Ensure compile-time check is evaluated
+
// BloomFilter is a probabilistic data structure designed to test whether an
element is a member of a set.
type BloomFilter struct {
bits []uint64
@@ -39,8 +53,14 @@ type BloomFilter struct {
// NewBloomFilter creates a new Bloom filter with the number of items n and
false positive rate p.
func NewBloomFilter(n int) *BloomFilter {
- m := n * B
- bits := make([]uint64, (m+63)/64)
+ // With B=16, we can optimize: m = n * 16 = n << 4
+ // Number of uint64s needed: (n * 16) / 64 = n / 4 = n >> 2
+ // Ensure at least 1 uint64 to avoid empty slice
+ numBits := n >> 2
+ if numBits == 0 {
+ numBits = 1
+ }
+ bits := make([]uint64, numBits)
return &BloomFilter{
bits,
n,
@@ -49,6 +69,9 @@ func NewBloomFilter(n int) *BloomFilter {
// Reset resets the Bloom filter.
func (bf *BloomFilter) Reset() {
+ for i := range bf.bits {
+ bf.bits[i] = 0
+ }
bf.bits = bf.bits[:0]
bf.n = 0
}
@@ -130,3 +153,13 @@ func (bf *BloomFilter) ResizeBits(n int) {
bits = bits[:n]
bf.bits = bits
}
+
+ // OptimalBitsSize returns the number of uint64 words used for n items.
+ // With B=16, this is n/4 (n >> 2), with a minimum of 1. Note the shift
+ // floors: for n not divisible by 4 the filter gets slightly fewer than
+ // n*B bits, unlike the previous ceiling expression (n*B + 63) / 64.
+func OptimalBitsSize(n int) int {
+ size := n >> 2
+ if size == 0 {
+ return 1
+ }
+ return size
+}
diff --git a/pkg/filter/bloom_filter_test.go b/pkg/filter/bloom_filter_test.go
index e76204b7..82fa7525 100644
--- a/pkg/filter/bloom_filter_test.go
+++ b/pkg/filter/bloom_filter_test.go
@@ -55,6 +55,22 @@ func TestBloomFilter(t *testing.T) {
}
}
+func TestBloomFilterResetClearsBits(t *testing.T) {
+ assert := assert.New(t)
+
+ const expected = 16
+ key := []byte("reuse-key")
+
+ bf := NewBloomFilter(expected)
+ assert.True(bf.Add(key))
+
+ bf.Reset()
+ bf.SetN(expected)
+ bf.ResizeBits(OptimalBitsSize(expected))
+
+ assert.False(bf.MightContain(key))
+}
+
func BenchmarkFilterAdd(b *testing.B) {
for _, n := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
data := generateTestData(n)