This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 2bf77e13 Measure Series Metadata in Liaison Sending Queue (#890)
2bf77e13 is described below

commit 2bf77e13ee58cda75bf28b999187a3cb980b93bf
Author: OmCheeLin <[email protected]>
AuthorDate: Mon Dec 15 08:57:05 2025 +0800

    Measure Series Metadata in Liaison Sending Queue (#890)
    
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 banyand/measure/part.go          |  40 +++++++++++--
 banyand/measure/part_test.go     | 121 +++++++++++++++++++++++++++++++++++++++
 banyand/measure/tstable.go       |  11 +++-
 banyand/measure/write_liaison.go |  31 ++++++----
 4 files changed, 184 insertions(+), 19 deletions(-)
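
The liaison write path now persists each part's marshaled series metadata into
the part folder as an "smeta.bin" file, in addition to sending it to the series
index; parts written before this change simply lack the file and remain
readable. Inferred from the filename constants in part.go below (layout shown
here only as an illustration), a part folder roughly contains:

    <part-epoch>/
        metadata.json    part metadata
        primary.bin      primary block metadata
        meta.bin         block metadata
        timestamps.bin   timestamps
        fv.bin           field values
        smeta.bin        marshaled series metadata (new, optional)
        <name>.tf        tag family data
        <name>.tfm       tag family metadata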

diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index e47bfbc6..7cb29232 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "errors"
        "fmt"
        "path"
        "path/filepath"
@@ -35,18 +36,20 @@ import (
 
 const (
        // Streaming file names for measure data parts (without extensions).
-       measurePrimaryName       = "primary"
-       measureMetaName          = "meta"
-       measureTimestampsName    = "timestamps"
-       measureFieldValuesName   = "fv"
-       measureTagFamiliesPrefix = "tf:"
-       measureTagMetadataPrefix = "tfm:"
+       measurePrimaryName        = "primary"
+       measureMetaName           = "meta"
+       measureTimestampsName     = "timestamps"
+       measureFieldValuesName    = "fv"
+       measureTagFamiliesPrefix  = "tf:"
+       measureTagMetadataPrefix  = "tfm:"
+       measureSeriesMetadataName = "smeta"
 
        metadataFilename               = "metadata.json"
        primaryFilename                = measurePrimaryName + ".bin"
        metaFilename                   = measureMetaName + ".bin"
        timestampsFilename             = measureTimestampsName + ".bin"
        fieldValuesFilename            = measureFieldValuesName + ".bin"
+       seriesMetadataFilename         = measureSeriesMetadataName + ".bin"
        tagFamiliesMetadataFilenameExt = ".tfm"
        tagFamiliesFilenameExt         = ".tf"
 )
@@ -58,6 +61,7 @@ type part struct {
        fileSystem           fs.FileSystem
        tagFamilyMetadata    map[string]fs.Reader
        tagFamilies          map[string]fs.Reader
+       seriesMetadata       fs.Reader // Optional: series metadata reader
        cache                storage.Cache
        path                 string
        primaryBlockMetadata []primaryBlockMetadata
@@ -68,6 +72,9 @@ func (p *part) close() {
        fs.MustClose(p.primary)
        fs.MustClose(p.timestamps)
        fs.MustClose(p.fieldValues)
+       if p.seriesMetadata != nil {
+               fs.MustClose(p.seriesMetadata)
+       }
        for _, tf := range p.tagFamilies {
                fs.MustClose(tf)
        }
@@ -108,6 +115,7 @@ type memPart struct {
        primary           bytes.Buffer
        timestamps        bytes.Buffer
        fieldValues       bytes.Buffer
+       seriesMetadata    bytes.Buffer
        partMetadata      partMetadata
        segmentID         int64
 }
@@ -135,6 +143,7 @@ func (mp *memPart) reset() {
        mp.primary.Reset()
        mp.timestamps.Reset()
        mp.fieldValues.Reset()
+       mp.seriesMetadata.Reset()
        if mp.tagFamilies != nil {
                for k, tf := range mp.tagFamilies {
                        tf.Reset()
@@ -208,6 +217,11 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
                fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, name+tagFamiliesMetadataFilenameExt), storage.FilePerm)
        }
 
+       // Flush series metadata if available
+       if len(mp.seriesMetadata.Buf) > 0 {
+               fs.MustFlush(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+       }
+
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
 
        fileSystem.SyncPath(path)
@@ -306,6 +320,20 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
        p.primary = mustOpenReader(path.Join(partPath, primaryFilename), fileSystem)
        p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename), fileSystem)
        p.fieldValues = mustOpenReader(path.Join(partPath, fieldValuesFilename), fileSystem)
+
+       // Try to open series metadata file (optional, for backward compatibility)
+       seriesMetadataPath := path.Join(partPath, seriesMetadataFilename)
+       reader, err := fileSystem.OpenFile(seriesMetadataPath)
+       if err != nil {
+               var fsErr *fs.FileSystemError
+               // A missing file is acceptable for backward compatibility
+               if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+                       logger.Panicf("cannot open series metadata file %q: %s", seriesMetadataPath, err)
+               }
+               }
+       } else {
+               p.seriesMetadata = reader
+       }
+
        ee := fileSystem.ReadDir(partPath)
        for _, e := range ee {
                if e.IsDir() {
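
The hunk above opens smeta.bin only if it exists: a missing file is treated as
an old part, while any other error panics. A minimal, self-contained sketch of
that optional-file pattern against the Go standard library (the helper name and
stdlib calls here are illustrative, not BanyanDB's fs API):

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// openOptional returns (nil, nil) when the file is absent, so the caller can
// tell "old part without series metadata" apart from a real I/O failure.
func openOptional(dir, name string) (*os.File, error) {
	f, err := os.Open(filepath.Join(dir, name))
	if errors.Is(err, fs.ErrNotExist) {
		return nil, nil // acceptable: the part predates smeta.bin
	}
	if err != nil {
		return nil, fmt.Errorf("cannot open %s: %w", name, err)
	}
	return f, nil
}

func main() {
	f, err := openOptional(os.TempDir(), "smeta.bin")
	switch {
	case err != nil:
		fmt.Println("fatal:", err)
	case f == nil:
		fmt.Println("no series metadata; treating as an old part")
	default:
		fmt.Println("series metadata present")
		f.Close()
	}
}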
diff --git a/banyand/measure/part_test.go b/banyand/measure/part_test.go
index ffd72bae..026d6956 100644
--- a/banyand/measure/part_test.go
+++ b/banyand/measure/part_test.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "path/filepath"
        "testing"
 
        "github.com/stretchr/testify/assert"
@@ -26,6 +27,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
@@ -218,3 +220,122 @@ var dps = &dataPoints{
                },
        },
 }
+
+func TestSeriesMetadataPersistence(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(12345)
+       path := partPath(tmpPath, epoch)
+
+       // Create a memPart with data points
+       mp := generateMemPart()
+       mp.mustInitFromDataPoints(dps)
+
+       // Create sample series metadata using NewBytesField
+       field1 := index.NewBytesField(index.FieldKey{
+               IndexRuleID: 1,
+               Analyzer:    "keyword",
+               TagName:     "tag1",
+       }, []byte("term1"))
+       field2 := index.NewBytesField(index.FieldKey{
+               IndexRuleID: 2,
+               Analyzer:    "keyword",
+               TagName:     "tag2",
+       }, []byte("term2"))
+       metadataDocs := index.Documents{
+               {
+                       DocID:        1,
+                       EntityValues: []byte("entity1"),
+                       Fields:       []index.Field{field1},
+               },
+               {
+                       DocID:        2,
+                       EntityValues: []byte("entity2"),
+                       Fields:       []index.Field{field2},
+               },
+       }
+
+       // Marshal series metadata
+       seriesMetadataBytes, err := metadataDocs.Marshal()
+       require.NoError(t, err)
+       require.NotEmpty(t, seriesMetadataBytes)
+
+       // Set series metadata in memPart
+       _, err = mp.seriesMetadata.Write(seriesMetadataBytes)
+       require.NoError(t, err)
+
+       // Flush to disk
+       mp.mustFlush(fileSystem, path)
+
+       // Verify series metadata file exists by trying to read it
+       seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+       readBytes, err := fileSystem.Read(seriesMetadataPath)
+       require.NoError(t, err, "series metadata file should exist")
+       assert.Equal(t, seriesMetadataBytes, readBytes, "series metadata content should match")
+
+       // Open the part and verify series metadata is accessible
+       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+       defer p.close()
+
+       // Verify series metadata reader is available
+       assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+       // Read and unmarshal series metadata using SequentialRead
+       seqReader := p.seriesMetadata.SequentialRead()
+       defer seqReader.Close()
+       readMetadataBytes := make([]byte, 0)
+       buf := make([]byte, 1024)
+       for {
+               var n int
+               n, err = seqReader.Read(buf)
+               if n == 0 {
+                       if err != nil {
+                               break
+                       }
+                       continue
+               }
+               if err != nil {
+                       break
+               }
+               readMetadataBytes = append(readMetadataBytes, buf[:n]...)
+       }
+
+       var readDocs index.Documents
+       err = readDocs.Unmarshal(readMetadataBytes)
+       require.NoError(t, err)
+       assert.Equal(t, len(metadataDocs), len(readDocs), "number of documents should match")
+       assert.Equal(t, metadataDocs[0].DocID, readDocs[0].DocID, "first document DocID should match")
+       assert.Equal(t, metadataDocs[1].DocID, readDocs[1].DocID, "second document DocID should match")
+}
+
+func TestSeriesMetadataBackwardCompatibility(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(67890)
+       path := partPath(tmpPath, epoch)
+
+       // Create a memPart with data points but without series metadata
+       mp := generateMemPart()
+       mp.mustInitFromDataPoints(dps)
+       // Don't set series metadata to simulate old parts
+
+       // Flush to disk
+       mp.mustFlush(fileSystem, path)
+
+       // Verify series metadata file does not exist by trying to read it
+       seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+       _, err := fileSystem.Read(seriesMetadataPath)
+       assert.Error(t, err, "series metadata file should not exist for old parts")
+
+       // Open the part - should work without series metadata (backward compatibility)
+       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+       defer p.close()
+
+       // Verify part can be opened successfully
+       assert.NotNil(t, p, "part should be opened successfully")
+       assert.Nil(t, p.seriesMetadata, "series metadata reader should be nil for old parts")
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 2e59e1e6..683327e7 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -285,10 +285,10 @@ func (tst *tsTable) Close() error {
 }
 
 func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
-       tst.mustAddDataPointsWithSegmentID(dps, 0)
+       tst.mustAddDataPointsWithSegmentID(dps, 0, nil)
 }
 
-func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID int64) {
+func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID int64, seriesMetadata []byte) {
        if len(dps.seriesIDs) == 0 {
                return
        }
@@ -296,6 +296,13 @@ func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID in
        mp := generateMemPart()
        mp.mustInitFromDataPoints(dps)
        mp.segmentID = segmentID
+       if len(seriesMetadata) > 0 {
+               // Write series metadata to buffer to avoid sharing the underlying slice
+               _, err := mp.seriesMetadata.Write(seriesMetadata)
+               if err != nil {
+                       logger.Panicf("cannot write series metadata to buffer: %s", err)
+               }
+       }
        tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 867eb840..e870b8c9 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -35,7 +35,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
-       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -117,8 +116,18 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                g := groups[i]
                for j := range g.tables {
                        es := g.tables[j]
+                       // Marshal series metadata for persistence in part folder
+                       var seriesMetadataBytes []byte
+                       if len(es.metadataDocs) > 0 {
+                               var marshalErr error
+                               seriesMetadataBytes, marshalErr = es.metadataDocs.Marshal()
+                               if marshalErr != nil {
+                                       w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed to marshal series metadata for persistence")
+                                       // Continue without series metadata, but log the error
+                               }
+                       }
                        if es.tsTable != nil && es.dataPoints != nil {
-                               es.tsTable.mustAddDataPointsWithSegmentID(es.dataPoints, es.timeRange.Start.UnixNano())
+                               es.tsTable.mustAddDataPointsWithSegmentID(es.dataPoints, es.timeRange.Start.UnixNano(), seriesMetadataBytes)
                                releaseDataPoints(es.dataPoints)
                        }
                        nodes := g.queue.GetNodes(es.shardID)
@@ -126,12 +135,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                                w.l.Warn().Uint32("shardID", uint32(es.shardID)).Msg("no nodes found for shard")
                                continue
                        }
-                       sendDocuments := func(topic bus.Topic, docs index.Documents) {
-                               seriesDocData, marshalErr := docs.Marshal()
-                               if marshalErr != nil {
-                                       w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed to marshal series documents")
-                                       return
-                               }
+                       sendDocuments := func(topic bus.Topic, seriesDocData []byte) {
                                // Encode group name, start timestamp from timeRange, and prepend to docData
                                combinedData := make([]byte, 0, len(seriesDocData)+len(g.name)+8)
                                combinedData = encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
@@ -153,11 +157,16 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
                                        }
                                }
                        }
-                       if len(es.metadataDocs) > 0 {
-                               sendDocuments(data.TopicMeasureSeriesIndexInsert, es.metadataDocs)
+                       if len(seriesMetadataBytes) > 0 {
+                               sendDocuments(data.TopicMeasureSeriesIndexInsert, seriesMetadataBytes)
                        }
                        if len(es.indexModeDocs) > 0 {
-                               sendDocuments(data.TopicMeasureSeriesIndexUpdate, es.indexModeDocs)
+                               seriesDocData, marshalErr := es.indexModeDocs.Marshal()
+                               if marshalErr != nil {
+                                       w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed to marshal index mode documents")
+                               } else {
+                                       sendDocuments(data.TopicMeasureSeriesIndexUpdate, seriesDocData)
+                               }
                        }
                }
        }
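
With this refactor the metadata documents are marshaled once and the same byte
slice serves both the on-disk smeta.bin and the network payload, whose frame
prepends the group name and the segment start timestamp (hence the
len(g.name)+8 capacity hint). The exact byte layout of encoding.EncodeBytes is
internal to BanyanDB; the stand-alone sketch below only illustrates the framing
idea with stdlib primitives, and the group name and timestamp are made up:

package main

import (
	"encoding/binary"
	"fmt"
)

// frame sketches the implied wire layout: group name, 8-byte start
// timestamp, then the marshaled documents. Varint length-prefixing stands
// in for whatever encoding.EncodeBytes actually emits.
func frame(group string, startNanos int64, docs []byte) []byte {
	out := make([]byte, 0, binary.MaxVarintLen64+len(group)+8+len(docs))
	out = binary.AppendUvarint(out, uint64(len(group))) // length-prefixed name
	out = append(out, group...)
	out = binary.BigEndian.AppendUint64(out, uint64(startNanos)) // fixed 8 bytes
	return append(out, docs...)
}

func main() {
	msg := frame("sw_metric", 1734224225000000000, []byte("marshaled docs"))
	fmt.Printf("framed %d bytes\n", len(msg))
}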
