This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c43573c5 Fix the OOM when Merging Trace (#787)
c43573c5 is described below
commit c43573c59155bf1d2d4982135526d36a29b99b33
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Sep 24 22:11:25 2025 +0800
Fix the OOM when Merging Trace (#787)
---
banyand/trace/block_metadata.go | 28 +++++++---------------
banyand/trace/block_metadata_test.go | 12 +++++-----
banyand/trace/block_writer.go | 6 ++---
banyand/trace/merger.go | 17 ++++---------
banyand/trace/part.go | 5 ----
banyand/trace/part_iter.go | 5 ++--
banyand/trace/primary_metadata.go | 28 ++++++++--------------
banyand/trace/syncer.go | 14 +++++++++++
.../distributed/query/query_suite_test.go | 5 ----
9 files changed, 48 insertions(+), 72 deletions(-)
diff --git a/banyand/trace/block_metadata.go b/banyand/trace/block_metadata.go
index 02402671..605462e5 100644
--- a/banyand/trace/block_metadata.go
+++ b/banyand/trace/block_metadata.go
@@ -18,14 +18,11 @@
package trace
import (
- "bytes"
"fmt"
"sort"
- "strings"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
- "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -136,12 +133,8 @@ func (bm *blockMetadata) reset() {
}
}
-func (bm *blockMetadata) marshal(dst []byte, traceIDLen uint32) []byte {
- dst = append(dst, bm.traceID...)
- paddingLen := traceIDLen - uint32(len(bm.traceID))
- if paddingLen > 0 {
- dst = append(dst, bytes.Repeat([]byte{0}, int(paddingLen))...)
- }
+func (bm *blockMetadata) marshal(dst []byte) []byte {
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(bm.traceID))
dst = encoding.VarUint64ToBytes(dst, bm.uncompressedSpanSizeBytes)
dst = encoding.VarUint64ToBytes(dst, bm.count)
dst = bm.spans.marshal(dst)
@@ -160,16 +153,13 @@ func (bm *blockMetadata) marshal(dst []byte, traceIDLen uint32) []byte {
return dst
}
-func (bm *blockMetadata) unmarshal(src []byte, tagType map[string]pbv1.ValueType, traceIDLen int) ([]byte, error) {
- if len(src) < traceIDLen {
- return nil, fmt.Errorf("cannot unmarshal blockMetadata from less than %d bytes", traceIDLen)
- }
- bm.traceID = strings.TrimRight(string(src[:traceIDLen]), "\x00")
- if len(tagType) == 0 {
- logger.GetLogger().Error().Msg("tagType is empty")
+func (bm *blockMetadata) unmarshal(src []byte, tagType map[string]pbv1.ValueType) ([]byte, error) {
+ src, traceIDBytes, err := encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, fmt.Errorf("cannot unmarshal traceID: %w", err)
}
+ bm.traceID = string(traceIDBytes)
bm.tagType = tagType
- src = src[traceIDLen:]
src, n := encoding.BytesToVarUint64(src)
bm.uncompressedSpanSizeBytes = n
src, n = encoding.BytesToVarUint64(src)
@@ -259,7 +249,7 @@ func (tm *timestampsMetadata) copyFrom(src
*timestampsMetadata) {
tm.max = src.max
}
-func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType map[string]pbv1.ValueType, traceIDLen int) ([]blockMetadata, error) {
+func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType map[string]pbv1.ValueType) ([]blockMetadata, error) {
dstOrig := dst
var pre *blockMetadata
for len(src) > 0 {
@@ -269,7 +259,7 @@ func unmarshalBlockMetadata(dst []blockMetadata, src []byte, tagType map[string]
dst = append(dst, blockMetadata{})
}
bm := &dst[len(dst)-1]
- tail, err := bm.unmarshal(src, tagType, traceIDLen)
+ tail, err := bm.unmarshal(src, tagType)
if err != nil {
return dstOrig, fmt.Errorf("cannot unmarshal blockMetadata entries: %w", err)
}
diff --git a/banyand/trace/block_metadata_test.go b/banyand/trace/block_metadata_test.go
index 3084edf1..b6f60866 100644
--- a/banyand/trace/block_metadata_test.go
+++ b/banyand/trace/block_metadata_test.go
@@ -168,13 +168,13 @@ func Test_blockMetadata_marshal_unmarshal(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
- marshaled := tc.original.marshal(nil, 6)
+ marshaled := tc.original.marshal(nil)
unmarshaled := blockMetadata{
tags: make(map[string]*dataBlock),
}
- _, err := unmarshaled.unmarshal(marshaled, nil, 6)
+ _, err := unmarshaled.unmarshal(marshaled, nil)
require.NoError(t, err)
assert.Equal(t, tc.original.traceID, unmarshaled.traceID)
@@ -242,11 +242,11 @@ func Test_unmarshalBlockMetadata(t *testing.T) {
var marshaled []byte
for _, bm := range original {
- marshaled = bm.marshal(marshaled, 6)
+ marshaled = bm.marshal(marshaled)
}
tagType := make(map[string]pbv1.ValueType)
- unmarshaled, err := unmarshalBlockMetadata(nil, marshaled, tagType, 6)
+ unmarshaled, err := unmarshalBlockMetadata(nil, marshaled, tagType)
require.NoError(t, err)
require.Equal(t, wanted, unmarshaled)
})
@@ -277,11 +277,11 @@ func Test_unmarshalBlockMetadata(t *testing.T) {
var marshaled []byte
for _, bm := range original {
- marshaled = bm.marshal(marshaled, 6)
+ marshaled = bm.marshal(marshaled)
}
tagType := make(map[string]pbv1.ValueType)
- _, err := unmarshalBlockMetadata(nil, marshaled, tagType, 6)
+ _, err := unmarshalBlockMetadata(nil, marshaled, tagType)
require.Error(t, err)
})
}
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 6d896bfd..88b1c858 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -148,12 +148,10 @@ type blockWriter struct {
totalMinTimestamp int64
totalMaxTimestamp int64
minTimestampLast int64
- traceIDLen uint32
}
func (bw *blockWriter) reset() {
bw.writers.reset()
- bw.traceIDLen = 0
bw.traceIDs = bw.traceIDs[:0]
if bw.tagType == nil {
bw.tagType = make(tagType)
@@ -249,7 +247,7 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) {
bw.totalCount += bm.count
bw.totalBlocksCount++
- bw.primaryBlockData = bm.marshal(bw.primaryBlockData, bw.traceIDLen)
+ bw.primaryBlockData = bm.marshal(bw.primaryBlockData)
releaseBlockMetadata(bm)
if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize {
bw.mustFlushPrimaryBlock(bw.primaryBlockData)
@@ -259,7 +257,7 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) {
func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
if len(data) > 0 {
- bw.primaryBlockMetadata.mustWriteBlock(data, bw.traceIDLen, bw.traceIDs[0], &bw.writers)
+ bw.primaryBlockMetadata.mustWriteBlock(data, bw.traceIDs[0], &bw.writers)
bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData)
}
bw.minTimestamp = 0
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index d96fad6f..a19400b8 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -259,13 +259,6 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
br.init(pii)
bw := generateBlockWriter()
bw.mustInitForFilePart(fileSystem, dstPath, shouldCache)
- for _, pw := range parts {
- for _, pbm := range pw.p.primaryBlockMetadata {
- if len(pbm.traceID) > int(bw.traceIDLen) {
- bw.traceIDLen = uint32(len(pbm.traceID))
- }
- }
- }
var minTimestamp, maxTimestamp int64
for i, pw := range parts {
@@ -284,16 +277,16 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
}
pm, tf, tt, err := mergeBlocks(closeCh, bw, br)
- if err != nil {
- return nil, err
- }
- pm.MinTimestamp = minTimestamp
- pm.MaxTimestamp = maxTimestamp
releaseBlockWriter(bw)
releaseBlockReader(br)
for i := range pii {
releasePartMergeIter(pii[i])
}
+ if err != nil {
+ return nil, err
+ }
+ pm.MinTimestamp = minTimestamp
+ pm.MaxTimestamp = maxTimestamp
pm.mustWriteMetadata(fileSystem, dstPath)
tf.mustWriteTraceIDFilter(fileSystem, dstPath)
tt.mustWriteTagType(fileSystem, dstPath)
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index b76caab9..213a15ff 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -446,11 +446,6 @@ func (mp *memPart) mustInitFromTraces(ts *traces) {
bsw := generateBlockWriter()
bsw.MustInitForMemPart(mp)
- for _, tid := range ts.traceIDs {
- if len(tid) > int(bsw.traceIDLen) {
- bsw.traceIDLen = uint32(len(tid))
- }
- }
var tidPrev string
uncompressedSpansSizeBytes := uint64(0)
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index f6f069a1..9c584b0a 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -214,7 +214,7 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetada
if err != nil {
return nil, fmt.Errorf("cannot decompress index block: %w", err)
}
- bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf, pi.p.tagType, int(mr.traceIDLen))
+ bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf, pi.p.tagType)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal index block: %w", err)
}
@@ -326,8 +326,7 @@ func (pmi *partMergeIter) loadPrimaryBuf() error {
func (pmi *partMergeIter) loadBlockMetadata() error {
pmi.block.reset()
var err error
- traceIDLen := pmi.primaryBlockMetadata[pmi.primaryMetadataIdx-1].traceIDLen
- pmi.primaryBuf, err = pmi.block.bm.unmarshal(pmi.primaryBuf, pmi.tagType, int(traceIDLen))
+ pmi.primaryBuf, err = pmi.block.bm.unmarshal(pmi.primaryBuf, pmi.tagType)
if err != nil {
pm := pmi.primaryBlockMetadata[pmi.primaryMetadataIdx-1]
return fmt.Errorf("can't read block metadata from primary at %d: %w", pm.offset, err)
diff --git a/banyand/trace/primary_metadata.go b/banyand/trace/primary_metadata.go
index 5e7b16a5..304ae833 100644
--- a/banyand/trace/primary_metadata.go
+++ b/banyand/trace/primary_metadata.go
@@ -18,12 +18,11 @@
package trace
import (
- "bytes"
"fmt"
"io"
- "strings"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -32,19 +31,16 @@ import (
type primaryBlockMetadata struct {
traceID string
dataBlock
- traceIDLen uint32
}
// reset resets pbm for subsequent re-use.
func (pbm *primaryBlockMetadata) reset() {
- pbm.traceIDLen = 0
pbm.traceID = ""
pbm.offset = 0
pbm.size = 0
}
-func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceIDLen uint32, traceID string, sw *writers) {
- pbm.traceIDLen = traceIDLen
+func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceID string, sw *writers) {
pbm.traceID = traceID
bb := bigValuePool.Generate()
@@ -56,25 +52,21 @@ func (pbm *primaryBlockMetadata) mustWriteBlock(data []byte, traceIDLen uint32,
}
func (pbm *primaryBlockMetadata) marshal(dst []byte) []byte {
- dst = encoding.Uint32ToBytes(dst, pbm.traceIDLen)
- dst = append(dst, pbm.traceID...)
- paddingLen := pbm.traceIDLen - uint32(len(pbm.traceID))
- if paddingLen > 0 {
- dst = append(dst, bytes.Repeat([]byte{0}, int(paddingLen))...)
- }
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(pbm.traceID))
dst = encoding.Uint64ToBytes(dst, pbm.offset)
dst = encoding.Uint64ToBytes(dst, pbm.size)
return dst
}
func (pbm *primaryBlockMetadata) unmarshal(src []byte) ([]byte, error) {
- pbm.traceIDLen = encoding.BytesToUint32(src)
- src = src[4:]
- if len(src) < int(16+pbm.traceIDLen) {
- return nil, fmt.Errorf("cannot unmarshal primaryBlockMetadata from %d bytes; expect at least %d bytes", len(src), 32+pbm.traceIDLen)
+ if len(src) < 4 {
+ return nil, fmt.Errorf("cannot unmarshal primaryBlockMetadata from %d bytes; expect at least 4 bytes for traceID length", len(src))
+ }
+ src, traceIDBytes, err := encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, fmt.Errorf("cannot unmarshal traceID: %w", err)
}
- pbm.traceID = strings.TrimRight(string(src[:pbm.traceIDLen]), "\x00")
- src = src[pbm.traceIDLen:]
+ pbm.traceID = string(traceIDBytes)
pbm.offset = encoding.BytesToUint64(src)
src = src[8:]
pbm.size = encoding.BytesToUint64(src)
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index a1ef98e0..ba95c470 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -189,6 +189,20 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
if err != nil {
return err
}
+ if len(partsToSync) == 0 && len(sidxPartsToSync) == 0 {
+ return nil
+ }
+ hasSidxParts := false
+ for _, sidxParts := range sidxPartsToSync {
+ if len(sidxParts) == 0 {
+ continue
+ }
+ hasSidxParts = true
+ break
+ }
+ if len(partsToSync) == 0 && !hasSidxParts {
+ return nil
+ }
// Validate sync preconditions
if err := tst.validateSyncPreconditions(partsToSync, sidxPartsToSync); err != nil {
diff --git a/test/integration/distributed/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go
index 297445ba..1f67007d 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -49,7 +49,6 @@ import (
casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
- casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
)
func TestQuery(t *testing.T) {
@@ -131,10 +130,6 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Connection: connection,
BaseTime: now,
}
- casestrace.SharedContext = helpers.SharedContext{
- Connection: connection,
- BaseTime: now,
- }
Expect(err).NotTo(HaveOccurred())
})