This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c3d77adc fix(trace): prevent tagType corruption in partMergeIter (#789)
c3d77adc is described below
commit c3d77adc1125151f5d79f9bfddfa7099e21ceb8f
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 25 12:22:25 2025 +0800
fix(trace): prevent tagType corruption in partMergeIter (#789)
The tagType map in partMergeIter was shared with blockMetadata, so it
was cleared whenever blockMetadata.reset() was called. This left the
tagType maps empty during block iteration.
Changes:
- Use maps.Copy() instead of direct assignment in blockMetadata.unmarshal()
to create independent copies of tagType maps
- Remove tagType copying logic in blockReader.nextMetadata() that was
causing the sharing issue
- Change partMergeIter.reset() to set tagType to nil instead of clearing it in place
- Add a test case verifying that tagType is isolated per part
Fixes an issue where partMergeIter.tagType could become empty after
blockMetadata resets, causing incorrect tag type handling during
trace data processing.
---
banyand/trace/block_metadata.go | 8 +++-
banyand/trace/block_reader.go | 7 ----
banyand/trace/block_reader_test.go | 82 ++++++++++++++++++++++++++++++++++++++
banyand/trace/part_iter.go | 2 +-
4 files changed, 90 insertions(+), 9 deletions(-)
diff --git a/banyand/trace/block_metadata.go b/banyand/trace/block_metadata.go
index 605462e5..d2ade455 100644
--- a/banyand/trace/block_metadata.go
+++ b/banyand/trace/block_metadata.go
@@ -19,6 +19,7 @@ package trace
import (
"fmt"
+ "maps"
"sort"
"github.com/apache/skywalking-banyandb/pkg/convert"
@@ -159,7 +160,12 @@ func (bm *blockMetadata) unmarshal(src []byte, tagType map[string]pbv1.ValueType
return nil, fmt.Errorf("cannot unmarshal traceID: %w", err)
}
bm.traceID = string(traceIDBytes)
- bm.tagType = tagType
+ if bm.tagType == nil {
+ bm.tagType = make(map[string]pbv1.ValueType)
+ } else {
+ clear(bm.tagType)
+ }
+ maps.Copy(bm.tagType, tagType)
src, n := encoding.BytesToVarUint64(src)
bm.uncompressedSpanSizeBytes = n
src, n = encoding.BytesToVarUint64(src)
diff --git a/banyand/trace/block_reader.go b/banyand/trace/block_reader.go
index ad1ef4c1..9fdf4145 100644
--- a/banyand/trace/block_reader.go
+++ b/banyand/trace/block_reader.go
@@ -26,7 +26,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)
@@ -184,11 +183,6 @@ func (br *blockReader) nextBlockMetadata() bool {
func (br *blockReader) nextMetadata() error {
head := br.pih[0]
- tagType := make(map[string]pbv1.ValueType)
- for key, tv := range br.block.bm.tagType {
- tagType[key] = tv
- }
- head.tagType = tagType
if head.nextBlockMetadata() {
heap.Fix(&br.pih, 0)
br.block = &br.pih[0].block
@@ -207,7 +201,6 @@ func (br *blockReader) nextMetadata() error {
return io.EOF
}
- br.pih[0].block.bm.tagType = tagType
br.block = &br.pih[0].block
return nil
}
diff --git a/banyand/trace/block_reader_test.go b/banyand/trace/block_reader_test.go
index f276b753..db95a5e2 100644
--- a/banyand/trace/block_reader_test.go
+++ b/banyand/trace/block_reader_test.go
@@ -26,7 +26,9 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -153,3 +155,83 @@ func Test_blockReader_nextBlock(t *testing.T) {
})
}
}
+
+func Test_blockReader_TagTypePerPart(t *testing.T) {
+ // Build two file parts for the same traceID but with disjoint tag sets and types.
+ // The reader should use each part's own tagType when reading its blocks, so the
+ // decoded tag names must match the keys present in bm.tags for that block.
+
+ // Helper to build traces with a single span and one tag.
+ buildTraces := func(traceID string, ts int64, tagName string, vt
pbv1.ValueType, val []byte) *traces {
+ tv := &tagValue{tag: tagName, valueType: vt, value: val}
+ tr := &traces{
+ traceIDs: []string{traceID},
+ timestamps: []int64{ts},
+ tags: [][]*tagValue{{tv}},
+ spans: [][]byte{[]byte("span")},
+ }
+ return tr
+ }
+
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Part 1: tagA as string
+ mp1 := generateMemPart()
+ defer releaseMemPart(mp1)
+ ts1 := buildTraces("trace-1", 1, "tagA", pbv1.ValueTypeStr,
[]byte("v1"))
+ mp1.mustInitFromTraces(ts1)
+ mp1.mustFlush(fileSystem, partPath(tmpPath, 1))
+ pw1 := newPartWrapper(nil, mustOpenFilePart(1, tmpPath, fileSystem))
+ defer pw1.decRef()
+
+ // Part 2: tagB as int64
+ mp2 := generateMemPart()
+ defer releaseMemPart(mp2)
+ ts2 := buildTraces("trace-1", 2, "tagB", pbv1.ValueTypeInt64,
convert.Int64ToBytes(123))
+ mp2.mustInitFromTraces(ts2)
+ mp2.mustFlush(fileSystem, partPath(tmpPath, 2))
+ pw2 := newPartWrapper(nil, mustOpenFilePart(2, tmpPath, fileSystem))
+ defer pw2.decRef()
+
+ // Initialize block reader over both parts
+ pmi1 := &partMergeIter{}
+ pmi1.mustInitFromPart(pw1.p)
+ pmi2 := &partMergeIter{}
+ pmi2.mustInitFromPart(pw2.p)
+
+ br := generateBlockReader()
+ defer releaseBlockReader(br)
+ br.init([]*partMergeIter{pmi1, pmi2})
+
+ dec := generateColumnValuesDecoder()
+ defer releaseColumnValuesDecoder(dec)
+
+ var seen int
+ for br.nextBlockMetadata() {
+ // Load block data for current metadata
+ br.loadBlockData(dec)
+
+ // Collect expected tag names from bm.tags keys (sorted for stability)
+ var expected []string
+ for name := range br.block.bm.tags {
+ expected = append(expected, name)
+ }
+ sort.Strings(expected)
+
+ // Collect actual decoded tag names from block
+ var actual []string
+ for _, t := range br.block.block.tags {
+ actual = append(actual, t.name)
+ }
+ sort.Strings(actual)
+
+ require.Equal(t, expected, actual, "decoded tag names must
match bm.tags keys for each block")
+ seen++
+ }
+
+ // We should have seen two blocks (one from each part)
+ require.Equal(t, 2, seen)
+}
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index 9c584b0a..d10b7620 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -267,7 +267,7 @@ type partMergeIter struct {
func (pmi *partMergeIter) reset() {
pmi.err = nil
pmi.seqReaders.reset()
- clear(pmi.tagType)
+ pmi.tagType = nil
pmi.primaryBlockMetadata = nil
pmi.primaryMetadataIdx = 0
pmi.primaryBuf = pmi.primaryBuf[:0]