This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 72876208 Implement Additional Dump Commands (#858)
72876208 is described below
commit 72876208bae7e8446dfe156ec424e66671fc64ce
Author: OmCheeLin <[email protected]>
AuthorDate: Mon Nov 24 09:09:23 2025 +0800
Implement Additional Dump Commands (#858)
Co-authored-by: Gao Hongtao <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
CHANGES.md | 1 +
banyand/cmd/dump/main.go | 1 +
banyand/cmd/dump/stream.go | 1099 +++++++++++++++++++++++++++++++++++++++
banyand/cmd/dump/stream_test.go | 150 ++++++
banyand/stream/test_helper.go | 92 ++++
5 files changed, 1343 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index 9335cb57..bb68d5f0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,7 @@ Release Notes.
- Add dump command-line tool to parse and display trace part data with support for CSV export and human-readable timestamp formatting.
- Implement backoff retry mechanism for sending queue failures.
- Implement memory load shedding and dynamic gRPC buffer sizing for liaison server to prevent OOM errors under high-throughput write traffic.
+- Add stream dump command to parse and display stream shard data with support for CSV export, filtering, and projection.
### Bug Fixes
diff --git a/banyand/cmd/dump/main.go b/banyand/cmd/dump/main.go
index 29da173b..0f8d4cc7 100644
--- a/banyand/cmd/dump/main.go
+++ b/banyand/cmd/dump/main.go
@@ -37,6 +37,7 @@ It provides subcommands for different data types (trace, stream, measure, etc.).
}
rootCmd.AddCommand(newTraceCmd())
+ rootCmd.AddCommand(newStreamCmd())
rootCmd.AddCommand(newSidxCmd())
if err := rootCmd.Execute(); err != nil {
diff --git a/banyand/cmd/dump/stream.go b/banyand/cmd/dump/stream.go
new file mode 100644
index 00000000..3f9a7b37
--- /dev/null
+++ b/banyand/cmd/dump/stream.go
@@ -0,0 +1,1099 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "context"
+ "encoding/csv"
+ "encoding/json"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+
+ "github.com/spf13/cobra"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+type streamDumpOptions struct {
+ shardPath string
+ segmentPath string
+ criteriaJSON string
+ projectionTags string
+ verbose bool
+ csvOutput bool
+}
+
+func newStreamCmd() *cobra.Command {
+ var shardPath string
+ var segmentPath string
+ var verbose bool
+ var csvOutput bool
+ var criteriaJSON string
+ var projectionTags string
+
+ cmd := &cobra.Command{
+ Use: "stream",
+ Short: "Dump stream shard data",
+ Long: `Dump and display contents of a stream shard directory (containing multiple parts).
+Outputs stream data in human-readable format or CSV.
+
+Supports filtering by criteria and projecting specific tags.`,
+ Example: ` # Display stream data from shard in text format
+ dump stream --shard-path /path/to/shard-0 --segment-path /path/to/segment
+
+ # Display with verbose hex dumps
+ dump stream --shard-path /path/to/shard-0 --segment-path /path/to/segment -v
+
+ # Filter by criteria
+ dump stream --shard-path /path/to/shard-0 --segment-path /path/to/segment \
+ --criteria '{"condition":{"name":"query","op":"BINARY_OP_HAVING","value":{"strArray":{"value":["tag1=value1","tag2=value2"]}}}}'
+
+ # Project specific tags
+ dump stream --shard-path /path/to/shard-0 --segment-path /path/to/segment \
+ --projection "tag1,tag2,tag3"
+
+ # Output as CSV
+ dump stream --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv
+
+ # Save CSV to file
+ dump stream --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv > output.csv`,
+ RunE: func(_ *cobra.Command, _ []string) error {
+ if shardPath == "" {
+ return fmt.Errorf("--shard-path flag is required")
+ }
+ if segmentPath == "" {
+ return fmt.Errorf("--segment-path flag is required")
+ }
+ return dumpStreamShard(streamDumpOptions{
+ shardPath: shardPath,
+ segmentPath: segmentPath,
+ verbose: verbose,
+ csvOutput: csvOutput,
+ criteriaJSON: criteriaJSON,
+ projectionTags: projectionTags,
+ })
+ },
+ }
+
+ cmd.Flags().StringVar(&shardPath, "shard-path", "", "Path to the shard directory (required)")
+ cmd.Flags().StringVarP(&segmentPath, "segment-path", "g", "", "Path to the segment directory (required)")
+ cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output (show raw data)")
+ cmd.Flags().BoolVar(&csvOutput, "csv", false, "Output as CSV format")
+ cmd.Flags().StringVarP(&criteriaJSON, "criteria", "c", "", "Criteria filter as JSON string")
+ cmd.Flags().StringVarP(&projectionTags, "projection", "p", "", "Comma-separated list of tags to include as columns (e.g., tag1,tag2,tag3)")
+ _ = cmd.MarkFlagRequired("shard-path")
+ _ = cmd.MarkFlagRequired("segment-path")
+
+ return cmd
+}
+
+func dumpStreamShard(opts streamDumpOptions) error {
+ ctx, err := newStreamDumpContext(opts)
+ if err != nil || ctx == nil {
+ return err
+ }
+ defer ctx.close()
+
+ if err := ctx.processParts(); err != nil {
+ return err
+ }
+
+ ctx.printSummary()
+ return nil
+}
+
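+// streamPartMetadata mirrors the metadata.json file stored in each part directory.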
+type streamPartMetadata struct {
+ CompressedSizeBytes uint64 `json:"compressedSizeBytes"`
+ UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"`
+ TotalCount uint64 `json:"totalCount"`
+ BlocksCount uint64 `json:"blocksCount"`
+ MinTimestamp int64 `json:"minTimestamp"`
+ MaxTimestamp int64 `json:"maxTimestamp"`
+ ID uint64 `json:"-"`
+}
+
+type streamPrimaryBlockMetadata struct {
+ seriesID common.SeriesID
+ minTimestamp int64
+ maxTimestamp int64
+ offset uint64
+ size uint64
+}
+
+type streamDataBlock struct {
+ offset uint64
+ size uint64
+}
+
+type streamBlockMetadata struct {
+ tagFamilies map[string]*streamDataBlock
+ timestamps streamTimestampsMetadata
+ elementIDs streamElementIDsMetadata
+ seriesID common.SeriesID
+ uncompressedSizeBytes uint64
+ count uint64
+}
+
+type streamTimestampsMetadata struct {
+ dataBlock streamDataBlock
+ min int64
+ max int64
+ elementIDsOffset uint64
+ encodeType encoding.EncodeType
+}
+
+type streamElementIDsMetadata struct {
+ dataBlock streamDataBlock
+ encodeType encoding.EncodeType
+}
+
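+// streamPart bundles the open file readers and decoded metadata of a single part directory.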
+type streamPart struct {
+ primary fs.Reader
+ timestamps fs.Reader
+ fileSystem fs.FileSystem
+ tagFamilyMetadata map[string]fs.Reader
+ tagFamilies map[string]fs.Reader
+ tagFamilyFilter map[string]fs.Reader
+ path string
+ primaryBlockMetadata []streamPrimaryBlockMetadata
+ partMetadata streamPartMetadata
+}
+
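+// streamRowData is a single decoded element, ready to be rendered as text or CSV.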
+type streamRowData struct {
+ tags map[string][]byte
+ elementData []byte
+ elementID uint64
+ partID uint64
+ seriesID common.SeriesID
+ timestamp int64
+}
+
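+// streamDumpContext carries the state shared across parts while dumping: the optional tag filter, the series name map, the CSV writer, and row counters.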
+type streamDumpContext struct {
+ tagFilter logical.TagFilter
+ fileSystem fs.FileSystem
+ seriesMap map[common.SeriesID]string
+ writer *csv.Writer
+ opts streamDumpOptions
+ partIDs []uint64
+ projectionTags []string
+ tagColumns []string
+ rowNum int
+}
+
+func newStreamDumpContext(opts streamDumpOptions) (*streamDumpContext, error) {
+ ctx := &streamDumpContext{
+ opts: opts,
+ fileSystem: fs.NewLocalFileSystem(),
+ }
+
+ partIDs, err := discoverStreamPartIDs(opts.shardPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to discover part IDs: %w", err)
+ }
+ if len(partIDs) == 0 {
+ fmt.Println("No parts found in shard directory")
+ return nil, nil
+ }
+ ctx.partIDs = partIDs
+ fmt.Fprintf(os.Stderr, "Found %d parts in shard\n", len(partIDs))
+
+ ctx.seriesMap, err = loadStreamSeriesMap(opts.segmentPath)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Failed to load series information: %v\n", err)
+ ctx.seriesMap = nil
+ } else {
+ fmt.Fprintf(os.Stderr, "Loaded %d series from segment\n", len(ctx.seriesMap))
+ }
+
+ if opts.criteriaJSON != "" {
+ var criteria *modelv1.Criteria
+ criteria, err = parseStreamCriteriaJSON(opts.criteriaJSON)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse criteria: %w", err)
+ }
+ ctx.tagFilter, err = logical.BuildSimpleTagFilter(criteria)
+ if err != nil {
+ return nil, fmt.Errorf("failed to build tag filter: %w", err)
+ }
+ fmt.Fprintf(os.Stderr, "Applied criteria filter\n")
+ }
+
+ if opts.projectionTags != "" {
+ ctx.projectionTags = parseStreamProjectionTags(opts.projectionTags)
+ fmt.Fprintf(os.Stderr, "Projection tags: %v\n", ctx.projectionTags)
+ }
+
+ if opts.csvOutput {
+ if len(ctx.projectionTags) > 0 {
+ ctx.tagColumns = ctx.projectionTags
+ } else {
+ ctx.tagColumns, err = discoverStreamTagColumns(ctx.partIDs, opts.shardPath, ctx.fileSystem)
+ if err != nil {
+ return nil, fmt.Errorf("failed to discover tag columns: %w", err)
+ }
+ }
+ }
+
+ if err := ctx.initOutput(); err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (ctx *streamDumpContext) initOutput() error {
+ if !ctx.opts.csvOutput {
+ fmt.Printf("================================================================================\n")
+ fmt.Fprintf(os.Stderr, "Processing parts...\n")
+ return nil
+ }
+
+ ctx.writer = csv.NewWriter(os.Stdout)
+ header := []string{"PartID", "ElementID", "Timestamp", "SeriesID", "Series", "ElementDataSize"}
+ header = append(header, ctx.tagColumns...)
+ if err := ctx.writer.Write(header); err != nil {
+ return fmt.Errorf("failed to write CSV header: %w", err)
+ }
+ return nil
+}
+
+func (ctx *streamDumpContext) close() {
+ if ctx.writer != nil {
+ ctx.writer.Flush()
+ }
+}
+
+func (ctx *streamDumpContext) processParts() error {
+ for partIdx, partID := range ctx.partIDs {
+ fmt.Fprintf(os.Stderr, "Processing part %d/%d (0x%016x)...\n", partIdx+1, len(ctx.partIDs), partID)
+
+ p, err := openStreamPart(partID, ctx.opts.shardPath, ctx.fileSystem)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: failed to open part %016x: %v\n", partID, err)
+ continue
+ }
+
+ partRowCount, partErr := ctx.processPart(partID, p)
+ closeStreamPart(p)
+ if partErr != nil {
+ return partErr
+ }
+
+ fmt.Fprintf(os.Stderr, " Part %d/%d: processed %d rows (total: %d)\n", partIdx+1, len(ctx.partIDs), partRowCount, ctx.rowNum)
+ }
+ return nil
+}
+
+func (ctx *streamDumpContext) processPart(partID uint64, p *streamPart) (int, error) {
+ decoder := &encoding.BytesBlockDecoder{}
+ partRowCount := 0
+
+ for _, pbm := range p.primaryBlockMetadata {
+ primaryData := make([]byte, pbm.size)
+ fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
+
+ decompressed, err := zstd.Decompress(nil, primaryData)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Error decompressing primary data in part %016x: %v\n", partID, err)
+ continue
+ }
+
+ blockMetadatas, err := parseStreamBlockMetadata(decompressed)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Error parsing block metadata in part %016x: %v\n", partID, err)
+ continue
+ }
+
+ for _, bm := range blockMetadatas {
+ rows, err := ctx.processBlock(partID, bm, p, decoder)
+ if err != nil {
+ return partRowCount, err
+ }
+ partRowCount += rows
+ }
+ }
+
+ return partRowCount, nil
+}
+
+func (ctx *streamDumpContext) processBlock(partID uint64, bm *streamBlockMetadata, p *streamPart, decoder *encoding.BytesBlockDecoder) (int, error) {
+ // Read timestamps and element IDs (they are stored together in timestamps.bin)
+ timestamps, elementIDs, err := readStreamTimestamps(bm.timestamps, int(bm.count), p.timestamps)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Error reading timestamps/elementIDs for series %d in part %016x: %v\n", bm.seriesID, partID, err)
+ return 0, nil
+ }
+
+ // Read tag families
+ tagsByElement := ctx.readBlockTagFamilies(partID, bm, p, decoder)
+
+ rows := 0
+ for i := 0; i < len(timestamps); i++ {
+ elementTags := make(map[string][]byte)
+ for tagName, tagValues := range tagsByElement {
+ if i < len(tagValues) {
+ elementTags[tagName] = tagValues[i]
+ }
+ }
+
+ if ctx.shouldSkip(elementTags) {
+ continue
+ }
+
+ // Read element data from primary (if available)
+ var elementData []byte
+ // Note: In stream, element data is typically stored in primary.bin
+ // For now, we'll use a placeholder or empty data
+
+ row := streamRowData{
+ partID: partID,
+ elementID: elementIDs[i],
+ timestamp: timestamps[i],
+ tags: elementTags,
+ seriesID: bm.seriesID,
+ elementData: elementData,
+ }
+
+ if err := ctx.writeRow(row); err != nil {
+ return rows, err
+ }
+
+ rows++
+ }
+
+ return rows, nil
+}
+
+func (ctx *streamDumpContext) readBlockTagFamilies(partID uint64, bm *streamBlockMetadata, p *streamPart, decoder *encoding.BytesBlockDecoder) map[string][][]byte {
+ tags := make(map[string][][]byte)
+ for tagFamilyName, tagFamilyBlock := range bm.tagFamilies {
+ // Read tag family metadata
+ tagFamilyMetadataData := make([]byte, tagFamilyBlock.size)
+ fs.MustReadData(p.tagFamilyMetadata[tagFamilyName], int64(tagFamilyBlock.offset), tagFamilyMetadataData)
+
+ // Parse tag family metadata to get individual tag metadata
+ tagMetadatas, err := parseStreamTagFamilyMetadata(tagFamilyMetadataData)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Error parsing tag family metadata %s for series %d in part %016x: %v\n", tagFamilyName, bm.seriesID, partID, err)
+ continue
+ }
+
+ // Read each tag in the tag family
+ for _, tagMeta := range tagMetadatas {
+ fullTagName := tagFamilyName + "." + tagMeta.name
+ tagValues, err := readStreamTagValues(decoder, tagMeta.dataBlock, fullTagName, int(bm.count), p.tagFamilies[tagFamilyName], tagMeta.valueType)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Error reading tag %s for series %d in part %016x: %v\n", fullTagName, bm.seriesID, partID, err)
+ continue
+ }
+ tags[fullTagName] = tagValues
+ }
+ }
+ return tags
+}
+
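+// shouldSkip reports whether a row fails the optional criteria filter; raw tag bytes are converted to string tag values before matching, so comparisons are effectively string-based.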
+func (ctx *streamDumpContext) shouldSkip(tags map[string][]byte) bool {
+ if ctx.tagFilter == nil || ctx.tagFilter == logical.DummyFilter {
+ return false
+ }
+ // Convert tags to modelv1.Tag format for filtering
+ modelTags := make([]*modelv1.Tag, 0, len(tags))
+ for name, value := range tags {
+ if value == nil {
+ continue
+ }
+ // Try to infer value type from the tag name or use string as default
+ tagValue := convertStreamTagValue(value)
+ if tagValue != nil {
+ modelTags = append(modelTags, &modelv1.Tag{
+ Key: name,
+ Value: tagValue,
+ })
+ }
+ }
+
+ // Create a simple registry for tag filtering
+ registry := &streamTagRegistry{
+ tags: tags,
+ }
+
+ matcher := logical.NewTagFilterMatcher(ctx.tagFilter, registry, streamTagValueDecoder)
+ match, _ := matcher.Match(modelTags)
+ return !match
+}
+
+func (ctx *streamDumpContext) writeRow(row streamRowData) error {
+ if ctx.opts.csvOutput {
+ if err := writeStreamRowAsCSV(ctx.writer, row, ctx.tagColumns, ctx.seriesMap); err != nil {
+ return err
+ }
+ } else {
+ writeStreamRowAsText(row, ctx.rowNum+1, ctx.opts.verbose, ctx.projectionTags, ctx.seriesMap)
+ }
+ ctx.rowNum++
+ return nil
+}
+
+func (ctx *streamDumpContext) printSummary() {
+ if ctx.opts.csvOutput {
+ fmt.Fprintf(os.Stderr, "Total rows written: %d\n", ctx.rowNum)
+ return
+ }
+ fmt.Printf("\nTotal rows: %d\n", ctx.rowNum)
+}
+
+func openStreamPart(id uint64, root string, fileSystem fs.FileSystem) (*streamPart, error) {
+ var p streamPart
+ partPath := filepath.Join(root, fmt.Sprintf("%016x", id))
+ p.path = partPath
+ p.fileSystem = fileSystem
+
+ // Read metadata.json
+ metadataPath := filepath.Join(partPath, "metadata.json")
+ metadata, err := fileSystem.Read(metadataPath)
+ if err != nil {
+ return nil, fmt.Errorf("cannot read metadata.json: %w", err)
+ }
+ if unmarshalErr := json.Unmarshal(metadata, &p.partMetadata); unmarshalErr != nil {
+ return nil, fmt.Errorf("cannot parse metadata.json: %w", unmarshalErr)
+ }
+ p.partMetadata.ID = id
+
+ // Read primary block metadata
+ metaPath := filepath.Join(partPath, "meta.bin")
+ metaFile, err := fileSystem.OpenFile(metaPath)
+ if err != nil {
+ return nil, fmt.Errorf("cannot open meta.bin: %w", err)
+ }
+ p.primaryBlockMetadata, err = readStreamPrimaryBlockMetadata(metaFile)
+ fs.MustClose(metaFile)
+ if err != nil {
+ return nil, fmt.Errorf("cannot read primary block metadata: %w", err)
+ }
+
+ // Open data files
+ p.primary, err = fileSystem.OpenFile(filepath.Join(partPath, "primary.bin"))
+ if err != nil {
+ return nil, fmt.Errorf("cannot open primary.bin: %w", err)
+ }
+
+ p.timestamps, err = fileSystem.OpenFile(filepath.Join(partPath, "timestamps.bin"))
+ if err != nil {
+ fs.MustClose(p.primary)
+ return nil, fmt.Errorf("cannot open timestamps.bin: %w", err)
+ }
+
+ // Open tag family files
+ entries := fileSystem.ReadDir(partPath)
+ p.tagFamilies = make(map[string]fs.Reader)
+ p.tagFamilyMetadata = make(map[string]fs.Reader)
+ p.tagFamilyFilter = make(map[string]fs.Reader)
+ for _, e := range entries {
+ if e.IsDir() {
+ continue
+ }
+ name := e.Name()
+ if strings.HasSuffix(name, ".tfm") {
+ tagFamilyName := name[:len(name)-4]
+ reader, err := fileSystem.OpenFile(filepath.Join(partPath, name))
+ if err == nil {
+ p.tagFamilyMetadata[tagFamilyName] = reader
+ }
+ }
+ if strings.HasSuffix(name, ".tf") {
+ tagFamilyName := name[:len(name)-3]
+ reader, err := fileSystem.OpenFile(filepath.Join(partPath, name))
+ if err == nil {
+ p.tagFamilies[tagFamilyName] = reader
+ }
+ }
+ if strings.HasSuffix(name, ".tff") {
+ tagFamilyName := name[:len(name)-4]
+ reader, err := fileSystem.OpenFile(filepath.Join(partPath, name))
+ if err == nil {
+ p.tagFamilyFilter[tagFamilyName] = reader
+ }
+ }
+ }
+
+ return &p, nil
+}
+
+func closeStreamPart(p *streamPart) {
+ if p.primary != nil {
+ fs.MustClose(p.primary)
+ }
+ if p.timestamps != nil {
+ fs.MustClose(p.timestamps)
+ }
+ for _, r := range p.tagFamilies {
+ fs.MustClose(r)
+ }
+ for _, r := range p.tagFamilyMetadata {
+ fs.MustClose(r)
+ }
+ for _, r := range p.tagFamilyFilter {
+ fs.MustClose(r)
+ }
+}
+
+func readStreamPrimaryBlockMetadata(r fs.Reader) ([]streamPrimaryBlockMetadata, error) {
+ sr := r.SequentialRead()
+ data, err := io.ReadAll(sr)
+ if err != nil {
+ return nil, fmt.Errorf("cannot read: %w", err)
+ }
+ fs.MustClose(sr)
+
+ decompressed, err := zstd.Decompress(nil, data)
+ if err != nil {
+ return nil, fmt.Errorf("cannot decompress: %w", err)
+ }
+
+ var result []streamPrimaryBlockMetadata
+ src := decompressed
+ for len(src) > 0 {
+ var pbm streamPrimaryBlockMetadata
+ src, err = unmarshalStreamPrimaryBlockMetadata(&pbm, src)
+ if err != nil {
+ return nil, err
+ }
+ result = append(result, pbm)
+ }
+ return result, nil
+}
+
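+// unmarshalStreamPrimaryBlockMetadata decodes one fixed-size (40-byte) entry of five uint64 fields (seriesID, minTimestamp, maxTimestamp, offset, size) and returns the remaining bytes.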
+func unmarshalStreamPrimaryBlockMetadata(pbm *streamPrimaryBlockMetadata, src []byte) ([]byte, error) {
+ if len(src) < 40 {
+ return nil, fmt.Errorf("insufficient data")
+ }
+ pbm.seriesID = common.SeriesID(encoding.BytesToUint64(src))
+ src = src[8:]
+ pbm.minTimestamp = int64(encoding.BytesToUint64(src))
+ src = src[8:]
+ pbm.maxTimestamp = int64(encoding.BytesToUint64(src))
+ src = src[8:]
+ pbm.offset = encoding.BytesToUint64(src)
+ src = src[8:]
+ pbm.size = encoding.BytesToUint64(src)
+ return src[8:], nil
+}
+
+func parseStreamBlockMetadata(src []byte) ([]*streamBlockMetadata, error) {
+ var result []*streamBlockMetadata
+ for len(src) > 0 {
+ bm, tail, err := unmarshalStreamBlockMetadata(src)
+ if err != nil {
+ return nil, err
+ }
+ result = append(result, bm)
+ src = tail
+ }
+ return result, nil
+}
+
+func unmarshalStreamBlockMetadata(src []byte) (*streamBlockMetadata, []byte, error) {
+ var bm streamBlockMetadata
+
+ if len(src) < 8 {
+ return nil, nil, fmt.Errorf("cannot unmarshal blockMetadata from less than 8 bytes")
+ }
+ bm.seriesID = common.SeriesID(encoding.BytesToUint64(src))
+ src = src[8:]
+
+ src, n := encoding.BytesToVarUint64(src)
+ bm.uncompressedSizeBytes = n
+
+ src, n = encoding.BytesToVarUint64(src)
+ bm.count = n
+
+ // Unmarshal timestamps metadata (includes dataBlock, min, max, encodeType, elementIDsOffset)
+ src = bm.timestamps.unmarshal(src)
+
+ // Unmarshal elementIDs metadata (includes dataBlock, encodeType)
+ src = bm.elementIDs.unmarshal(src)
+
+ // Unmarshal tag families
+ src, tagFamilyCount := encoding.BytesToVarUint64(src)
+ if tagFamilyCount > 0 {
+ bm.tagFamilies = make(map[string]*streamDataBlock)
+ for i := uint64(0); i < tagFamilyCount; i++ {
+ var nameBytes []byte
+ var err error
+ src, nameBytes, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err)
+ }
+ tf := &streamDataBlock{}
+ src = tf.unmarshal(src)
+ bm.tagFamilies[string(nameBytes)] = tf
+ }
+ }
+
+ return &bm, src, nil
+}
+
+func (tm *streamTimestampsMetadata) unmarshal(src []byte) []byte {
+ src = tm.dataBlock.unmarshal(src)
+ tm.min = int64(encoding.BytesToUint64(src))
+ src = src[8:]
+ tm.max = int64(encoding.BytesToUint64(src))
+ src = src[8:]
+ tm.encodeType = encoding.EncodeType(src[0])
+ src = src[1:]
+ src, n := encoding.BytesToVarUint64(src)
+ tm.elementIDsOffset = n
+ return src
+}
+
+func (em *streamElementIDsMetadata) unmarshal(src []byte) []byte {
+ src = em.dataBlock.unmarshal(src)
+ em.encodeType = encoding.EncodeType(src[0])
+ return src[1:]
+}
+
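+// unmarshal reads a var-uint encoded offset followed by a var-uint encoded size and returns the remaining bytes.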
+func (db *streamDataBlock) unmarshal(src []byte) []byte {
+ src, n := encoding.BytesToVarUint64(src)
+ db.offset = n
+ src, n = encoding.BytesToVarUint64(src)
+ db.size = n
+ return src
+}
+
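+// readStreamTimestamps decodes a block of timestamps and element IDs; both are stored in the same data block, with element IDs beginning at elementIDsOffset.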
+func readStreamTimestamps(tm streamTimestampsMetadata, count int, reader fs.Reader) ([]int64, []uint64, error) {
+ data := make([]byte, tm.dataBlock.size)
+ fs.MustReadData(reader, int64(tm.dataBlock.offset), data)
+
+ if tm.dataBlock.size < tm.elementIDsOffset {
+ return nil, nil, fmt.Errorf("size %d must be greater than elementIDsOffset %d", tm.dataBlock.size, tm.elementIDsOffset)
+ }
+
+ // Decode timestamps (first part of the data)
+ var timestamps []int64
+ var err error
+ // For stream, encodeType is already the common type (not version type)
+ timestamps, err = encoding.BytesToInt64List(timestamps, data[:tm.elementIDsOffset], tm.encodeType, tm.min, count)
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot decode timestamps: %w", err)
+ }
+
+ // Decode element IDs (second part of the data, starting from elementIDsOffset)
+ elementIDs := make([]uint64, count)
+ _, err = encoding.BytesToVarUint64s(elementIDs, data[tm.elementIDsOffset:])
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot decode element IDs: %w", err)
+ }
+
+ return timestamps, elementIDs, nil
+}
+
+type streamTagMetadata struct {
+ name string
+ min []byte
+ max []byte
+ dataBlock streamDataBlock
+ valueType pbv1.ValueType
+}
+
+func parseStreamTagFamilyMetadata(src []byte) ([]streamTagMetadata, error) {
+ src, tagMetadataLen := encoding.BytesToVarUint64(src)
+ if tagMetadataLen < 1 {
+ return nil, nil
+ }
+
+ var result []streamTagMetadata
+ for i := uint64(0); i < tagMetadataLen; i++ {
+ var tm streamTagMetadata
+ var nameBytes []byte
+ var err error
+ src, nameBytes, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, fmt.Errorf("cannot unmarshal tagMetadata.name: %w", err)
+ }
+ tm.name = string(nameBytes)
+
+ if len(src) < 1 {
+ return nil, fmt.Errorf("cannot unmarshal tagMetadata.valueType: src is too short")
+ }
+ tm.valueType = pbv1.ValueType(src[0])
+ src = src[1:]
+
+ src = tm.dataBlock.unmarshal(src)
+
+ src, tm.min, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, fmt.Errorf("cannot unmarshal tagMetadata.min: %w", err)
+ }
+
+ src, tm.max, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, fmt.Errorf("cannot unmarshal tagMetadata.max: %w", err)
+ }
+
+ // Skip filter block
+ var filterBlock streamDataBlock
+ src = filterBlock.unmarshal(src)
+
+ result = append(result, tm)
+ }
+
+ return result, nil
+}
+
+func readStreamTagValues(decoder *encoding.BytesBlockDecoder, tagBlock streamDataBlock, _ string, count int,
+ valueReader fs.Reader, valueType pbv1.ValueType,
+) ([][]byte, error) {
+ // Read tag values
+ bb := &bytes.Buffer{}
+ bb.Buf = make([]byte, tagBlock.size)
+ fs.MustReadData(valueReader, int64(tagBlock.offset), bb.Buf)
+
+ // Decode values using the internal encoding package
+ var err error
+ var values [][]byte
+ values, err = internalencoding.DecodeTagValues(values, decoder, bb, valueType, count)
+ if err != nil {
+ return nil, fmt.Errorf("cannot decode tag values: %w", err)
+ }
+
+ return values, nil
+}
+
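+// discoverStreamPartIDs lists the shard directory and collects every subdirectory whose name parses as a hexadecimal part ID, skipping the sidx and meta directories.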
+func discoverStreamPartIDs(shardPath string) ([]uint64, error) {
+ entries, err := os.ReadDir(shardPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read shard directory: %w", err)
+ }
+
+ var partIDs []uint64
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ continue
+ }
+ name := entry.Name()
+ if name == "sidx" || name == "meta" {
+ continue
+ }
+ partID, err := strconv.ParseUint(name, 16, 64)
+ if err == nil {
+ partIDs = append(partIDs, partID)
+ }
+ }
+
+ sort.Slice(partIDs, func(i, j int) bool {
+ return partIDs[i] < partIDs[j]
+ })
+
+ return partIDs, nil
+}
+
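+// loadStreamSeriesMap opens the segment's series index (sidx) and builds a SeriesID-to-entity-values map used to render human-readable series names.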
+func loadStreamSeriesMap(segmentPath string) (map[common.SeriesID]string, error) {
+ seriesIndexPath := filepath.Join(segmentPath, "sidx")
+
+ l := logger.GetLogger("dump-stream")
+
+ store, err := inverted.NewStore(inverted.StoreOpts{
+ Path: seriesIndexPath,
+ Logger: l,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to open series index: %w", err)
+ }
+ defer store.Close()
+
+ ctx := context.Background()
+ iter, err := store.SeriesIterator(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create series iterator: %w", err)
+ }
+ defer iter.Close()
+
+ seriesMap := make(map[common.SeriesID]string)
+ for iter.Next() {
+ series := iter.Val()
+ if len(series.EntityValues) > 0 {
+ seriesID := common.SeriesID(convert.Hash(series.EntityValues))
+ seriesText := string(series.EntityValues)
+ seriesMap[seriesID] = seriesText
+ }
+ }
+
+ return seriesMap, nil
+}
+
+func parseStreamCriteriaJSON(criteriaJSON string) (*modelv1.Criteria, error) {
+ criteria := &modelv1.Criteria{}
+ err := protojson.Unmarshal([]byte(criteriaJSON), criteria)
+ if err != nil {
+ return nil, fmt.Errorf("invalid criteria JSON: %w", err)
+ }
+ return criteria, nil
+}
+
+func parseStreamProjectionTags(projectionStr string) []string {
+ if projectionStr == "" {
+ return nil
+ }
+
+ tags := strings.Split(projectionStr, ",")
+ result := make([]string, 0, len(tags))
+ for _, tag := range tags {
+ tag = strings.TrimSpace(tag)
+ if tag != "" {
+ result = append(result, tag)
+ }
+ }
+ return result
+}
+
+func discoverStreamTagColumns(partIDs []uint64, shardPath string, fileSystem fs.FileSystem) ([]string, error) {
+ if len(partIDs) == 0 {
+ return nil, nil
+ }
+
+ p, err := openStreamPart(partIDs[0], shardPath, fileSystem)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open first part: %w", err)
+ }
+ defer closeStreamPart(p)
+
+ tagNames := make(map[string]bool)
+ partID := partIDs[0]
+ for tagFamilyName := range p.tagFamilies {
+ // Read tag family metadata to get tag names
+ if tagFamilyMetadataReader, ok := p.tagFamilyMetadata[tagFamilyName]; ok {
+ metaData, err := io.ReadAll(tagFamilyMetadataReader.SequentialRead())
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Error reading tag family metadata %s in part %016x: %v\n", tagFamilyName, partID, err)
+ continue
+ }
+ tagMetadatas, err := parseStreamTagFamilyMetadata(metaData)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Warning: Error parsing tag family metadata %s in part %016x: %v\n", tagFamilyName, partID, err)
+ continue
+ }
+ for _, tm := range tagMetadatas {
+ fullTagName := tagFamilyName + "." + tm.name
+ tagNames[fullTagName] = true
+ }
+ }
+ }
+
+ result := make([]string, 0, len(tagNames))
+ for name := range tagNames {
+ result = append(result, name)
+ }
+ sort.Strings(result)
+
+ return result, nil
+}
+
+func writeStreamRowAsText(row streamRowData, rowNum int, verbose bool, projectionTags []string, seriesMap map[common.SeriesID]string) {
+ fmt.Printf("Row %d:\n", rowNum)
+ fmt.Printf(" PartID: %d (0x%016x)\n", row.partID, row.partID)
+ fmt.Printf(" ElementID: %d\n", row.elementID)
+ fmt.Printf(" Timestamp: %s\n", formatTimestamp(row.timestamp))
+ fmt.Printf(" SeriesID: %d\n", row.seriesID)
+
+ if seriesMap != nil {
+ if seriesText, ok := seriesMap[row.seriesID]; ok {
+ fmt.Printf(" Series: %s\n", seriesText)
+ }
+ }
+
+ fmt.Printf(" Element Data: %d bytes\n", len(row.elementData))
+ if verbose && len(row.elementData) > 0 {
+ fmt.Printf(" Element Content:\n")
+ printHexDump(row.elementData, 4)
+ }
+
+ if len(row.tags) > 0 {
+ fmt.Printf(" Tags:\n")
+
+ var tagsToShow []string
+ if len(projectionTags) > 0 {
+ tagsToShow = projectionTags
+ } else {
+ for name := range row.tags {
+ tagsToShow = append(tagsToShow, name)
+ }
+ sort.Strings(tagsToShow)
+ }
+
+ for _, name := range tagsToShow {
+ value, exists := row.tags[name]
+ if !exists {
+ continue
+ }
+ if value == nil {
+ fmt.Printf(" %s: <nil>\n", name)
+ } else {
+ fmt.Printf(" %s: %s\n", name, formatTagValueForDisplay(value, pbv1.ValueTypeStr))
+ }
+ }
+ }
+ fmt.Printf("\n")
+}
+
+func writeStreamRowAsCSV(writer *csv.Writer, row streamRowData, tagColumns []string, seriesMap map[common.SeriesID]string) error {
+ seriesText := ""
+ if seriesMap != nil {
+ if text, ok := seriesMap[row.seriesID]; ok {
+ seriesText = text
+ }
+ }
+
+ csvRow := []string{
+ fmt.Sprintf("%d", row.partID),
+ fmt.Sprintf("%d", row.elementID),
+ formatTimestamp(row.timestamp),
+ fmt.Sprintf("%d", row.seriesID),
+ seriesText,
+ strconv.Itoa(len(row.elementData)),
+ }
+
+ for _, tagName := range tagColumns {
+ value := ""
+ if tagValue, exists := row.tags[tagName]; exists && tagValue != nil {
+ value = string(tagValue)
+ }
+ csvRow = append(csvRow, value)
+ }
+
+ return writer.Write(csvRow)
+}
+
+func convertStreamTagValue(value []byte) *modelv1.TagValue {
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+}
+
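+// streamTagRegistry is a minimal schema stub used only for criteria filtering in the dump tool; every tag is reported as an unindexed string tag.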
+type streamTagRegistry struct {
+ tags map[string][]byte
+}
+
+func (r *streamTagRegistry) FindTagSpecByName(name string) *logical.TagSpec {
+ return &logical.TagSpec{
+ Spec: &databasev1.TagSpec{
+ Name: name,
+ Type: databasev1.TagType_TAG_TYPE_STRING,
+ },
+ TagFamilyIdx: 0,
+ TagIdx: 0,
+ }
+}
+
+func (r *streamTagRegistry) IndexDefined(_ string) (bool, *databasev1.IndexRule) {
+ return false, nil
+}
+
+func (r *streamTagRegistry) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) {
+ return false, nil
+}
+
+func (r *streamTagRegistry) EntityList() []string {
+ return nil
+}
+
+func (r *streamTagRegistry) CreateTagRef(_ ...[]*logical.Tag) ([][]*logical.TagRef, error) {
+ return nil, fmt.Errorf("CreateTagRef not supported in dump tool")
+}
+
+func (r *streamTagRegistry) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error) {
+ return nil, fmt.Errorf("CreateFieldRef not supported in dump tool")
+}
+
+func (r *streamTagRegistry) ProjTags(_ ...[]*logical.TagRef) logical.Schema {
+ return r
+}
+
+func (r *streamTagRegistry) ProjFields(_ ...*logical.FieldRef) logical.Schema {
+ return r
+}
+
+func (r *streamTagRegistry) Children() []logical.Schema {
+ return nil
+}
+
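+// streamTagValueDecoder converts raw stored bytes into a modelv1.TagValue based on the declared value type, falling back to a string value for unknown types.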
+func streamTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][]byte) *modelv1.TagValue {
+ if value == nil && valueArr == nil {
+ return pbv1.NullTagValue
+ }
+
+ switch valueType {
+ case pbv1.ValueTypeStr:
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+ case pbv1.ValueTypeInt64:
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Int{
+ Int: &modelv1.Int{
+ Value: convert.BytesToInt64(value),
+ },
+ },
+ }
+ default:
+ if value != nil {
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+ }
+ return pbv1.NullTagValue
+ }
+}
diff --git a/banyand/cmd/dump/stream_test.go b/banyand/cmd/dump/stream_test.go
new file mode 100644
index 00000000..ce975153
--- /dev/null
+++ b/banyand/cmd/dump/stream_test.go
@@ -0,0 +1,150 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "path/filepath"
+ "strconv"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// TestDumpStreamPartFormat tests that the dump tool can parse the latest stream part format.
+// This test creates a real part using the stream module's flush operation,
+// then verifies the dump tool can correctly parse it.
+func TestDumpStreamPartFormat(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Use stream package to create a real part using actual flush operation
+ partPath, cleanup := createTestStreamPartForDump(tmpPath, fileSystem)
+ defer cleanup()
+
+ // Extract part ID from path
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ require.NoError(t, err, "part directory should have valid hex name")
+
+ // Parse the part using dump tool functions
+ p, err := openStreamPart(partID, filepath.Dir(partPath), fileSystem)
+ require.NoError(t, err, "should be able to open part created by stream module")
+ defer closeStreamPart(p)
+
+ // Verify part metadata
+ assert.Equal(t, partID, p.partMetadata.ID)
+ t.Logf("Part metadata: TotalCount=%d, BlocksCount=%d", p.partMetadata.TotalCount, p.partMetadata.BlocksCount)
+ assert.Greater(t, p.partMetadata.TotalCount, uint64(0), "should have elements")
+ assert.Greater(t, p.partMetadata.BlocksCount, uint64(0), "should have at least 1 block")
+ assert.Greater(t, p.partMetadata.MinTimestamp, int64(0), "should have valid min timestamp")
+ assert.Greater(t, p.partMetadata.MaxTimestamp, int64(0), "should have valid max timestamp")
+ assert.GreaterOrEqual(t, p.partMetadata.MaxTimestamp, p.partMetadata.MinTimestamp)
+
+ // Verify primary block metadata
+ assert.Greater(t, len(p.primaryBlockMetadata), 0, "should have at least 1 primary block")
+ t.Logf("Found %d primary blocks (metadata says BlocksCount=%d)", len(p.primaryBlockMetadata), p.partMetadata.BlocksCount)
+ for i, pbm := range p.primaryBlockMetadata {
+ t.Logf("Block %d: SeriesID=%d, MinTimestamp=%d, MaxTimestamp=%d, Offset=%d, Size=%d", i, pbm.seriesID, pbm.minTimestamp, pbm.maxTimestamp, pbm.offset, pbm.size)
+ }
+
+ // Verify we can decode all blocks
+ decoder := &encoding.BytesBlockDecoder{}
+ totalElements := 0
+
+ for blockIdx, pbm := range p.primaryBlockMetadata {
+ // Read primary data block
+ primaryData := make([]byte, pbm.size)
+ fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
+
+ // Decompress
+ decompressed, err := zstd.Decompress(nil, primaryData)
+ require.NoError(t, err, "should decompress primary data for primary block %d", blockIdx)
+
+ // Parse ALL block metadata entries from this primary block
+ blockMetadatas, err := parseStreamBlockMetadata(decompressed)
+ require.NoError(t, err, "should parse all block metadata from primary block %d", blockIdx)
+ t.Logf("Primary block %d contains %d stream blocks", blockIdx, len(blockMetadatas))
+
+ // Process each stream block
+ for bmIdx, bm := range blockMetadatas {
+ // Read timestamps and element IDs
+ timestamps, elementIDs, err := readStreamTimestamps(bm.timestamps, int(bm.count), p.timestamps)
+ require.NoError(t, err, "should read timestamps/elementIDs for series %d", bm.seriesID)
+ assert.Len(t, timestamps, int(bm.count), "should have correct number of timestamps")
+ assert.Len(t, elementIDs, int(bm.count), "should have correct number of elementIDs")
+
+ totalElements += len(timestamps)
+ t.Logf(" Stream block %d (SeriesID=%d): read %d elements", bmIdx, bm.seriesID, len(timestamps))
+
+ // Verify timestamps are valid
+ for i, ts := range timestamps {
+ assert.Greater(t, ts, int64(0), "timestamp should be positive")
+ assert.GreaterOrEqual(t, ts, p.partMetadata.MinTimestamp, "timestamp should be >= min")
+ assert.LessOrEqual(t, ts, p.partMetadata.MaxTimestamp, "timestamp should be <= max")
+ t.Logf(" Element %d: ElementID=%d, Timestamp=%s", i, elementIDs[i], formatTimestamp(ts))
+ }
+
+ // Read tag families if available
+ for tagFamilyName, tagFamilyBlock := range bm.tagFamilies {
+ // Read tag family metadata
+ tagFamilyMetadataData := make([]byte, tagFamilyBlock.size)
+ fs.MustReadData(p.tagFamilyMetadata[tagFamilyName], int64(tagFamilyBlock.offset), tagFamilyMetadataData)
+
+ // Parse tag family metadata
+ tagMetadatas, err := parseStreamTagFamilyMetadata(tagFamilyMetadataData)
+ require.NoError(t, err, "should parse tag family metadata %s for series %d", tagFamilyName, bm.seriesID)
+
+ // Read each tag in the tag family
+ for _, tagMeta := range tagMetadatas {
+ fullTagName := tagFamilyName + "." + tagMeta.name
+ tagValues, err := readStreamTagValues(decoder, tagMeta.dataBlock, fullTagName, int(bm.count), p.tagFamilies[tagFamilyName], tagMeta.valueType)
+ require.NoError(t, err, "should read tag %s for series %d", fullTagName, bm.seriesID)
+ assert.Len(t, tagValues, int(bm.count), "tag %s should have value for each element", fullTagName)
+
+ // Verify specific tag values
+ for i, tagValue := range tagValues {
+ if tagValue == nil {
+ continue
+ }
+ t.Logf(" Element %d tag %s: %s", i, fullTagName, formatTagValueForDisplay(tagValue, tagMeta.valueType))
+ }
+ }
+ }
+ }
+ }
+
+ // Verify we can read all the data correctly
+ assert.Equal(t, int(p.partMetadata.TotalCount), totalElements, "should have parsed all elements from metadata")
+ t.Logf("Successfully parsed part with %d elements across %d primary blocks (metadata BlocksCount=%d)",
+ totalElements, len(p.primaryBlockMetadata), p.partMetadata.BlocksCount)
+}
+
+// createTestStreamPartForDump creates a test stream part for testing the dump tool.
+// It uses the stream package's CreateTestPartForDump function.
+func createTestStreamPartForDump(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ return stream.CreateTestPartForDump(tmpPath, fileSystem)
+}
diff --git a/banyand/stream/test_helper.go b/banyand/stream/test_helper.go
new file mode 100644
index 00000000..5c1ca86f
--- /dev/null
+++ b/banyand/stream/test_helper.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "time"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// CreateTestPartForDump creates a test stream part for testing the dump tool.
+// It takes a temporary path and a file system as input, generates test elements with various tag types,
+// creates a memory part, flushes it to disk, and returns the path to the created part directory.
+// Parameters:
+//
+// tmpPath: the base directory where the part will be created.
+// fileSystem: the file system to use for writing the part.
+//
+// Returns:
+//
+// The path to the created part directory and a cleanup function.
+func CreateTestPartForDump(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ now := time.Now().UnixNano()
+
+ // Create test elements with various tag types
+ es := &elements{
+ seriesIDs: []common.SeriesID{1, 2, 3},
+ timestamps: []int64{now, now + 1000, now + 2000},
+ elementIDs: []uint64{11, 21, 31},
+ tagFamilies: [][]tagValues{
+ {
+ {
+ tag: "arrTag",
+ values: []*tagValue{
+ {tag: "strArrTag", valueType: pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value1"), []byte("value2")}},
+ {tag: "intArrTag", valueType: pbv1.ValueTypeInt64Arr, value: nil, valueArr: [][]byte{convert.Int64ToBytes(25), convert.Int64ToBytes(30)}},
+ },
+ },
+ {
+ tag: "singleTag",
+ values: []*tagValue{
+ {tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("test-value"), valueArr: nil},
+ {tag: "intTag", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(100), valueArr: nil},
+ },
+ },
+ },
+ {
+ {
+ tag: "singleTag",
+ values: []*tagValue{
+ {tag: "strTag1", valueType: pbv1.ValueTypeStr, value: []byte("tag1"), valueArr: nil},
+ {tag: "strTag2", valueType: pbv1.ValueTypeStr, value: []byte("tag2"), valueArr: nil},
+ },
+ },
+ },
+ {}, // empty tagFamilies for seriesID 3
+ },
+ }
+
+ // Create memPart and flush
+ mp := generateMemPart()
+ mp.mustInitFromElements(es)
+
+ epoch := uint64(12345)
+ path := partPath(tmpPath, epoch)
+ mp.mustFlush(fileSystem, path)
+
+ cleanup := func() {
+ // Cleanup is handled by the caller's test.Space cleanup
+ releaseMemPart(mp)
+ }
+
+ return path, cleanup
+}