This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b57a24fc Trace, Stream Series Metadata in Liaison Sending Queue (#896)
b57a24fc is described below

commit b57a24fcd0885d1f8f2c7c893574818b2670aa43
Author: OmCheeLin <[email protected]>
AuthorDate: Tue Dec 16 17:29:18 2025 +0800

    Trace, Stream Series Metadata in Liaison Sending Queue (#896)
---
 CHANGES.md                      |   1 +
 banyand/stream/part.go          |  43 ++++++++++++--
 banyand/stream/part_test.go     | 121 ++++++++++++++++++++++++++++++++++++++++
 banyand/stream/tstable.go       |  11 +++-
 banyand/stream/write_liaison.go |  57 ++++++++++---------
 banyand/trace/part.go           |  49 +++++++++++++---
 banyand/trace/part_test.go      | 121 ++++++++++++++++++++++++++++++++++++++++
 banyand/trace/tstable.go        |  11 +++-
 banyand/trace/write_liaison.go  |  45 ++++++++-------
 9 files changed, 396 insertions(+), 63 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8223dac7..897e0d75 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
 - Update bydbQL to add sorted query support for the Property.
 - Remove the windows arch for binary and docker image.
 - Support writing data with specifications.
+- Persist series metadata in liaison queue for measure, stream and trace models.
 
 ### Bug Fixes
 
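For orientation before the per-file diffs: the "series metadata" being persisted is a marshaled pkg/index Documents value. Below is a minimal, hedged sketch of how such a payload is built, using the index.Documents and index.NewBytesField calls that the new part tests in this commit exercise; the literal IDs, tag names, and terms are illustrative only.

    // Illustrative values; the API usage mirrors the new part_test.go cases below.
    docs := index.Documents{
        {
            DocID:        1,
            EntityValues: []byte("entity1"),
            Fields: []index.Field{
                index.NewBytesField(index.FieldKey{IndexRuleID: 1, Analyzer: "keyword", TagName: "tag1"}, []byte("term1")),
            },
        },
    }
    // Marshal produces the bytes that end up in a part's smeta.bin file.
    seriesMetadataBytes, err := docs.Marshal()
    if err != nil {
        // handle the marshal error
    }
    _ = seriesMetadataBytes
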
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 39cb2145..b727f5d7 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -18,6 +18,7 @@
 package stream
 
 import (
+       "errors"
        "fmt"
        "path"
        "path/filepath"
@@ -35,17 +36,19 @@ import (
 
 const (
        // Streaming file names (without extensions).
-       streamPrimaryName       = "primary"
-       streamMetaName          = "meta"
-       streamTimestampsName    = "timestamps"
-       streamTagFamiliesPrefix = "tf:"
-       streamTagMetadataPrefix = "tfm:"
-       streamTagFilterPrefix   = "tff:"
+       streamPrimaryName        = "primary"
+       streamMetaName           = "meta"
+       streamTimestampsName     = "timestamps"
+       streamTagFamiliesPrefix  = "tf:"
+       streamTagMetadataPrefix  = "tfm:"
+       streamTagFilterPrefix    = "tff:"
+       streamSeriesMetadataName = "smeta"
 
        metadataFilename               = "metadata.json"
        primaryFilename                = streamPrimaryName + ".bin"
        metaFilename                   = streamMetaName + ".bin"
        timestampsFilename             = streamTimestampsName + ".bin"
+       seriesMetadataFilename         = streamSeriesMetadataName + ".bin"
        elementIndexFilename           = "idx"
        tagFamiliesMetadataFilenameExt = ".tfm"
        tagFamiliesFilenameExt         = ".tf"
@@ -59,6 +62,7 @@ type part struct {
        tagFamilyMetadata    map[string]fs.Reader
        tagFamilies          map[string]fs.Reader
        tagFamilyFilter      map[string]fs.Reader
+       seriesMetadata       fs.Reader // Optional: series metadata reader
        path                 string
        primaryBlockMetadata []primaryBlockMetadata
        partMetadata         partMetadata
@@ -67,6 +71,9 @@ type part struct {
 func (p *part) close() {
        fs.MustClose(p.primary)
        fs.MustClose(p.timestamps)
+       if p.seriesMetadata != nil {
+               fs.MustClose(p.seriesMetadata)
+       }
        for _, tf := range p.tagFamilies {
                fs.MustClose(tf)
        }
@@ -91,6 +98,9 @@ func openMemPart(mp *memPart) *part {
        // Open data files
        p.primary = &mp.primary
        p.timestamps = &mp.timestamps
+       if len(mp.seriesMetadata.Buf) > 0 {
+               p.seriesMetadata = &mp.seriesMetadata
+       }
        if mp.tagFamilies != nil {
                p.tagFamilies = make(map[string]fs.Reader)
                p.tagFamilyMetadata = make(map[string]fs.Reader)
@@ -111,6 +121,7 @@ type memPart struct {
        meta              bytes.Buffer
        primary           bytes.Buffer
        timestamps        bytes.Buffer
+       seriesMetadata    bytes.Buffer
        partMetadata      partMetadata
        segmentID         int64
 }
@@ -141,6 +152,7 @@ func (mp *memPart) reset() {
        mp.meta.Reset()
        mp.primary.Reset()
        mp.timestamps.Reset()
+       mp.seriesMetadata.Reset()
        mp.segmentID = 0
        if mp.tagFamilies != nil {
                for k, tf := range mp.tagFamilies {
@@ -211,6 +223,11 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
                fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, name+tagFamiliesFilterFilenameExt), storage.FilePerm)
        }
 
+       // Flush series metadata if available
+       if len(mp.seriesMetadata.Buf) > 0 {
+               fs.MustFlush(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+       }
+
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
 
        fileSystem.SyncPath(path)
@@ -304,6 +321,20 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
 
 p.primary = mustOpenReader(path.Join(partPath, primaryFilename), fileSystem)
 p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename), fileSystem)
+
+       // Try to open series metadata file (optional, for backward compatibility)
+       seriesMetadataPath := path.Join(partPath, seriesMetadataFilename)
+       reader, err := fileSystem.OpenFile(seriesMetadataPath)
+       if err != nil {
+               var fsErr *fs.FileSystemError
+               // File does not exist is acceptable for backward compatibility
+               if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+                       logger.Panicf("cannot open series metadata file %q: %s", seriesMetadataPath, err)
+               }
+       } else {
+               p.seriesMetadata = reader
+       }
+
        ee := fileSystem.ReadDir(partPath)
        for _, e := range ee {
                if e.IsDir() {
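
The smeta.bin reader is optional, so parts written before this change still open. As a hedged sketch (the helper name readSeriesMetadata is hypothetical; the fileSystem.Read and Documents.Unmarshal calls are the ones used by the new tests below), reading the file back into index.Documents could look like:

    // Hypothetical helper: load a part's optional series metadata file.
    func readSeriesMetadata(fileSystem fs.FileSystem, partPath string) (index.Documents, error) {
        raw, err := fileSystem.Read(filepath.Join(partPath, seriesMetadataFilename))
        if err != nil {
            // Old parts have no smeta.bin; callers treat this as "no series metadata".
            return nil, err
        }
        var docs index.Documents
        if err := docs.Unmarshal(raw); err != nil {
            return nil, err
        }
        return docs, nil
    }
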
diff --git a/banyand/stream/part_test.go b/banyand/stream/part_test.go
index f4443998..638bde1e 100644
--- a/banyand/stream/part_test.go
+++ b/banyand/stream/part_test.go
@@ -18,6 +18,7 @@
 package stream
 
 import (
+       "path/filepath"
        "testing"
 
        "github.com/stretchr/testify/assert"
@@ -26,6 +27,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
@@ -180,3 +182,122 @@ var es = &elements{
                {}, // empty tagFamilies for seriesID 3
        },
 }
+
+func TestSeriesMetadataPersistence(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(12345)
+       path := partPath(tmpPath, epoch)
+
+       // Create a memPart with elements
+       mp := generateMemPart()
+       mp.mustInitFromElements(es)
+
+       // Create sample series metadata using NewBytesField
+       field1 := index.NewBytesField(index.FieldKey{
+               IndexRuleID: 1,
+               Analyzer:    "keyword",
+               TagName:     "tag1",
+       }, []byte("term1"))
+       field2 := index.NewBytesField(index.FieldKey{
+               IndexRuleID: 2,
+               Analyzer:    "keyword",
+               TagName:     "tag2",
+       }, []byte("term2"))
+       metadataDocs := index.Documents{
+               {
+                       DocID:        1,
+                       EntityValues: []byte("entity1"),
+                       Fields:       []index.Field{field1},
+               },
+               {
+                       DocID:        2,
+                       EntityValues: []byte("entity2"),
+                       Fields:       []index.Field{field2},
+               },
+       }
+
+       // Marshal series metadata
+       seriesMetadataBytes, err := metadataDocs.Marshal()
+       require.NoError(t, err)
+       require.NotEmpty(t, seriesMetadataBytes)
+
+       // Set series metadata in memPart
+       _, err = mp.seriesMetadata.Write(seriesMetadataBytes)
+       require.NoError(t, err)
+
+       // Flush to disk
+       mp.mustFlush(fileSystem, path)
+
+       // Verify series metadata file exists by trying to read it
+       seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+       readBytes, err := fileSystem.Read(seriesMetadataPath)
+       require.NoError(t, err, "series metadata file should exist")
+       assert.Equal(t, seriesMetadataBytes, readBytes, "series metadata content should match")
+
+       // Open the part and verify series metadata is accessible
+       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+       defer p.close()
+
+       // Verify series metadata reader is available
+       assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+       // Read and unmarshal series metadata using SequentialRead
+       seqReader := p.seriesMetadata.SequentialRead()
+       defer seqReader.Close()
+       readMetadataBytes := make([]byte, 0)
+       buf := make([]byte, 1024)
+       for {
+               var n int
+               n, err = seqReader.Read(buf)
+               if n == 0 {
+                       if err != nil {
+                               break
+                       }
+                       continue
+               }
+               if err != nil {
+                       break
+               }
+               readMetadataBytes = append(readMetadataBytes, buf[:n]...)
+       }
+
+       var readDocs index.Documents
+       err = readDocs.Unmarshal(readMetadataBytes)
+       require.NoError(t, err)
+       assert.Equal(t, len(metadataDocs), len(readDocs), "number of documents should match")
+       assert.Equal(t, metadataDocs[0].DocID, readDocs[0].DocID, "first document DocID should match")
+       assert.Equal(t, metadataDocs[1].DocID, readDocs[1].DocID, "second document DocID should match")
+}
+
+func TestSeriesMetadataBackwardCompatibility(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(67890)
+       path := partPath(tmpPath, epoch)
+
+       // Create a memPart with elements but without series metadata
+       mp := generateMemPart()
+       mp.mustInitFromElements(es)
+       // Don't set series metadata to simulate old parts
+
+       // Flush to disk
+       mp.mustFlush(fileSystem, path)
+
+       // Verify series metadata file does not exist by trying to read it
+       seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+       _, err := fileSystem.Read(seriesMetadataPath)
+       assert.Error(t, err, "series metadata file should not exist for old parts")
+
+       // Open the part - should work without series metadata (backward compatibility)
+       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+       defer p.close()
+
+       // Verify part can be opened successfully
+       assert.NotNil(t, p, "part should be opened successfully")
+       assert.Nil(t, p.seriesMetadata, "series metadata reader should be nil for old parts")
+}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 4ce74d7a..26f345f0 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -342,10 +342,10 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
 }
 
 func (tst *tsTable) mustAddElements(es *elements) {
-       tst.mustAddElementsWithSegmentID(es, 0)
+       tst.mustAddElementsWithSegmentID(es, 0, nil)
 }
 
-func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID int64) {
+func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID int64, seriesMetadata []byte) {
        if len(es.seriesIDs) == 0 {
                return
        }
@@ -353,6 +353,13 @@ func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID int64)
        mp := generateMemPart()
        mp.mustInitFromElements(es)
        mp.segmentID = segmentID
+       if len(seriesMetadata) > 0 {
+               // Write series metadata to buffer to avoid sharing the underlying slice
+               _, err := mp.seriesMetadata.Write(seriesMetadata)
+               if err != nil {
+                       logger.Panicf("cannot write series metadata to buffer: %s", err)
+               }
+       }
        tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index b86f48d1..15717971 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -193,8 +193,20 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                g := groups[i]
                for j := range g.tables {
                        es := g.tables[j]
-                       es.tsTable.mustAddElementsWithSegmentID(es.elements, es.timeRange.Start.UnixNano())
-                       releaseElements(es.elements)
+                       // Marshal series metadata for persistence in part folder
+                       var seriesMetadataBytes []byte
+                       if len(es.seriesDocs.docs) > 0 {
+                               var marshalErr error
+                               seriesMetadataBytes, marshalErr = es.seriesDocs.docs.Marshal()
+                               if marshalErr != nil {
+                                       w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed to marshal series metadata for persistence")
+                                       // Continue without series metadata, but log the error
+                               }
+                       }
+                       if es.tsTable != nil && es.elements != nil {
+                               es.tsTable.mustAddElementsWithSegmentID(es.elements, es.timeRange.Start.UnixNano(), seriesMetadataBytes)
+                               releaseElements(es.elements)
+                       }
                        // Get nodes for this shard
                        nodes := g.queue.GetNodes(es.shardID)
                        if len(nodes) == 0 {
@@ -202,30 +214,25 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                                continue
                        }
                        // Process series documents independently
-                       if len(es.seriesDocs.docs) > 0 {
-                               seriesDocData, marshalErr := es.seriesDocs.docs.Marshal()
-                               if marshalErr != nil {
-                                       w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed to marshal series documents")
-                               } else {
-                                       // Encode group name, start timestamp from timeRange, and prepend to docData
-                                       combinedData := make([]byte, 0, len(seriesDocData)+len(g.name)+8)
-                                       combinedData = encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
-                                       combinedData = encoding.Int64ToBytes(combinedData, es.timeRange.Start.UnixNano())
-                                       combinedData = append(combinedData, seriesDocData...)
+                       if len(seriesMetadataBytes) > 0 {
+                               // Encode group name, start timestamp from timeRange, and prepend to docData
+                               combinedData := make([]byte, 0, len(seriesMetadataBytes)+len(g.name)+8)
+                               combinedData = encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
+                               combinedData = encoding.Int64ToBytes(combinedData, es.timeRange.Start.UnixNano())
+                               combinedData = append(combinedData, seriesMetadataBytes...)
 
-                                       // Send to all nodes for this shard
-                                       for _, node := range nodes {
-                                               message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
-                                               future, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
-                                               if publishErr != nil {
-                                                       w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
-                                                       continue
-                                               }
-                                               _, err := future.Get()
-                                               if err != nil {
-                                                       w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
-                                                       continue
-                                               }
+                               // Send to all nodes for this shard
+                               for _, node := range nodes {
+                                       message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+                                       future, publishErr := w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
+                                       if publishErr != nil {
+                                               w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
+                                               continue
+                                       }
+                                       _, err := future.Get()
+                                       if err != nil {
+                                               w.l.Error().Err(err).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to get response from publish")
+                                               continue
+                                       }
                                }
                        }
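
The payload published to data.TopicStreamSeriesIndexWrite above (and to data.TopicTraceSidxSeriesWrite in the trace counterpart below) carries the group name, the segment start timestamp, and then the marshaled documents. A hedged restatement of that layout as a standalone helper follows; the function name is hypothetical, while the encoding and convert calls are the ones used in the hunk above:

    // Layout: [group name via encoding.EncodeBytes][start timestamp via encoding.Int64ToBytes][marshaled index.Documents]
    func buildSeriesIndexPayload(group string, startNanos int64, seriesMetadataBytes []byte) []byte {
        payload := make([]byte, 0, len(seriesMetadataBytes)+len(group)+8)
        payload = encoding.EncodeBytes(payload, convert.StringToBytes(group))
        payload = encoding.Int64ToBytes(payload, startNanos)
        return append(payload, seriesMetadataBytes...)
    }
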
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 0e2c50be..3ed53196 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -18,6 +18,7 @@
 package trace
 
 import (
+       "errors"
        "fmt"
        "io"
        "path"
@@ -41,12 +42,14 @@ const (
        traceSpansName          = "spans"
        traceTagsPrefix         = "t:"
        traceTagMetadataPrefix  = "tm:"
+       traceSeriesMetadataName = "smeta"
        metadataFilename        = "metadata.json"
        traceIDFilterFilename   = "traceID.filter"
        tagTypeFilename         = "tag.type"
        primaryFilename         = tracePrimaryName + ".bin"
        metaFilename            = traceMetaName + ".bin"
        spansFilename           = traceSpansName + ".bin"
+       seriesMetadataFilename  = traceSeriesMetadataName + ".bin"
        tagsMetadataFilenameExt = ".tm"
        tagsFilenameExt         = ".t"
 )
@@ -59,6 +62,7 @@ type part struct {
        tags                 map[string]fs.Reader
        tagType              tagType
        traceIDFilter        traceIDFilter
+       seriesMetadata       fs.Reader // Optional: series metadata reader
        path                 string
        primaryBlockMetadata []primaryBlockMetadata
        partMetadata         partMetadata
@@ -67,6 +71,9 @@ type part struct {
 func (p *part) close() {
        fs.MustClose(p.primary)
        fs.MustClose(p.spans)
+       if p.seriesMetadata != nil {
+               fs.MustClose(p.seriesMetadata)
+       }
        for _, t := range p.tags {
                fs.MustClose(t)
        }
@@ -91,6 +98,9 @@ func openMemPart(mp *memPart) *part {
        // Open data files
        p.primary = &mp.primary
        p.spans = &mp.spans
+       if len(mp.seriesMetadata.Buf) > 0 {
+               p.seriesMetadata = &mp.seriesMetadata
+       }
        if mp.tags != nil {
                p.tags = make(map[string]fs.Reader)
                p.tagMetadata = make(map[string]fs.Reader)
@@ -103,15 +113,16 @@ func openMemPart(mp *memPart) *part {
 }
 
 type memPart struct {
-       tagMetadata   map[string]*bytes.Buffer
-       tags          map[string]*bytes.Buffer
-       tagType       tagType
-       traceIDFilter traceIDFilter
-       spans         bytes.Buffer
-       meta          bytes.Buffer
-       primary       bytes.Buffer
-       partMetadata  partMetadata
-       segmentID     int64
+       tagMetadata    map[string]*bytes.Buffer
+       tags           map[string]*bytes.Buffer
+       tagType        tagType
+       traceIDFilter  traceIDFilter
+       spans          bytes.Buffer
+       meta           bytes.Buffer
+       primary        bytes.Buffer
+       seriesMetadata bytes.Buffer
+       partMetadata   partMetadata
+       segmentID      int64
 }
 
 func (mp *memPart) mustCreateMemTagWriters(name string) (fs.Writer, fs.Writer) {
@@ -142,6 +153,7 @@ func (mp *memPart) reset() {
        mp.meta.Reset()
        mp.primary.Reset()
        mp.spans.Reset()
+       mp.seriesMetadata.Reset()
        mp.segmentID = 0
        if mp.tags != nil {
                for k, t := range mp.tags {
@@ -284,6 +296,11 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
                fs.MustFlush(fileSystem, tm.Buf, filepath.Join(path, name+tagsMetadataFilenameExt), storage.FilePerm)
        }
 
+       // Flush series metadata if available
+       if len(mp.seriesMetadata.Buf) > 0 {
+               fs.MustFlush(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+       }
+
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
        mp.tagType.mustWriteTagType(fileSystem, path)
        mp.traceIDFilter.mustWriteTraceIDFilter(fileSystem, path)
@@ -370,6 +387,20 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
 
 p.primary = mustOpenReader(path.Join(partPath, primaryFilename), fileSystem)
        p.spans = mustOpenReader(path.Join(partPath, spansFilename), fileSystem)
+
+       // Try to open series metadata file (optional, for backward compatibility)
+       seriesMetadataPath := path.Join(partPath, seriesMetadataFilename)
+       reader, err := fileSystem.OpenFile(seriesMetadataPath)
+       if err != nil {
+               var fsErr *fs.FileSystemError
+               // File does not exist is acceptable for backward compatibility
+               if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+               logger.Panicf("cannot open series metadata file %q: %s", seriesMetadataPath, err)
+               }
+       } else {
+               p.seriesMetadata = reader
+       }
+
        ee := fileSystem.ReadDir(partPath)
        for _, e := range ee {
                if e.IsDir() {
diff --git a/banyand/trace/part_test.go b/banyand/trace/part_test.go
index eb2fa840..f5c7a4ea 100644
--- a/banyand/trace/part_test.go
+++ b/banyand/trace/part_test.go
@@ -18,6 +18,7 @@
 package trace
 
 import (
+       "path/filepath"
        "testing"
 
        "github.com/stretchr/testify/assert"
@@ -25,6 +26,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
@@ -263,3 +265,122 @@ func TestMustInitFromPart(t *testing.T) {
                assert.Equal(t, originalBuffer.Buf, newBuffer.Buf)
        }
 }
+
+func TestSeriesMetadataPersistence(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(12345)
+       path := partPath(tmpPath, epoch)
+
+       // Create a memPart with traces
+       mp := generateMemPart()
+       mp.mustInitFromTraces(ts)
+
+       // Create sample series metadata using NewBytesField
+       field1 := index.NewBytesField(index.FieldKey{
+               IndexRuleID: 1,
+               Analyzer:    "keyword",
+               TagName:     "tag1",
+       }, []byte("term1"))
+       field2 := index.NewBytesField(index.FieldKey{
+               IndexRuleID: 2,
+               Analyzer:    "keyword",
+               TagName:     "tag2",
+       }, []byte("term2"))
+       metadataDocs := index.Documents{
+               {
+                       DocID:        1,
+                       EntityValues: []byte("entity1"),
+                       Fields:       []index.Field{field1},
+               },
+               {
+                       DocID:        2,
+                       EntityValues: []byte("entity2"),
+                       Fields:       []index.Field{field2},
+               },
+       }
+
+       // Marshal series metadata
+       seriesMetadataBytes, err := metadataDocs.Marshal()
+       require.NoError(t, err)
+       require.NotEmpty(t, seriesMetadataBytes)
+
+       // Set series metadata in memPart
+       _, err = mp.seriesMetadata.Write(seriesMetadataBytes)
+       require.NoError(t, err)
+
+       // Flush to disk
+       mp.mustFlush(fileSystem, path)
+
+       // Verify series metadata file exists by trying to read it
+       seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+       readBytes, err := fileSystem.Read(seriesMetadataPath)
+       require.NoError(t, err, "series metadata file should exist")
+       assert.Equal(t, seriesMetadataBytes, readBytes, "series metadata content should match")
+
+       // Open the part and verify series metadata is accessible
+       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+       defer p.close()
+
+       // Verify series metadata reader is available
+       assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+       // Read and unmarshal series metadata using SequentialRead
+       seqReader := p.seriesMetadata.SequentialRead()
+       defer seqReader.Close()
+       readMetadataBytes := make([]byte, 0)
+       buf := make([]byte, 1024)
+       for {
+               var n int
+               n, err = seqReader.Read(buf)
+               if n == 0 {
+                       if err != nil {
+                               break
+                       }
+                       continue
+               }
+               if err != nil {
+                       break
+               }
+               readMetadataBytes = append(readMetadataBytes, buf[:n]...)
+       }
+
+       var readDocs index.Documents
+       err = readDocs.Unmarshal(readMetadataBytes)
+       require.NoError(t, err)
+       assert.Equal(t, len(metadataDocs), len(readDocs), "number of documents should match")
+       assert.Equal(t, metadataDocs[0].DocID, readDocs[0].DocID, "first document DocID should match")
+       assert.Equal(t, metadataDocs[1].DocID, readDocs[1].DocID, "second document DocID should match")
+}
+
+func TestSeriesMetadataBackwardCompatibility(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(67890)
+       path := partPath(tmpPath, epoch)
+
+       // Create a memPart with traces but without series metadata
+       mp := generateMemPart()
+       mp.mustInitFromTraces(ts)
+       // Don't set series metadata to simulate old parts
+
+       // Flush to disk
+       mp.mustFlush(fileSystem, path)
+
+       // Verify series metadata file does not exist by trying to read it
+       seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+       _, err := fileSystem.Read(seriesMetadataPath)
+       assert.Error(t, err, "series metadata file should not exist for old parts")
+
+       // Open the part - should work without series metadata (backward compatibility)
+       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+       defer p.close()
+
+       // Verify part can be opened successfully
+       assert.NotNil(t, p, "part should be opened successfully")
+       assert.Nil(t, p.seriesMetadata, "series metadata reader should be nil for old parts")
+}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 287b17a8..6d168568 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -486,10 +486,10 @@ func (tst *tsTable) mustAddMemPart(mp *memPart, sidxReqsMap map[string]*sidx.Mem
 }
 
 func (tst *tsTable) mustAddTraces(ts *traces, sidxReqsMap map[string]*sidx.MemPart) {
-       tst.mustAddTracesWithSegmentID(ts, 0, sidxReqsMap)
+       tst.mustAddTracesWithSegmentID(ts, 0, sidxReqsMap, nil)
 }
 
-func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64, sidxReqsMap map[string]*sidx.MemPart) {
+func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64, sidxReqsMap map[string]*sidx.MemPart, seriesMetadata []byte) {
        if len(ts.traceIDs) == 0 {
                return
        }
@@ -497,6 +497,13 @@ func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64, sidx
        mp := generateMemPart()
        mp.mustInitFromTraces(ts)
        mp.segmentID = segmentID
+       if len(seriesMetadata) > 0 {
+               // Write series metadata to buffer to avoid sharing the underlying slice
+               _, err := mp.seriesMetadata.Write(seriesMetadata)
+               if err != nil {
+                       logger.Panicf("cannot write series metadata to buffer: %s", err)
+               }
+       }
 
        tst.mustAddMemPart(mp, sidxReqsMap)
 }
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index a234cfab..9660e72f 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -204,6 +204,16 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                g := groups[i]
                for j := range g.tables {
                        es := g.tables[j]
+                       // Marshal series metadata for persistence in part folder
+                       var seriesMetadataBytes []byte
+                       if len(es.seriesDocs.docs) > 0 {
+                               var marshalErr error
+                               seriesMetadataBytes, marshalErr = es.seriesDocs.docs.Marshal()
+                               if marshalErr != nil {
+                                       w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed to marshal series metadata for persistence")
+                                       // Continue without series metadata, but log the error
+                               }
+                       }
                        var sidxMemPartMap map[string]*sidx.MemPart
                        for sidxName, sidxReqs := range es.sidxReqsMap {
                                if len(sidxReqs) > 0 {
@@ -223,8 +233,10 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                                        sidxMemPartMap[sidxName] = siMemPart
                                }
                        }
-                       es.tsTable.mustAddTracesWithSegmentID(es.traces, es.timeRange.Start.UnixNano(), sidxMemPartMap)
-                       releaseTraces(es.traces)
+                       if es.tsTable != nil && es.traces != nil {
+                               es.tsTable.mustAddTracesWithSegmentID(es.traces, es.timeRange.Start.UnixNano(), sidxMemPartMap, seriesMetadataBytes)
+                               releaseTraces(es.traces)
+                       }
 
                        nodes := g.queue.GetNodes(es.shardID)
                        if len(nodes) == 0 {
@@ -233,24 +245,19 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                        }
 
                        // Handle series index writing
-                       if len(es.seriesDocs.docs) > 0 {
-                               seriesDocData, marshalErr := es.seriesDocs.docs.Marshal()
-                               if marshalErr != nil {
-                                       w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed to marshal series documents")
-                               } else {
-                                       // Encode group name, start timestamp from timeRange, and prepend to docData
-                                       combinedData := make([]byte, 0, len(seriesDocData)+len(g.name)+8)
-                                       combinedData = encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
-                                       combinedData = encoding.Int64ToBytes(combinedData, es.timeRange.Start.UnixNano())
-                                       combinedData = append(combinedData, seriesDocData...)
+                       if len(seriesMetadataBytes) > 0 {
+                               // Encode group name, start timestamp from timeRange, and prepend to docData
+                               combinedData := make([]byte, 0, len(seriesMetadataBytes)+len(g.name)+8)
+                               combinedData = encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
+                               combinedData = encoding.Int64ToBytes(combinedData, es.timeRange.Start.UnixNano())
+                               combinedData = append(combinedData, seriesMetadataBytes...)
 
-                                       // Send to all nodes for this shard
-                                       for _, node := range nodes {
-                                               message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
-                                               _, publishErr := w.tire2Client.Publish(ctx, data.TopicTraceSidxSeriesWrite, message)
-                                               if publishErr != nil {
-                                                       w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
-                                               }
+                               // Send to all nodes for this shard
+                               for _, node := range nodes {
+                                       message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+                                       _, publishErr := w.tire2Client.Publish(ctx, data.TopicTraceSidxSeriesWrite, message)
+                                       if publishErr != nil {
+                                               w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID", uint32(es.shardID)).Msg("failed to publish series index to node")
                                        }
                                }
                        }
