This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 51399612 Fix the sorting timestamps issue of the measure model (#781)
51399612 is described below
commit 513996126898e78b8207ccd8f714e8d6306c0c47
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Sep 23 10:42:15 2025 +0800
Fix the sorting timestamps issue of the measure model (#781)
---
CHANGES.md | 1 +
banyand/internal/sidx/interfaces.go | 4 +-
banyand/internal/sidx/metadata.go | 3 +-
banyand/internal/sidx/multi_sidx_query_test.go | 4 +-
banyand/internal/sidx/sidx.go | 3 +-
banyand/internal/sidx/sidx_test.go | 26 +-
banyand/internal/sidx/snapshot_test.go | 10 +-
banyand/internal/sidx/sync.go | 6 +-
banyand/measure/flusher.go | 85 ++++--
banyand/measure/metadata.go | 9 +-
banyand/measure/part.go | 1 +
banyand/measure/query.go | 217 ++++++++++---
banyand/measure/query_test.go | 340 +++++++++++++++++++++
banyand/measure/syncer.go | 1 -
banyand/measure/tstable.go | 8 +
banyand/measure/write_liaison.go | 2 +-
banyand/stream/flusher.go | 85 ++++--
banyand/stream/part.go | 2 +
banyand/stream/tstable.go | 8 +
banyand/stream/write_liaison.go | 2 +-
banyand/trace/flusher.go | 138 ++++++---
banyand/trace/part.go | 2 +
banyand/trace/syncer.go | 14 +-
banyand/trace/tstable.go | 8 +
banyand/trace/write_liaison.go | 4 +-
banyand/trace/write_standalone.go | 2 +-
test/cases/measure/data/data.go | 21 ++
test/cases/measure/data/want/entity_in.yaml | 8 +-
test/cases/measure/data/want/tag_filter_int.yaml | 4 +-
test/cases/measure/measure.go | 19 +-
.../multi_segments/multi_segments_suite_test.go | 161 ++++++++++
.../multi_segments/multi_segments_suite_test.go | 109 +++++++
32 files changed, 1124 insertions(+), 183 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ab84e0d0..79e28c3a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@ Release Notes.
- Fix returning empty result when using IN operator on the array type tags.
- Fix memory leaks and OOM issues in streaming processing by implementing
deduplication logic in priority queues and improving sliding window memory
management.
- Fix etcd prefix matching any key that starts with this prefix.
+- Fix the sorting timestamps issue of the measure model when there is more than one segment.
### Document
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index bad91fb1..94cf6d29 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -40,7 +40,7 @@ type SIDX interface {
MustAddMemPart(ctx context.Context, mp *memPart)
// Write performs batch write operations. All writes must be submitted as batches.
// Elements within each batch should be pre-sorted by the caller for optimal performance.
- Write(ctx context.Context, reqs []WriteRequest) error
+ Write(ctx context.Context, reqs []WriteRequest, segmentID int64) error
// Query executes a query with key range and tag filtering.
// Returns a QueryResponse directly with all results loaded.
// Both setup/validation errors and execution errors are returned via the error return value.
@@ -56,7 +56,7 @@ type SIDX interface {
// PartsToSync returns the parts to sync.
PartsToSync() []*part
// StreamingParts returns the streaming parts.
- StreamingParts(partsToSync []*part, group string, shardID uint32, name string, minTimestamps []int64) ([]queue.StreamingPartData, []func())
+ StreamingParts(partsToSync []*part, group string, shardID uint32, name string) ([]queue.StreamingPartData, []func())
// SyncCh returns the sync channel for external synchronization.
SyncCh() chan<- *SyncIntroduction
}
diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go
index deb8eb06..b60197ae 100644
--- a/banyand/internal/sidx/metadata.go
+++ b/banyand/internal/sidx/metadata.go
@@ -47,7 +47,8 @@ type partMetadata struct {
MaxKey int64 `json:"maxKey"` // Maximum user key in part
// Identity
- ID uint64 `json:"id"` // Unique part identifier
+ ID uint64 `json:"id"` // Unique part identifier
+ SegmentID int64 `json:"segmentID"` // Segment identifier
}
func validatePartMetadata(fileSystem fs.FileSystem, partPath string) error {
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go
index 87d57c28..b15553b7 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -40,7 +40,7 @@ type mockSIDX struct {
func (m *mockSIDX) MustAddMemPart(_ context.Context, _ *memPart) {}
-func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest) error {
+func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest, _ int64) error {
return nil // Not implemented for tests
}
@@ -71,7 +71,7 @@ func (m *mockSIDX) PartsToSync() []*part {
return nil
}
-func (m *mockSIDX) StreamingParts(_ []*part, _ string, _ uint32, _ string, _ []int64) ([]queue.StreamingPartData, []func()) {
+func (m *mockSIDX) StreamingParts(_ []*part, _ string, _ uint32, _ string) ([]queue.StreamingPartData, []func()) {
return nil, nil
}
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 84ba4280..b2aa29df 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -130,7 +130,7 @@ func (s *sidx) MustAddMemPart(ctx context.Context, mp *memPart) {
}
// Write implements SIDX interface.
-func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) error {
+func (s *sidx) Write(ctx context.Context, reqs []WriteRequest, segmentID int64) error {
// Validate requests
for _, req := range reqs {
if err := req.Validate(); err != nil {
@@ -160,6 +160,7 @@ func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) error {
intro := generateIntroduction()
intro.memPart = mp
intro.memPart.partMetadata.ID = partID
+ intro.memPart.partMetadata.SegmentID = segmentID
intro.applied = make(chan struct{})
// Send to introducer loop
diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go
index 47d82f6e..95aef2b0 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -100,7 +100,7 @@ func TestSIDX_Write_SingleRequest(t *testing.T) {
createTestWriteRequest(1, 100, "data1", createTestTag("tag1",
"value1")),
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
assert.NoError(t, err)
// Verify stats
@@ -124,7 +124,7 @@ func TestSIDX_Write_BatchRequest(t *testing.T) {
createTestWriteRequest(2, 200, "data3", createTestTag("tag2", "value3")),
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
assert.NoError(t, err)
// Verify stats
@@ -170,7 +170,7 @@ func TestSIDX_Write_Validation(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- err := sidx.Write(ctx, []WriteRequest{tt.req})
+ err := sidx.Write(ctx, []WriteRequest{tt.req}, 1)
if tt.expectErr {
assert.Error(t, err)
} else {
@@ -199,7 +199,7 @@ func TestSIDX_Write_WithTags(t *testing.T) {
createTestWriteRequest(1, 100, "trace-data", tags...),
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
assert.NoError(t, err)
}
@@ -218,7 +218,7 @@ func TestSIDX_Query_BasicQuery(t *testing.T) {
createTestWriteRequest(1, 100, "data1"),
createTestWriteRequest(1, 101, "data2"),
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
require.NoError(t, err)
// Wait for introducer loop to process
@@ -268,7 +268,7 @@ func TestSIDX_Query_KeyRangeFilter(t *testing.T) {
createTestWriteRequest(1, 150, "data150"),
createTestWriteRequest(1, 200, "data200"),
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
require.NoError(t, err)
// Wait for introducer loop to process
@@ -322,7 +322,7 @@ func TestSIDX_Query_Ordering(t *testing.T) {
createTestWriteRequest(3, 75, "series3-data75"),
createTestWriteRequest(3, 175, "series3-data175"),
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
require.NoError(t, err)
// Wait for introducer loop to process
@@ -479,7 +479,7 @@ func TestSIDX_WriteQueryIntegration(t *testing.T) {
createTestWriteRequest(2, 180, "series2-data2", createTestTag("env", "dev")),
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
require.NoError(t, err)
// Test 1: Query single series
@@ -540,7 +540,7 @@ func TestSIDX_DataConsistency(t *testing.T) {
reqs = append(reqs, createTestWriteRequest(1, key, data))
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
require.NoError(t, err)
// Query back and verify data integrity
@@ -587,7 +587,7 @@ func TestSIDX_LargeDataset(t *testing.T) {
))
}
- err := sidx.Write(ctx, reqs)
+ err := sidx.Write(ctx, reqs, 1)
require.NoError(t, err)
// Query back and verify we can handle large result sets
@@ -634,7 +634,7 @@ func TestSIDX_ConcurrentWrites(t *testing.T) {
reqs = append(reqs, createTestWriteRequest(seriesID, key, data))
}
- if err := sidx.Write(ctx, reqs); err != nil {
+ if err := sidx.Write(ctx, reqs, 1); err != nil {
errors <- err
}
}(g)
@@ -666,7 +666,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
initialReqs := []WriteRequest{
createTestWriteRequest(1, 100, "initial-data"),
}
- err := sidx.Write(ctx, initialReqs)
+ err := sidx.Write(ctx, initialReqs, 1)
require.NoError(t, err)
var wg sync.WaitGroup
@@ -708,7 +708,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
int64(writeCount),
fmt.Sprintf("writer-%d-data-%d", writerID, writeCount),
)
- sidx.Write(ctx, []WriteRequest{req}) // Ignore errors during concurrent stress
+ sidx.Write(ctx, []WriteRequest{req}, 1) // Ignore errors during concurrent stress
writeCount++
}
}(i)
diff --git a/banyand/internal/sidx/snapshot_test.go b/banyand/internal/sidx/snapshot_test.go
index 3322a459..fd4d5abe 100644
--- a/banyand/internal/sidx/snapshot_test.go
+++ b/banyand/internal/sidx/snapshot_test.go
@@ -413,7 +413,7 @@ func TestSnapshotReplacement_Basic(t *testing.T) {
Tags: []Tag{{Name: "test", Value: []byte("snapshot-replacement")}},
}
- if err := sidx.Write(ctx, []WriteRequest{req}); err != nil {
+ if err := sidx.Write(ctx, []WriteRequest{req}, 1); err != nil {
t.Errorf("write %d failed: %v", i, err)
}
@@ -504,7 +504,7 @@ func TestSnapshotReplacement_ConcurrentReadsConsistentData(t *testing.T) {
},
}
- if err := sidx.Write(ctx, reqs); err != nil {
+ if err := sidx.Write(ctx, reqs, 1); err != nil {
t.Errorf("write %d failed: %v", i, err)
}
@@ -593,7 +593,7 @@ func TestSnapshotReplacement_NoDataRacesDuringReplacement(t *testing.T) {
},
},
}
- sidx.Write(ctx, reqs)
+ sidx.Write(ctx, reqs, 1)
case 1:
// Stats operation - accesses current snapshot
sidx.Stats(ctx)
@@ -653,7 +653,7 @@ func TestSnapshotReplacement_MemoryLeaksPrevention(t *testing.T) {
},
}
- if writeErr := sidx.Write(ctx, reqs); writeErr != nil {
+ if writeErr := sidx.Write(ctx, reqs, 1); writeErr != nil {
t.Errorf("batch %d write %d failed: %v", i, j, writeErr)
}
}
@@ -714,7 +714,7 @@ func TestSnapshotReplacement_MemoryLeaksPrevention(t *testing.T) {
},
}
- if writeErr := sidx.Write(ctx, reqs); writeErr != nil {
+ if writeErr := sidx.Write(ctx, reqs, 1); writeErr != nil {
t.Errorf("concurrent writer %d write %d failed: %v", writerID, j, writeErr)
}
diff --git a/banyand/internal/sidx/sync.go b/banyand/internal/sidx/sync.go
index 7d4f1c65..0a671ebb 100644
--- a/banyand/internal/sidx/sync.go
+++ b/banyand/internal/sidx/sync.go
@@ -62,10 +62,10 @@ func (s *sidx) PartsToSync() []*part {
}
// StreamingParts returns the streaming parts.
-func (s *sidx) StreamingParts(partsToSync []*part, group string, shardID uint32, name string, minTimestamps []int64) ([]queue.StreamingPartData, []func()) {
+func (s *sidx) StreamingParts(partsToSync []*part, group string, shardID uint32, name string) ([]queue.StreamingPartData, []func()) {
var streamingParts []queue.StreamingPartData
var releaseFuncs []func()
- for i, part := range partsToSync {
+ for _, part := range partsToSync {
// Create streaming reader for the part
files, release := createPartFileReaders(part)
releaseFuncs = append(releaseFuncs, release)
@@ -80,7 +80,7 @@ func (s *sidx) StreamingParts(partsToSync []*part, group string, shardID uint32,
UncompressedSizeBytes: part.partMetadata.UncompressedSizeBytes,
TotalCount: part.partMetadata.TotalCount,
BlocksCount: part.partMetadata.BlocksCount,
- MinTimestamp: minTimestamps[i],
+ MinTimestamp: part.partMetadata.SegmentID,
MinKey: part.partMetadata.MinKey,
MaxKey: part.partMetadata.MaxKey,
PartType: name,
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
index 7c2b0bba..c0390f50 100644
--- a/banyand/measure/flusher.go
+++ b/banyand/measure/flusher.go
@@ -99,33 +99,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch uint64, flushWatcher watc
}
func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan *mergerIntroduction) (bool, error) {
+ var merged bool
+ var currentSegmentID int64
var memParts []*partWrapper
mergedIDs := make(map[uint64]struct{})
+
+ // Helper function to merge current segment's parts
+ mergeCurrentSegment := func() (bool, error) {
+ if len(memParts) < 2 {
+ return false, nil
+ }
+
+ // Create a copy of mergedIDs for this merge operation
+ currentMergedIDs := make(map[uint64]struct{}, len(mergedIDs))
+ for id := range mergedIDs {
+ currentMergedIDs[id] = struct{}{}
+ }
+
+ // merge memory must not be closed by the tsTable.close
+ closeCh := make(chan struct{})
+ newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+ currentMergedIDs, mergeCh, closeCh, "mem")
+ close(closeCh)
+ if err != nil {
+ if errors.Is(err, errClosed) {
+ return true, nil
+ }
+ return false, err
+ }
+ return newPart != nil, nil
+ }
+
+ // Process parts grouped by segmentID
for i := range snp.parts {
- if snp.parts[i].mp != nil {
- memParts = append(memParts, snp.parts[i])
- mergedIDs[snp.parts[i].ID()] = struct{}{}
+ if snp.parts[i].mp == nil {
continue
}
- }
- if len(memParts) < 2 {
- return false, nil
- }
- // merge memory must not be closed by the tsTable.close
- closeCh := make(chan struct{})
- newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
- mergedIDs, mergeCh, closeCh, "mem")
- close(closeCh)
- if err != nil {
- if errors.Is(err, errClosed) {
- return true, nil
+
+ segID := snp.parts[i].mp.segmentID
+
+ // If this is a new segment, merge the previous segment first
+ if currentSegmentID != 0 && currentSegmentID != segID {
+ m, err := mergeCurrentSegment()
+ if err != nil {
+ return false, err
+ }
+ if m {
+ merged = true
+ }
+
+ // Reset for next segment
+ memParts = memParts[:0]
+ for id := range mergedIDs {
+ delete(mergedIDs, id)
+ }
}
- return false, err
+
+ // Add part to current segment
+ currentSegmentID = segID
+ memParts = append(memParts, snp.parts[i])
+ mergedIDs[snp.parts[i].ID()] = struct{}{}
}
- if newPart == nil {
- return false, nil
+
+ // Merge the last segment if it has parts
+ if len(memParts) >= 2 {
+ m, err := mergeCurrentSegment()
+ if err != nil {
+ return false, err
+ }
+ if m {
+ merged = true
+ }
}
- return true, nil
+
+ return merged, nil
}
func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) {
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index f9c64eb2..53475c5a 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -96,7 +96,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n
sr.Repository = resourceSchema.NewRepository(
svc.metadata,
svc.l,
- newSupplier(path, svc, sr, nodeLabels, nodeID),
+ newSupplier(path, svc, sr, nodeLabels),
resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
)
sr.start()
@@ -372,8 +372,9 @@ func (sr *schemaRepo) stopAllProcessorsWithGroupPrefix(groupName string) {
manager := v.(*topNProcessorManager)
if err := manager.Close(); err != nil {
sr.l.Error().Err(err).Str("key",
key).Msg("failed to close topN processor manager")
+ } else {
+ sr.topNProcessorMap.Delete(key)
}
- sr.topNProcessorMap.Delete(key)
}
}
@@ -392,12 +393,11 @@ type supplier struct {
l *logger.Logger
schemaRepo *schemaRepo
nodeLabels map[string]string
- nodeID string
path string
option option
}
-func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[string]string, nodeID string) *supplier {
+func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[string]string) *supplier {
if svc.pm == nil {
svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier")
}
@@ -418,7 +418,6 @@ func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[st
pm: svc.pm,
schemaRepo: sr,
nodeLabels: nodeLabels,
- nodeID: nodeID,
}
}
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index d7d639ea..e47bfbc6 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -109,6 +109,7 @@ type memPart struct {
timestamps bytes.Buffer
fieldValues bytes.Buffer
partMetadata partMetadata
+ segmentID int64
}
func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, fs.Writer) {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 066f732a..8e0fceeb 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -226,9 +226,14 @@ func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
newTagProjection = append(newTagProjection, tagProjection)
}
}
+
+ // Collect all search results first
+ var segResults []*segResult
+ var needsSorting bool
seriesFilter := roaring.NewPostingList()
+
for i := range segments {
- sd, _, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{
+ sd, sortedValues, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{
Query: mqo.Query,
Order: mqo.Order,
PreloadSize: preloadSize,
@@ -241,44 +246,135 @@ func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
tt, cc := segments[i].Tables()
tables = append(tables, tt...)
caches = append(caches, cc...)
+
+ // Create segResult for this segment
+ sr := &segResult{
+ SeriesData: sd,
+ sortedValues: sortedValues,
+ i: 0,
+ }
+ segResults = append(segResults, sr)
+
+ // Check if we need sorting
+ if mqo.Order != nil && sortedValues != nil {
+ needsSorting = true
+ }
}
- for j := range sd.SeriesList {
- if seriesFilter.Contains(uint64(sd.SeriesList[j].ID)) {
+ }
+
+ // Sort if needed, otherwise use original order
+ if needsSorting && len(segResults) > 0 {
+ // Use segResultHeap to sort
+ segHeap := &segResultHeap{
+ results: segResults,
+ sortDesc: mqo.Order.Sort == modelv1.Sort_SORT_DESC,
+ }
+ heap.Init(segHeap)
+
+ // Extract sorted series IDs
+ for segHeap.Len() > 0 {
+ top := heap.Pop(segHeap).(*segResult)
+ series := top.SeriesList[top.i]
+
+ if seriesFilter.Contains(uint64(series.ID)) {
+ // Move to next in this segment
+ top.i++
+ if top.i < len(top.SeriesList) {
+ heap.Push(segHeap, top)
+ }
continue
}
- seriesFilter.Insert(uint64(sd.SeriesList[j].ID))
- sl = append(sl, sd.SeriesList[j].ID)
- if projectedEntityOffsets == nil && sd.Fields == nil {
- continue
+
+ seriesFilter.Insert(uint64(series.ID))
+ sl = append(sl, series.ID)
+
+ // Build storedIndexValue for this series
+ var fieldResult map[string][]byte
+ if top.Fields != nil && top.i < len(top.Fields) {
+ fieldResult = top.Fields[top.i]
}
- if storedIndexValue == nil {
- storedIndexValue = make(map[common.SeriesID]map[string]*modelv1.TagValue)
+ storedIndexValue = m.buildStoredIndexValue(
+ series.ID,
+ series.EntityValues,
+ fieldResult,
+ projectedEntityOffsets,
+ fieldToValueType,
+ storedIndexValue,
+ )
+
+ // Move to next in this segment
+ top.i++
+ if top.i < len(top.SeriesList) {
+ heap.Push(segHeap, top)
}
- tagValues := make(map[string]*modelv1.TagValue)
- storedIndexValue[sd.SeriesList[j].ID] = tagValues
- for name, offset := range projectedEntityOffsets {
- if offset < 0 || offset >=
len(sd.SeriesList[j].EntityValues) {
- logger.Warningf("offset %d for tag %s
is out of range for series ID %v", offset, name, sd.SeriesList[j].ID)
- tagValues[name] = pbv1.NullTagValue
+ }
+ } else {
+ // Original logic when no sorting is needed
+ for _, sr := range segResults {
+ for j := range sr.SeriesList {
+ if seriesFilter.Contains(uint64(sr.SeriesList[j].ID)) {
continue
}
- tagValues[name] = sd.SeriesList[j].EntityValues[offset]
- }
- if sd.Fields == nil {
- continue
- }
- for f, v := range sd.Fields[j] {
- if tnt, ok := fieldToValueType[f]; ok {
- tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v)
- } else {
- logger.Panicf("unknown field %s not found in fieldToValueType", f)
+ seriesFilter.Insert(uint64(sr.SeriesList[j].ID))
+ sl = append(sl, sr.SeriesList[j].ID)
+
+ var fieldResult map[string][]byte
+ if sr.Fields != nil && j < len(sr.Fields) {
+ fieldResult = sr.Fields[j]
}
+ storedIndexValue = m.buildStoredIndexValue(
+ sr.SeriesList[j].ID,
+ sr.SeriesList[j].EntityValues,
+ fieldResult,
+ projectedEntityOffsets,
+ fieldToValueType,
+ storedIndexValue,
+ )
}
}
}
+
return sl, tables, caches, storedIndexValue, newTagProjection, nil
}
+func (m *measure) buildStoredIndexValue(
+ seriesID common.SeriesID,
+ entityValues []*modelv1.TagValue,
+ fieldResult map[string][]byte,
+ projectedEntityOffsets map[string]int,
+ fieldToValueType map[string]tagNameWithType,
+ storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+) map[common.SeriesID]map[string]*modelv1.TagValue {
+ if projectedEntityOffsets == nil && fieldResult == nil {
+ return storedIndexValue
+ }
+
+ if storedIndexValue == nil {
+ storedIndexValue = make(map[common.SeriesID]map[string]*modelv1.TagValue)
+ }
+ tagValues := make(map[string]*modelv1.TagValue)
+ storedIndexValue[seriesID] = tagValues
+
+ for name, offset := range projectedEntityOffsets {
+ if offset < 0 || offset >= len(entityValues) {
+ logger.Warningf("offset %d for tag %s is out of range
for series ID %v", offset, name, seriesID)
+ tagValues[name] = pbv1.NullTagValue
+ continue
+ }
+ tagValues[name] = entityValues[offset]
+ }
+
+ for f, v := range fieldResult {
+ if tnt, ok := fieldToValueType[f]; ok {
+ tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v)
+ } else {
+ logger.Panicf("unknown field %s not found in
fieldToValueType", f)
+ }
+ }
+
+ return storedIndexValue
+}
+
func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQueryOptions,
segments []storage.Segment[*tsTable, option],
) (model.MeasureQueryResult, error) {
@@ -351,11 +447,17 @@ func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQu
if len(sr.SeriesList) < 1 {
continue
}
- r.segResults = append(r.segResults, sr)
+ r.segResults.results = append(r.segResults.results, sr)
}
- if len(r.segResults) < 1 {
+ if len(r.segResults.results) < 1 {
return nilResult, nil
}
+
+ // Set sort order based on mqo.Order.Sort
+ if mqo.Order != nil && mqo.Order.Sort == modelv1.Sort_SORT_DESC {
+ r.segResults.sortDesc = true
+ }
+
heap.Init(&r.segResults)
return r, nil
}
@@ -770,18 +872,18 @@ type indexSortResult struct {
// Pull implements model.MeasureQueryResult.
func (iqr *indexSortResult) Pull() *model.MeasureResult {
- if len(iqr.segResults) < 1 {
+ if len(iqr.segResults.results) < 1 {
return nil
}
- if len(iqr.segResults) == 1 {
- if iqr.segResults[0].i >= len(iqr.segResults[0].SeriesList) {
+ if len(iqr.segResults.results) == 1 {
+ if iqr.segResults.results[0].i >=
len(iqr.segResults.results[0].SeriesList) {
return nil
}
- sr := iqr.segResults[0]
+ sr := iqr.segResults.results[0]
r := iqr.copyTo(sr)
sr.i++
if sr.i >= len(sr.SeriesList) {
- iqr.segResults = iqr.segResults[:0]
+ iqr.segResults.results = iqr.segResults.results[:0]
}
return r
}
@@ -861,25 +963,56 @@ func (sr *segResult) remove(i int) {
}
}
-type segResultHeap []*segResult
+type segResultHeap struct {
+ results []*segResult
+ sortDesc bool
+}
+
+func (h *segResultHeap) Len() int { return len(h.results) }
+func (h *segResultHeap) Less(i, j int) bool {
+ // Handle NPE - check for nil results or invalid indices
+ if i >= len(h.results) || j >= len(h.results) {
+ return false
+ }
+ if h.results[i] == nil || h.results[j] == nil {
+ return false
+ }
+ if h.results[i].i >= len(h.results[i].SeriesList) || h.results[j].i >=
len(h.results[j].SeriesList) {
+ return false
+ }
+
+ // If no sortedValues, compare by SeriesID
+ if h.results[i].sortedValues == nil || h.results[j].sortedValues == nil
{
+ if h.sortDesc {
+ return h.results[i].SeriesList[h.results[i].i].ID >
h.results[j].SeriesList[h.results[j].i].ID
+ }
+ return h.results[i].SeriesList[h.results[i].i].ID <
h.results[j].SeriesList[h.results[j].i].ID
+ }
+
+ // Handle potential index out of bounds for sortedValues
+ if h.results[i].i >= len(h.results[i].sortedValues) || h.results[j].i
>= len(h.results[j].sortedValues) {
+ if h.sortDesc {
+ return h.results[i].SeriesList[h.results[i].i].ID >
h.results[j].SeriesList[h.results[j].i].ID
+ }
+ return h.results[i].SeriesList[h.results[i].i].ID <
h.results[j].SeriesList[h.results[j].i].ID
+ }
-func (h segResultHeap) Len() int { return len(h) }
-func (h segResultHeap) Less(i, j int) bool {
- if h[i].sortedValues == nil {
- return h[i].SeriesList[h[i].i].ID < h[j].SeriesList[h[j].i].ID
+ cmp := bytes.Compare(h.results[i].sortedValues[h.results[i].i], h.results[j].sortedValues[h.results[j].i])
+ if h.sortDesc {
+ return cmp > 0
}
- return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0
+ return cmp < 0
}
-func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *segResultHeap) Swap(i, j int) { h.results[i], h.results[j] = h.results[j], h.results[i] }
func (h *segResultHeap) Push(x interface{}) {
- *h = append(*h, x.(*segResult))
+ h.results = append(h.results, x.(*segResult))
}
func (h *segResultHeap) Pop() interface{} {
- old := *h
+ old := h.results
n := len(old)
x := old[n-1]
- *h = old[0 : n-1]
+ h.results = old[0 : n-1]
return x
}
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 1f2d24a8..17ca9c37 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -34,6 +34,7 @@ import (
itest "github.com/apache/skywalking-banyandb/banyand/internal/test"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -1515,3 +1516,342 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
})
}
}
+
+func TestSegResultHeap_Sorting(t *testing.T) {
+ tests := []struct {
+ name string
+ segResults []*segResult
+ expectOrder []int
+ sortDesc bool
+ }{
+ {
+ name: "Sort ascending by SeriesID (no
sortedValues)",
+ sortDesc: false,
+ segResults: []*segResult{
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(30)},
+ },
+ },
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(10)},
+ },
+ },
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(20)},
+ },
+ },
+ i: 0,
+ },
+ },
+ expectOrder: []int{1, 0, 2}, // SeriesID order: 10, 30,
20
+ },
+ {
+ name: "Sort descending by SeriesID (no
sortedValues)",
+ sortDesc: true,
+ segResults: []*segResult{
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(10)},
+ },
+ },
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(30)},
+ },
+ },
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(20)},
+ },
+ },
+ i: 0,
+ },
+ },
+ expectOrder: []int{1, 0, 2}, // SeriesID order: 30, 10,
20
+ },
+ {
+ name: "Sort ascending by sortedValues",
+ sortDesc: false,
+ segResults: []*segResult{
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(1)},
+ },
+ },
+ sortedValues:
[][]byte{[]byte("charlie")},
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(2)},
+ },
+ },
+ sortedValues: [][]byte{[]byte("alpha")},
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(3)},
+ },
+ },
+ sortedValues: [][]byte{[]byte("beta")},
+ i: 0,
+ },
+ },
+ expectOrder: []int{1, 0, 2}, // alpha, charlie, beta
+ },
+ {
+ name: "Sort descending by sortedValues",
+ sortDesc: true,
+ segResults: []*segResult{
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(1)},
+ },
+ },
+ sortedValues: [][]byte{[]byte("alpha")},
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(2)},
+ },
+ },
+ sortedValues:
[][]byte{[]byte("charlie")},
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(3)},
+ },
+ },
+ sortedValues: [][]byte{[]byte("beta")},
+ i: 0,
+ },
+ },
+ expectOrder: []int{1, 0, 2}, // charlie, alpha, beta
+ },
+ {
+ name: "Mixed sortedValues and nil sortedValues
ascending",
+ sortDesc: false,
+ segResults: []*segResult{
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(30)},
+ },
+ },
+ sortedValues: nil, // Will use SeriesID
for sorting
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(10)},
+ },
+ },
+ sortedValues: [][]byte{[]byte("zzz")},
// Should come after nil sortedValues when sorted by SeriesID
+ i: 0,
+ },
+ {
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(20)},
+ },
+ },
+ sortedValues: nil, // Will use SeriesID
for sorting
+ i: 0,
+ },
+ },
+ expectOrder: []int{1, 0, 2}, // SeriesID 10, 30, 20
(nil sortedValues use SeriesID)
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create and initialize heap
+ heap := segResultHeap{
+ results: make([]*segResult, 0),
+ sortDesc: tt.sortDesc,
+ }
+
+ // Add all results to heap
+ heap.results = append(heap.results, tt.segResults...)
+
+ // Initialize heap
+ require.Equal(t, len(tt.segResults), heap.Len())
+
+ // Sort using Go's heap
+ heapImpl := &heap
+ heap2 := make([]*segResult, len(tt.segResults))
+ copy(heap2, heap.results)
+
+ // Sort manually to get expected order
+ sort.Slice(heap2, func(i, j int) bool {
+ return heapImpl.Less(i, j)
+ })
+
+ // Verify the order matches expectation
+ for i, expectedIdx := range tt.expectOrder {
+ actual := heap2[i]
+ expected := tt.segResults[expectedIdx]
+ require.Equal(t,
expected.SeriesList[expected.i].ID, actual.SeriesList[actual.i].ID,
+ "Position %d: expected SeriesID %d, got
%d", i, expected.SeriesList[expected.i].ID, actual.SeriesList[actual.i].ID)
+ }
+ })
+ }
+}
+
+func TestSegResultHeap_NPE_Prevention(t *testing.T) {
+ tests := []struct {
+ name string
+ segResults []*segResult
+ i, j int
+ expectLess bool
+ }{
+ {
+ name: "Out of bounds indices",
+ segResults: []*segResult{{SeriesData:
storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 0}},
+ i: 0,
+ j: 5, // Out of bounds
+ expectLess: false,
+ },
+ {
+ name: "Nil segResult",
+ segResults: []*segResult{nil, {SeriesData:
storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 0}},
+ i: 0,
+ j: 1,
+ expectLess: false,
+ },
+ {
+ name: "Index out of bounds for SeriesList",
+ segResults: []*segResult{
+ {SeriesData: storage.SeriesData{SeriesList:
pbv1.SeriesList{{ID: 1}}}, i: 5}, // i is out of bounds
+ {SeriesData: storage.SeriesData{SeriesList:
pbv1.SeriesList{{ID: 2}}}, i: 0},
+ },
+ i: 0,
+ j: 1,
+ expectLess: false,
+ },
+ {
+ name: "Index out of bounds for sortedValues",
+ segResults: []*segResult{
+ {
+ SeriesData:
storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}},
+ sortedValues: [][]byte{[]byte("test")},
+ i: 5, // i is out of bounds
for sortedValues
+ },
+ {
+ SeriesData:
storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 2}}},
+ sortedValues: [][]byte{[]byte("test2")},
+ i: 0,
+ },
+ },
+ i: 0,
+ j: 1,
+ expectLess: false, // Should fallback to SeriesID
comparison
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ heap := segResultHeap{
+ results: tt.segResults,
+ sortDesc: false,
+ }
+
+ // This should not panic due to NPE prevention
+ result := heap.Less(tt.i, tt.j)
+ require.Equal(t, tt.expectLess, result)
+ })
+ }
+}
+
+func TestIndexSortResult_OrderBySortDesc(t *testing.T) {
+ tests := []struct {
+ name string
+ sortOrder modelv1.Sort
+ expectDesc bool
+ }{
+ {
+ name: "SORT_ASC should be ascending",
+ sortOrder: modelv1.Sort_SORT_ASC,
+ expectDesc: false,
+ },
+ {
+ name: "SORT_UNSPECIFIED should be ascending",
+ sortOrder: modelv1.Sort_SORT_UNSPECIFIED,
+ expectDesc: false,
+ },
+ {
+ name: "SORT_DESC should be descending",
+ sortOrder: modelv1.Sort_SORT_DESC,
+ expectDesc: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create mock segments and measure data
+ mqo := model.MeasureQueryOptions{
+ Order: &index.OrderBy{
+ Sort: tt.sortOrder,
+ },
+ }
+
+ // Create a simple segResult
+ sr := &segResult{
+ SeriesData: storage.SeriesData{
+ SeriesList: pbv1.SeriesList{
+ &pbv1.Series{ID:
common.SeriesID(1)},
+ &pbv1.Series{ID:
common.SeriesID(2)},
+ },
+ Timestamps: []int64{1000, 2000},
+ Versions: []int64{1, 2},
+ },
+ sortedValues: [][]byte{[]byte("test1"),
[]byte("test2")},
+ i: 0,
+ }
+
+ // Create index sort result
+ r := &indexSortResult{
+ tfl: []tagFamilyLocation{},
+ segResults: segResultHeap{
+ results: []*segResult{sr},
+ sortDesc: false, // This should be set
by buildIndexQueryResult
+ },
+ }
+
+ // Simulate the logic from buildIndexQueryResult
+ if mqo.Order != nil && mqo.Order.Sort ==
modelv1.Sort_SORT_DESC {
+ r.segResults.sortDesc = true
+ }
+
+ // Verify the sort order was set correctly
+ require.Equal(t, tt.expectDesc, r.segResults.sortDesc)
+ })
+ }
+}
diff --git a/banyand/measure/syncer.go b/banyand/measure/syncer.go
index f91841b3..b3b7f3a4 100644
--- a/banyand/measure/syncer.go
+++ b/banyand/measure/syncer.go
@@ -230,7 +230,6 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
// Create streaming reader for the part.
files, release := createPartFileReaders(part)
releaseFuncs = append(releaseFuncs, release)
-
// Create streaming part sync data.
streamingParts = append(streamingParts, queue.StreamingPartData{
ID: part.partMetadata.ID,
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 2f71eddd..5ddab3ea 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -106,6 +106,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
}
if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+ for _, id := range loadedSnapshots {
+ fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+ }
return &tst, uint64(time.Now().UnixNano())
}
sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -279,12 +282,17 @@ func (tst *tsTable) Close() error {
}
func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
+ tst.mustAddDataPointsWithSegmentID(dps, 0)
+}
+
+func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID int64) {
if len(dps.seriesIDs) == 0 {
return
}
mp := generateMemPart()
mp.mustInitFromDataPoints(dps)
+ mp.segmentID = segmentID
tst.mustAddMemPart(mp)
}
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 7b287ff5..e64cad88 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -108,7 +108,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
for j := range g.tables {
es := g.tables[j]
if es.tsTable != nil && es.dataPoints != nil {
- es.tsTable.mustAddDataPoints(es.dataPoints)
+ es.tsTable.mustAddDataPointsWithSegmentID(es.dataPoints, es.timeRange.Start.UnixNano())
releaseDataPoints(es.dataPoints)
}
nodes := g.queue.GetNodes(es.shardID)
diff --git a/banyand/stream/flusher.go b/banyand/stream/flusher.go
index d8d4cc2f..04788af0 100644
--- a/banyand/stream/flusher.go
+++ b/banyand/stream/flusher.go
@@ -129,33 +129,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch uint64, flushWatcher watc
}
func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan *mergerIntroduction) (bool, error) {
+ var merged bool
+ var currentSegmentID int64
var memParts []*partWrapper
mergedIDs := make(map[uint64]struct{})
+
+ // Helper function to merge current segment's parts
+ mergeCurrentSegment := func() (bool, error) {
+ if len(memParts) < 2 {
+ return false, nil
+ }
+
+ // Create a copy of mergedIDs for this merge operation
+ currentMergedIDs := make(map[uint64]struct{}, len(mergedIDs))
+ for id := range mergedIDs {
+ currentMergedIDs[id] = struct{}{}
+ }
+
+ // merge memory must not be closed by the tsTable.close
+ closeCh := make(chan struct{})
+ newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+ currentMergedIDs, mergeCh, closeCh, "mem")
+ close(closeCh)
+ if err != nil {
+ if errors.Is(err, errClosed) {
+ return true, nil
+ }
+ return false, err
+ }
+ return newPart != nil, nil
+ }
+
+ // Process parts grouped by segmentID
for i := range snp.parts {
- if snp.parts[i].mp != nil {
- memParts = append(memParts, snp.parts[i])
- mergedIDs[snp.parts[i].ID()] = struct{}{}
+ if snp.parts[i].mp == nil {
continue
}
- }
- if len(memParts) < 2 {
- return false, nil
- }
- // merge memory must not be closed by the tsTable.close
- closeCh := make(chan struct{})
- newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
- mergedIDs, mergeCh, closeCh, "mem")
- close(closeCh)
- if err != nil {
- if errors.Is(err, errClosed) {
- return true, nil
+
+ segID := snp.parts[i].mp.segmentID
+
+ // If this is a new segment, merge the previous segment first
+ if currentSegmentID != 0 && currentSegmentID != segID {
+ m, err := mergeCurrentSegment()
+ if err != nil {
+ return false, err
+ }
+ if m {
+ merged = true
+ }
+
+ // Reset for next segment
+ memParts = memParts[:0]
+ for id := range mergedIDs {
+ delete(mergedIDs, id)
+ }
}
- return false, err
+
+ // Add part to current segment
+ currentSegmentID = segID
+ memParts = append(memParts, snp.parts[i])
+ mergedIDs[snp.parts[i].ID()] = struct{}{}
}
- if newPart == nil {
- return false, nil
+
+ // Merge the last segment if it has parts
+ if len(memParts) >= 2 {
+ m, err := mergeCurrentSegment()
+ if err != nil {
+ return false, err
+ }
+ if m {
+ merged = true
+ }
}
- return true, nil
+
+ return merged, nil
}
func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) {
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 07e5a22f..39cb2145 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -112,6 +112,7 @@ type memPart struct {
primary bytes.Buffer
timestamps bytes.Buffer
partMetadata partMetadata
+ segmentID int64
}
func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, fs.Writer, fs.Writer) {
@@ -140,6 +141,7 @@ func (mp *memPart) reset() {
mp.meta.Reset()
mp.primary.Reset()
mp.timestamps.Reset()
+ mp.segmentID = 0
if mp.tagFamilies != nil {
for k, tf := range mp.tagFamilies {
tf.Reset()
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index bd521ce4..9384c4b4 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -261,6 +261,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
}
if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+ for _, id := range loadedSnapshots {
+ fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+ }
return &tst, uint64(time.Now().UnixNano()), nil
}
sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -336,12 +339,17 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
}
func (tst *tsTable) mustAddElements(es *elements) {
+ tst.mustAddElementsWithSegmentID(es, 0)
+}
+
+func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID int64) {
if len(es.seriesIDs) == 0 {
return
}
mp := generateMemPart()
mp.mustInitFromElements(es)
+ mp.segmentID = segmentID
tst.mustAddMemPart(mp)
}
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index c78d1a37..be566f09 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -181,7 +181,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
g := groups[i]
for j := range g.tables {
es := g.tables[j]
- es.tsTable.mustAddElements(es.elements)
+ es.tsTable.mustAddElementsWithSegmentID(es.elements, es.timeRange.Start.UnixNano())
releaseElements(es.elements)
// Get nodes for this shard
nodes := g.queue.GetNodes(es.shardID)
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 56f81c32..727cad6c 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -48,33 +48,20 @@ func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, mergeCh chan
tst.incTotalFlushLatency(time.Since(start).Seconds())
}()
curSnapshot := tst.currentSnapshot()
- if curSnapshot != nil {
- flusherWatchers = tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
- curSnapshot.decRef()
- curSnapshot = nil
- }
- tst.RLock()
- if tst.snapshot != nil && tst.snapshot.epoch >
epoch {
- curSnapshot = tst.snapshot
- curSnapshot.incRef()
- }
- tst.RUnlock()
+ var ok bool
+ var merged bool
+ curSnapshot, flusherWatchers, ok, merged = tst.pauseFlusherToPileupMemPartsWithMerge(curSnapshot, flusherWatcher, flusherWatchers, epoch, mergeCh)
if curSnapshot != nil {
defer curSnapshot.decRef()
- merged, err := tst.mergeMemParts(curSnapshot, mergeCh)
- if err != nil {
- tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", curSnapshot.epoch)
- tst.incTotalFlushLoopErr(1)
+ if !ok {
return false
}
if !merged {
tst.flush(curSnapshot, flushCh)
}
- epoch = curSnapshot.epoch
- // Notify merger to start a new round of merge.
- // This round might have be triggered in pauseFlusherToPileupMemParts.
flusherWatchers.Notify(math.MaxUint64)
flusherWatchers = nil
+ epoch = curSnapshot.epoch
if tst.currentEpoch() != epoch {
tst.incTotalFlushLoopProgress(1)
return false
@@ -132,6 +119,36 @@ func (tst *tsTable) flusherLoopNoMerger(flushCh chan *flusherIntroduction, intro
}
}
+func (tst *tsTable) pauseFlusherToPileupMemPartsWithMerge(
+ curSnapshot *snapshot, flusherWatcher watcher.Channel, flusherWatchers watcher.Epochs,
+ epoch uint64, mergeCh chan *mergerIntroduction,
+) (*snapshot, watcher.Epochs, bool, bool) {
+ if tst.option.flushTimeout < 1 {
+ return curSnapshot, flusherWatchers, true, false
+ }
+ if curSnapshot != nil {
+ flusherWatchers = tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
+ curSnapshot.decRef()
+ curSnapshot = nil
+ }
+ tst.RLock()
+ if tst.snapshot != nil && tst.snapshot.epoch > epoch {
+ curSnapshot = tst.snapshot
+ curSnapshot.incRef()
+ }
+ tst.RUnlock()
+ if curSnapshot == nil {
+ return nil, flusherWatchers, false, false
+ }
+ merged, err := tst.mergeMemParts(curSnapshot, mergeCh)
+ if err != nil {
+ tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d",
curSnapshot.epoch)
+ tst.incTotalFlushLoopErr(1)
+ return curSnapshot, flusherWatchers, false, false
+ }
+ return curSnapshot, flusherWatchers, true, merged
+}
+
// pauseFlusherToPileupMemParts takes a pause to wait for in-memory parts to pile up.
// If there is no in-memory part, we can skip the pause.
// When a merging is finished, we can skip the pause.
@@ -155,33 +172,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch uint64, flushWatcher watc
}
func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan *mergerIntroduction) (bool, error) {
+ var merged bool
+ var currentSegmentID int64
var memParts []*partWrapper
mergedIDs := make(map[partHandle]struct{})
+
+ // Helper function to merge current segment's parts
+ mergeCurrentSegment := func() (bool, error) {
+ if len(memParts) < 2 {
+ return false, nil
+ }
+
+ // Create a copy of mergedIDs for this merge operation
+ currentMergedIDs := make(map[partHandle]struct{}, len(mergedIDs))
+ for id := range mergedIDs {
+ currentMergedIDs[id] = struct{}{}
+ }
+
+ // merge memory must not be closed by the tsTable.close
+ closeCh := make(chan struct{})
+ newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+ currentMergedIDs, mergeCh, closeCh, "mem")
+ close(closeCh)
+ if err != nil {
+ if errors.Is(err, errClosed) {
+ return true, nil
+ }
+ return false, err
+ }
+ return newPart != nil, nil
+ }
+
+ // Process parts grouped by segmentID
for i := range snp.parts {
- if snp.parts[i].mp != nil {
- memParts = append(memParts, snp.parts[i])
- mergedIDs[partHandle{partID: snp.parts[i].ID(), partType: PartTypeCore}] = struct{}{}
+ if snp.parts[i].mp == nil {
continue
}
- }
- if len(memParts) < 2 {
- return false, nil
- }
- // merge memory must not be closed by the tsTable.close
- closeCh := make(chan struct{})
- newPart, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
- mergedIDs, mergeCh, closeCh, "mem")
- close(closeCh)
- if err != nil {
- if errors.Is(err, errClosed) {
- return true, nil
+
+ segID := snp.parts[i].mp.segmentID
+
+ // If this is a new segment, merge the previous segment first
+ if currentSegmentID != 0 && currentSegmentID != segID {
+ m, err := mergeCurrentSegment()
+ if err != nil {
+ return false, err
+ }
+ if m {
+ merged = true
+ }
+
+ // Reset for next segment
+ memParts = memParts[:0]
+ for id := range mergedIDs {
+ delete(mergedIDs, id)
+ }
}
- return false, err
+
+ // Add part to current segment
+ currentSegmentID = segID
+ memParts = append(memParts, snp.parts[i])
+ mergedIDs[partHandle{partID: snp.parts[i].ID(), partType: PartTypeCore}] = struct{}{}
}
- if newPart == nil {
- return false, nil
+
+ // Merge the last segment if it has parts
+ if len(memParts) >= 2 {
+ m, err := mergeCurrentSegment()
+ if err != nil {
+ return false, err
+ }
+ if m {
+ merged = true
+ }
}
- return true, nil
+
+ return merged, nil
}
func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) {
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index d8ba721d..b76caab9 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -113,6 +113,7 @@ type memPart struct {
meta bytes.Buffer
primary bytes.Buffer
partMetadata partMetadata
+ segmentID int64
}
func (mp *memPart) mustCreateMemTagWriters(name string) (fs.Writer, fs.Writer) {
@@ -143,6 +144,7 @@ func (mp *memPart) reset() {
mp.meta.Reset()
mp.primary.Reset()
mp.spans.Reset()
+ mp.segmentID = 0
if mp.tags != nil {
for k, t := range mp.tags {
t.Reset()
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index 8e9b024c..a95c9e8c 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -296,21 +296,9 @@ func (tst *tsTable) syncStreamingPartsToNodes(ctx context.Context, nodes []strin
}
// Prepare all streaming parts data
streamingParts := make([]queue.StreamingPartData, 0)
- snapshot := tst.currentSnapshot()
// Add sidx streaming parts
for name, sidxParts := range sidxPartsToSync {
- // TODO: minTimestmaps should be read only once
- minTimestamps := make([]int64, 0)
- for _, part := range sidxParts {
- partID := part.ID()
- for _, pw := range snapshot.parts {
- if pw.p.partMetadata.ID == partID {
- minTimestamps = append(minTimestamps, pw.p.partMetadata.MinTimestamp)
- break
- }
- }
- }
- sidxStreamingParts, sidxReleaseFuncs := tst.sidxMap[name].StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name, minTimestamps)
+ sidxStreamingParts, sidxReleaseFuncs := tst.sidxMap[name].StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name)
streamingParts = append(streamingParts, sidxStreamingParts...)
*releaseFuncs = append(*releaseFuncs, sidxReleaseFuncs...)
}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 8e3be10f..645dde18 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -248,6 +248,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
}
if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+ for _, id := range loadedSnapshots {
+ fileSystem.MustRMAll(filepath.Join(rootPath, snapshotName(id)))
+ }
return &tst, uint64(time.Now().UnixNano())
}
sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -412,12 +415,17 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
}
func (tst *tsTable) mustAddTraces(ts *traces) {
+ tst.mustAddTracesWithSegmentID(ts, 0)
+}
+
+func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64) {
if len(ts.traceIDs) == 0 {
return
}
mp := generateMemPart()
mp.mustInitFromTraces(ts)
+ mp.segmentID = segmentID
tst.mustAddMemPart(mp)
}
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 58a37f60..d4741f50 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -191,7 +191,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
g := groups[i]
for j := range g.tables {
es := g.tables[j]
- es.tsTable.mustAddTraces(es.traces)
+ es.tsTable.mustAddTracesWithSegmentID(es.traces, es.timeRange.Start.UnixNano())
releaseTraces(es.traces)
for sidxName, sidxReqs := range es.sidxReqsMap {
@@ -201,7 +201,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx instance")
continue
}
- if err := sidxInstance.Write(ctx, sidxReqs); err != nil {
+ if err := sidxInstance.Write(ctx, sidxReqs, es.timeRange.Start.UnixNano()); err != nil {
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary index")
}
}
diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go
index 653ffbd4..6ef873fd 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -362,7 +362,7 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx instance")
continue
}
- if err := sidxInstance.Write(ctx, sidxReqs); err != nil {
+ if err := sidxInstance.Write(ctx, sidxReqs, es.timeRange.Start.UnixNano()); err != nil {
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary index")
}
}
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 77274d08..1e8bb2b2 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -23,6 +23,7 @@ import (
"embed"
"encoding/json"
"io"
+ "slices"
"time"
"github.com/google/go-cmp/cmp"
@@ -75,6 +76,26 @@ func verifyWithContext(ctx context.Context, innerGm gm.Gomega, sharedContext hel
innerGm.Expect(err).NotTo(gm.HaveOccurred())
want := &measurev1.QueryResponse{}
helpers.UnmarshalYAML(ww, want)
+ if args.DisOrder {
+ slices.SortFunc(want.DataPoints, func(a, b *measurev1.DataPoint) int {
+ if a.Sid != b.Sid {
+ if a.Sid < b.Sid {
+ return -1
+ }
+ return 1
+ }
+ return a.Timestamp.AsTime().Compare(b.Timestamp.AsTime())
+ })
+ slices.SortFunc(resp.DataPoints, func(a, b *measurev1.DataPoint) int {
+ if a.Sid != b.Sid {
+ if a.Sid < b.Sid {
+ return -1
+ }
+ return 1
+ }
+ return a.Timestamp.AsTime().Compare(b.Timestamp.AsTime())
+ })
+ }
for i := range resp.DataPoints {
if resp.DataPoints[i].Timestamp != nil {
innerGm.Expect(resp.DataPoints[i].Version).Should(gm.BeNumerically(">", 0))
diff --git a/test/cases/measure/data/want/entity_in.yaml b/test/cases/measure/data/want/entity_in.yaml
index 73ba584e..125eb6aa 100644
--- a/test/cases/measure/data/want/entity_in.yaml
+++ b/test/cases/measure/data/want/entity_in.yaml
@@ -22,19 +22,19 @@ dataPoints:
- key: name
value:
str:
- value: service_name_1
+ value: service_name_2
- key: short_name
value:
str:
- value: service_short_name_1
+ value: service_short_name_2
- tagFamilies:
- name: default
tags:
- key: name
value:
str:
- value: service_name_2
+ value: service_name_1
- key: short_name
value:
str:
- value: service_short_name_2
+ value: service_short_name_1
diff --git a/test/cases/measure/data/want/tag_filter_int.yaml b/test/cases/measure/data/want/tag_filter_int.yaml
index 5a726b9d..3a6ffc22 100644
--- a/test/cases/measure/data/want/tag_filter_int.yaml
+++ b/test/cases/measure/data/want/tag_filter_int.yaml
@@ -22,7 +22,7 @@ dataPoints:
- key: name
value:
str:
- value: service_name_1
+ value: service_name_3
- key: layer
value:
int:
@@ -33,7 +33,7 @@ dataPoints:
- key: name
value:
str:
- value: service_name_3
+ value: service_name_1
- key: layer
value:
int:
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index aeacde4f..274dd3cb 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -44,7 +44,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("all only fields", helpers.Args{Input: "all_only_fields",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("the max limit", helpers.Args{Input: "all_max_limit", Want:
"all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
- g.Entry("filter by a integer tag", helpers.Args{Input:
"tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("filter by a integer tag", helpers.Args{Input:
"tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute,
DisOrder: true}),
g.Entry("filter by an unknown tag", helpers.Args{Input:
"tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute,
WantEmpty: true}),
g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
@@ -62,10 +62,10 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),
g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
- g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration:
25 * time.Minute, Offset: -20 * time.Minute}),
- g.Entry("filter by several entity ids", helpers.Args{Input:
"entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
- g.Entry("filter by entity id and service id", helpers.Args{Input:
"entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
- g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration:
25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
+ g.Entry("filter by several entity ids", helpers.Args{Input:
"entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder:
true}),
+ g.Entry("filter by entity id and service id", helpers.Args{Input:
"entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute,
DisOrder: true}),
+ g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 *
time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
g.Entry("invalid logical expression", helpers.Args{Input:
"err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute,
WantErr: true}),
g.Entry("linked or expressions", helpers.Args{Input: "linked_or",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("In and not In expressions", helpers.Args{Input: "in",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
@@ -74,12 +74,13 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("match a tag belongs to the entity", helpers.Args{Input:
"entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
- g.Entry("all of index mode", helpers.Args{Input: "index_mode_all",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("all of index mode", helpers.Args{Input: "index_mode_all",
Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
g.Entry("all of index mode in a larger time range",
- helpers.Args{Input: "index_mode_all", Want:
"index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour}),
- g.Entry("all in all segments of index mode", helpers.Args{Input:
"index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour,
Offset: -72 * time.Hour}),
+ helpers.Args{Input: "index_mode_all", Want:
"index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour,
DisOrder: true}),
+ g.Entry("all in all segments of index mode",
+ helpers.Args{Input: "index_mode_all", Want:
"index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour,
DisOrder: true}),
g.Entry("order by desc of index mode", helpers.Args{Input:
"index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 *
time.Minute}),
- g.Entry("range of index mode", helpers.Args{Input: "index_mode_range",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("range of index mode", helpers.Args{Input: "index_mode_range",
Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
g.Entry("none of index mode", helpers.Args{Input: "index_mode_none",
WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("query by id in index mode", helpers.Args{Input:
"index_mode_by_id", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("multi groups: unchanged", helpers.Args{Input:
"multi_group_unchanged", Duration: 35 * time.Minute, Offset: -20 *
time.Minute}),
diff --git
a/test/integration/distributed/multi_segments/multi_segments_suite_test.go
b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
new file mode 100644
index 00000000..5a8fa9fe
--- /dev/null
+++ b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
@@ -0,0 +1,161 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_multi_segments_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+ test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ test_cases "github.com/apache/skywalking-banyandb/test/cases"
+ casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+ casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+ casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+ casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
+)
+
+func TestDistributedMultiSegments(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Distributed Multi Segments Suite")
+}
+
+var (
+ connection *grpc.ClientConn
+ now time.Time
+ baseTime time.Time
+ deferFunc func()
+ goods []gleak.Goroutine
+)
+
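+// The first Ginkgo process boots an embedded etcd server, preloads the stream, measure,
+// and trace schemas, starts two data nodes and a liaison node, and then loads the shared
+// test data, so the query cases run against a cluster whose data is expected to span
+// more than one segment.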
+var _ = SynchronizedBeforeSuite(func() []byte {
+ goods = gleak.Goroutines()
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(Succeed())
+
+ By("Starting etcd server")
+ ports, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+ dir, spaceDef, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ server, err := embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ Expect(err).ShouldNot(HaveOccurred())
+ <-server.ReadyNotify()
+
+ By("Loading schema")
+ schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+ schema.Namespace(metadata.DefaultNamespace),
+ schema.ConfigureServerEndpoints([]string{ep}),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ defer schemaRegistry.Close()
+ ctx := context.Background()
+ test_stream.PreloadSchema(ctx, schemaRegistry)
+ test_measure.PreloadSchema(ctx, schemaRegistry)
+ test_trace.PreloadSchema(ctx, schemaRegistry)
+
+ By("Starting data node 0")
+ closeDataNode0 := setup.DataNode(ep)
+ By("Starting data node 1")
+ closeDataNode1 := setup.DataNode(ep)
+ By("Starting liaison node")
+ liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+
+ By("Initializing test cases")
+ ns := timestamp.NowMilli().UnixNano()
+ now = time.Unix(0, ns-ns%int64(time.Minute))
+ baseTime = time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 0, now.Location())
+ test_cases.Initialize(liaisonAddr, baseTime)
+
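+ // Tear everything down in roughly reverse start order and remove the temporary etcd
+ // directory; this closure is invoked from ReportAfterSuite only after the suite passes.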
+ deferFunc = func() {
+ closerLiaisonNode()
+ closeDataNode0()
+ closeDataNode1()
+ _ = server.Close()
+ <-server.StopNotify()
+ spaceDef()
+ }
+ return []byte(liaisonAddr)
+}, func(address []byte) {
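+ // Every Ginkgo process dials the liaison address broadcast by the first process and
+ // shares the connection with the stream, measure, topN, and trace case tables.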
+ var err error
+ connection, err = grpchelper.Conn(string(address), 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ casesstream.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: baseTime,
+ }
+ casesmeasure.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: baseTime,
+ }
+ casestopn.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: baseTime,
+ }
+ casestrace.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: baseTime,
+ }
+ Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {})
+
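+// Cleanup and goroutine/pool leak checks run only when the whole suite succeeded, which
+// leaves the servers and data directories in place for inspection after a failure.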
+var _ = ReportAfterSuite("Distributed Multi Segments Suite", func(report
Report) {
+ if report.SuiteSucceeded {
+ if deferFunc != nil {
+ deferFunc()
+ }
+ Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+ }
+})
diff --git
a/test/integration/standalone/multi_segments/multi_segments_suite_test.go
b/test/integration/standalone/multi_segments/multi_segments_suite_test.go
new file mode 100644
index 00000000..5c8e1c61
--- /dev/null
+++ b/test/integration/standalone/multi_segments/multi_segments_suite_test.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_query_test
+
+import (
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ test_cases "github.com/apache/skywalking-banyandb/test/cases"
+ casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+ casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+ casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+ casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
+ integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
+func TestIntegrationMultiSegments(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Integration Multi Segments Suite",
Label(integration_standalone.Labels...))
+}
+
+var (
+ connection *grpc.ClientConn
+ now time.Time
+ baseTime time.Time
+ deferFunc func()
+ goods []gleak.Goroutine
+)
+
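+// The standalone variant runs the same shared cases against a single in-process server
+// started by setup.Standalone(), using the same baseTime alignment as the distributed
+// multi-segments suite.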
+var _ = SynchronizedBeforeSuite(func() []byte {
+ goods = gleak.Goroutines()
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(Succeed())
+ var addr string
+ addr, _, deferFunc = setup.Standalone()
+ ns := timestamp.NowMilli().UnixNano()
+ now = time.Unix(0, ns-ns%int64(time.Minute))
+ baseTime = time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 0, now.Location())
+ test_cases.Initialize(addr, baseTime)
+ return []byte(addr)
+}, func(address []byte) {
+ var err error
+ connection, err = grpchelper.Conn(string(address), 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ casesstream.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: baseTime,
+ }
+ casesmeasure.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: baseTime,
+ }
+ casestopn.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: baseTime,
+ }
+ casestrace.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: baseTime,
+ }
+ Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {})
+
+var _ = ReportAfterSuite("Integration Query Suite", func(report Report) {
+ if report.SuiteSucceeded {
+ if deferFunc != nil {
+ deferFunc()
+ }
+ Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+ }
+})