This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c3d77adc fix(trace): prevent tagType corruption in partMergeIter (#789)
c3d77adc is described below

commit c3d77adc1125151f5d79f9bfddfa7099e21ceb8f
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 25 12:22:25 2025 +0800

    fix(trace): prevent tagType corruption in partMergeIter (#789)
    
    The tagType field in partMergeIter was being shared with blockMetadata,
    causing it to be reset when blockMetadata.reset() was called. This led
    to empty tagType maps during block iteration.
    
    Changes:
    - Use maps.Copy() instead of direct assignment in blockMetadata.unmarshal()
      to create independent copies of tagType maps
    - Remove tagType copying logic in blockReader.nextMetadata() that was
      causing the sharing issue
    - Change partMergeIter.reset() to set tagType to nil instead of clearing
      the map in place
    - Add comprehensive test case to verify tagType isolation per part
    
    Fixes issue where partMergeIter.tagType could become empty due to
    blockMetadata resets, causing incorrect tag type handling during
    trace data processing.
---
 banyand/trace/block_metadata.go    |  8 +++-
 banyand/trace/block_reader.go      |  7 ----
 banyand/trace/block_reader_test.go | 82 ++++++++++++++++++++++++++++++++++++++
 banyand/trace/part_iter.go         |  2 +-
 4 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/banyand/trace/block_metadata.go b/banyand/trace/block_metadata.go
index 605462e5..d2ade455 100644
--- a/banyand/trace/block_metadata.go
+++ b/banyand/trace/block_metadata.go
@@ -19,6 +19,7 @@ package trace
 
 import (
        "fmt"
+       "maps"
        "sort"
 
        "github.com/apache/skywalking-banyandb/pkg/convert"
@@ -159,7 +160,12 @@ func (bm *blockMetadata) unmarshal(src []byte, tagType map[string]pbv1.ValueType
                return nil, fmt.Errorf("cannot unmarshal traceID: %w", err)
        }
        bm.traceID = string(traceIDBytes)
-       bm.tagType = tagType
+       if bm.tagType == nil {
+               bm.tagType = make(map[string]pbv1.ValueType)
+       } else {
+               clear(bm.tagType)
+       }
+       maps.Copy(bm.tagType, tagType)
        src, n := encoding.BytesToVarUint64(src)
        bm.uncompressedSpanSizeBytes = n
        src, n = encoding.BytesToVarUint64(src)
diff --git a/banyand/trace/block_reader.go b/banyand/trace/block_reader.go
index ad1ef4c1..9fdf4145 100644
--- a/banyand/trace/block_reader.go
+++ b/banyand/trace/block_reader.go
@@ -26,7 +26,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
@@ -184,11 +183,6 @@ func (br *blockReader) nextBlockMetadata() bool {
 
 func (br *blockReader) nextMetadata() error {
        head := br.pih[0]
-       tagType := make(map[string]pbv1.ValueType)
-       for key, tv := range br.block.bm.tagType {
-               tagType[key] = tv
-       }
-       head.tagType = tagType
        if head.nextBlockMetadata() {
                heap.Fix(&br.pih, 0)
                br.block = &br.pih[0].block
@@ -207,7 +201,6 @@ func (br *blockReader) nextMetadata() error {
                return io.EOF
        }
 
-       br.pih[0].block.bm.tagType = tagType
        br.block = &br.pih[0].block
        return nil
 }
diff --git a/banyand/trace/block_reader_test.go b/banyand/trace/block_reader_test.go
index f276b753..db95a5e2 100644
--- a/banyand/trace/block_reader_test.go
+++ b/banyand/trace/block_reader_test.go
@@ -26,7 +26,9 @@ import (
        "github.com/google/go-cmp/cmp/cmpopts"
        "github.com/stretchr/testify/require"
 
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
@@ -153,3 +155,83 @@ func Test_blockReader_nextBlock(t *testing.T) {
                })
        }
 }
+
+func Test_blockReader_TagTypePerPart(t *testing.T) {
+       // Build two file parts for the same traceID but with disjoint tag sets and types.
+       // The reader should use each part's own tagType when reading its blocks, so the
+       // decoded tag names must match the keys present in bm.tags for that block.
+
+       // Helper to build traces with a single span and one tag.
+       buildTraces := func(traceID string, ts int64, tagName string, vt pbv1.ValueType, val []byte) *traces {
+               tv := &tagValue{tag: tagName, valueType: vt, value: val}
+               tr := &traces{
+                       traceIDs:   []string{traceID},
+                       timestamps: []int64{ts},
+                       tags:       [][]*tagValue{{tv}},
+                       spans:      [][]byte{[]byte("span")},
+               }
+               return tr
+       }
+
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+
+       // Part 1: tagA as string
+       mp1 := generateMemPart()
+       defer releaseMemPart(mp1)
+       ts1 := buildTraces("trace-1", 1, "tagA", pbv1.ValueTypeStr, []byte("v1"))
+       mp1.mustInitFromTraces(ts1)
+       mp1.mustFlush(fileSystem, partPath(tmpPath, 1))
+       pw1 := newPartWrapper(nil, mustOpenFilePart(1, tmpPath, fileSystem))
+       defer pw1.decRef()
+
+       // Part 2: tagB as int64
+       mp2 := generateMemPart()
+       defer releaseMemPart(mp2)
+       ts2 := buildTraces("trace-1", 2, "tagB", pbv1.ValueTypeInt64, convert.Int64ToBytes(123))
+       mp2.mustInitFromTraces(ts2)
+       mp2.mustFlush(fileSystem, partPath(tmpPath, 2))
+       pw2 := newPartWrapper(nil, mustOpenFilePart(2, tmpPath, fileSystem))
+       defer pw2.decRef()
+
+       // Initialize block reader over both parts
+       pmi1 := &partMergeIter{}
+       pmi1.mustInitFromPart(pw1.p)
+       pmi2 := &partMergeIter{}
+       pmi2.mustInitFromPart(pw2.p)
+
+       br := generateBlockReader()
+       defer releaseBlockReader(br)
+       br.init([]*partMergeIter{pmi1, pmi2})
+
+       dec := generateColumnValuesDecoder()
+       defer releaseColumnValuesDecoder(dec)
+
+       var seen int
+       for br.nextBlockMetadata() {
+               // Load block data for current metadata
+               br.loadBlockData(dec)
+
+               // Collect expected tag names from bm.tags keys (sorted for stability)
+               var expected []string
+               for name := range br.block.bm.tags {
+                       expected = append(expected, name)
+               }
+               sort.Strings(expected)
+
+               // Collect actual decoded tag names from block
+               var actual []string
+               for _, t := range br.block.block.tags {
+                       actual = append(actual, t.name)
+               }
+               sort.Strings(actual)
+
+               require.Equal(t, expected, actual, "decoded tag names must match bm.tags keys for each block")
+               seen++
+       }
+
+       // We should have seen two blocks (one from each part)
+       require.Equal(t, 2, seen)
+}
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index 9c584b0a..d10b7620 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -267,7 +267,7 @@ type partMergeIter struct {
 func (pmi *partMergeIter) reset() {
        pmi.err = nil
        pmi.seqReaders.reset()
-       clear(pmi.tagType)
+       pmi.tagType = nil
        pmi.primaryBlockMetadata = nil
        pmi.primaryMetadataIdx = 0
        pmi.primaryBuf = pmi.primaryBuf[:0]

Reply via email to