This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test/multi-segments
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 75eecf971b8e3883b37e0b33575c608a9dc99997 Author: Gao Hongtao <[email protected]> AuthorDate: Fri Sep 12 13:34:43 2025 +0800 Refactor newSupplier function to remove unused nodeID parameter. Enhance searchSeriesList function to collect and sort segment results based on query options. Introduce segResultHeap for efficient sorting of segment results. Add tests for segResultHeap sorting and NPE prevention. Improve logging in syncer and write_liaison components for better traceability. --- banyand/measure/metadata.go | 6 +- banyand/measure/query.go | 217 ++++++++++--- banyand/measure/query_test.go | 342 +++++++++++++++++++++ banyand/measure/syncer.go | 24 +- banyand/measure/write_liaison.go | 3 + banyand/measure/write_standalone.go | 3 + test/cases/measure/data/data.go | 21 ++ test/cases/measure/data/input/linked_or.yaml | 1 + test/cases/measure/data/want/entity_in.yaml | 8 +- test/cases/measure/data/want/tag_filter_int.yaml | 4 +- test/cases/measure/measure.go | 20 +- .../multi_segments/multi_segments_suite_test.go | 154 ++++++++++ .../multi_segments/multi_segments_suite_test.go | 109 +++++++ 13 files changed, 849 insertions(+), 63 deletions(-) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index f9c64eb2..82017b7f 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -96,7 +96,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n sr.Repository = resourceSchema.NewRepository( svc.metadata, svc.l, - newSupplier(path, svc, sr, nodeLabels, nodeID), + newSupplier(path, svc, sr, nodeLabels), resourceSchema.NewMetrics(svc.omr.With(metadataScope)), ) sr.start() @@ -392,12 +392,11 @@ type supplier struct { l *logger.Logger schemaRepo *schemaRepo nodeLabels map[string]string - nodeID string path string option option } -func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[string]string, nodeID string) *supplier { +func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[string]string) *supplier { if svc.pm == nil { svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier") } @@ -418,7 +417,6 @@ func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels map[st pm: svc.pm, schemaRepo: sr, nodeLabels: nodeLabels, - nodeID: nodeID, } } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 066f732a..8e0fceeb 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -226,9 +226,14 @@ func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m newTagProjection = append(newTagProjection, tagProjection) } } + + // Collect all search results first + var segResults []*segResult + var needsSorting bool seriesFilter := roaring.NewPostingList() + for i := range segments { - sd, _, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{ + sd, sortedValues, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{ Query: mqo.Query, Order: mqo.Order, PreloadSize: preloadSize, @@ -241,44 +246,135 @@ func (m *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m tt, cc := segments[i].Tables() tables = append(tables, tt...) caches = append(caches, cc...) 
+ + // Create segResult for this segment + sr := &segResult{ + SeriesData: sd, + sortedValues: sortedValues, + i: 0, + } + segResults = append(segResults, sr) + + // Check if we need sorting + if mqo.Order != nil && sortedValues != nil { + needsSorting = true + } } - for j := range sd.SeriesList { - if seriesFilter.Contains(uint64(sd.SeriesList[j].ID)) { + } + + // Sort if needed, otherwise use original order + if needsSorting && len(segResults) > 0 { + // Use segResultHeap to sort + segHeap := &segResultHeap{ + results: segResults, + sortDesc: mqo.Order.Sort == modelv1.Sort_SORT_DESC, + } + heap.Init(segHeap) + + // Extract sorted series IDs + for segHeap.Len() > 0 { + top := heap.Pop(segHeap).(*segResult) + series := top.SeriesList[top.i] + + if seriesFilter.Contains(uint64(series.ID)) { + // Move to next in this segment + top.i++ + if top.i < len(top.SeriesList) { + heap.Push(segHeap, top) + } continue } - seriesFilter.Insert(uint64(sd.SeriesList[j].ID)) - sl = append(sl, sd.SeriesList[j].ID) - if projectedEntityOffsets == nil && sd.Fields == nil { - continue + + seriesFilter.Insert(uint64(series.ID)) + sl = append(sl, series.ID) + + // Build storedIndexValue for this series + var fieldResult map[string][]byte + if top.Fields != nil && top.i < len(top.Fields) { + fieldResult = top.Fields[top.i] } - if storedIndexValue == nil { - storedIndexValue = make(map[common.SeriesID]map[string]*modelv1.TagValue) + storedIndexValue = m.buildStoredIndexValue( + series.ID, + series.EntityValues, + fieldResult, + projectedEntityOffsets, + fieldToValueType, + storedIndexValue, + ) + + // Move to next in this segment + top.i++ + if top.i < len(top.SeriesList) { + heap.Push(segHeap, top) } - tagValues := make(map[string]*modelv1.TagValue) - storedIndexValue[sd.SeriesList[j].ID] = tagValues - for name, offset := range projectedEntityOffsets { - if offset < 0 || offset >= len(sd.SeriesList[j].EntityValues) { - logger.Warningf("offset %d for tag %s is out of range for series ID %v", offset, name, sd.SeriesList[j].ID) - tagValues[name] = pbv1.NullTagValue + } + } else { + // Original logic when no sorting is needed + for _, sr := range segResults { + for j := range sr.SeriesList { + if seriesFilter.Contains(uint64(sr.SeriesList[j].ID)) { continue } - tagValues[name] = sd.SeriesList[j].EntityValues[offset] - } - if sd.Fields == nil { - continue - } - for f, v := range sd.Fields[j] { - if tnt, ok := fieldToValueType[f]; ok { - tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v) - } else { - logger.Panicf("unknown field %s not found in fieldToValueType", f) + seriesFilter.Insert(uint64(sr.SeriesList[j].ID)) + sl = append(sl, sr.SeriesList[j].ID) + + var fieldResult map[string][]byte + if sr.Fields != nil && j < len(sr.Fields) { + fieldResult = sr.Fields[j] } + storedIndexValue = m.buildStoredIndexValue( + sr.SeriesList[j].ID, + sr.SeriesList[j].EntityValues, + fieldResult, + projectedEntityOffsets, + fieldToValueType, + storedIndexValue, + ) } } } + return sl, tables, caches, storedIndexValue, newTagProjection, nil } +func (m *measure) buildStoredIndexValue( + seriesID common.SeriesID, + entityValues []*modelv1.TagValue, + fieldResult map[string][]byte, + projectedEntityOffsets map[string]int, + fieldToValueType map[string]tagNameWithType, + storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, +) map[common.SeriesID]map[string]*modelv1.TagValue { + if projectedEntityOffsets == nil && fieldResult == nil { + return storedIndexValue + } + + if storedIndexValue == nil { + storedIndexValue = 
make(map[common.SeriesID]map[string]*modelv1.TagValue) + } + tagValues := make(map[string]*modelv1.TagValue) + storedIndexValue[seriesID] = tagValues + + for name, offset := range projectedEntityOffsets { + if offset < 0 || offset >= len(entityValues) { + logger.Warningf("offset %d for tag %s is out of range for series ID %v", offset, name, seriesID) + tagValues[name] = pbv1.NullTagValue + continue + } + tagValues[name] = entityValues[offset] + } + + for f, v := range fieldResult { + if tnt, ok := fieldToValueType[f]; ok { + tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, v) + } else { + logger.Panicf("unknown field %s not found in fieldToValueType", f) + } + } + + return storedIndexValue +} + func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQueryOptions, segments []storage.Segment[*tsTable, option], ) (model.MeasureQueryResult, error) { @@ -351,11 +447,17 @@ func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQu if len(sr.SeriesList) < 1 { continue } - r.segResults = append(r.segResults, sr) + r.segResults.results = append(r.segResults.results, sr) } - if len(r.segResults) < 1 { + if len(r.segResults.results) < 1 { return nilResult, nil } + + // Set sort order based on mqo.Order.Sort + if mqo.Order != nil && mqo.Order.Sort == modelv1.Sort_SORT_DESC { + r.segResults.sortDesc = true + } + heap.Init(&r.segResults) return r, nil } @@ -770,18 +872,18 @@ type indexSortResult struct { // Pull implements model.MeasureQueryResult. func (iqr *indexSortResult) Pull() *model.MeasureResult { - if len(iqr.segResults) < 1 { + if len(iqr.segResults.results) < 1 { return nil } - if len(iqr.segResults) == 1 { - if iqr.segResults[0].i >= len(iqr.segResults[0].SeriesList) { + if len(iqr.segResults.results) == 1 { + if iqr.segResults.results[0].i >= len(iqr.segResults.results[0].SeriesList) { return nil } - sr := iqr.segResults[0] + sr := iqr.segResults.results[0] r := iqr.copyTo(sr) sr.i++ if sr.i >= len(sr.SeriesList) { - iqr.segResults = iqr.segResults[:0] + iqr.segResults.results = iqr.segResults.results[:0] } return r } @@ -861,25 +963,56 @@ func (sr *segResult) remove(i int) { } } -type segResultHeap []*segResult +type segResultHeap struct { + results []*segResult + sortDesc bool +} + +func (h *segResultHeap) Len() int { return len(h.results) } +func (h *segResultHeap) Less(i, j int) bool { + // Handle NPE - check for nil results or invalid indices + if i >= len(h.results) || j >= len(h.results) { + return false + } + if h.results[i] == nil || h.results[j] == nil { + return false + } + if h.results[i].i >= len(h.results[i].SeriesList) || h.results[j].i >= len(h.results[j].SeriesList) { + return false + } + + // If no sortedValues, compare by SeriesID + if h.results[i].sortedValues == nil || h.results[j].sortedValues == nil { + if h.sortDesc { + return h.results[i].SeriesList[h.results[i].i].ID > h.results[j].SeriesList[h.results[j].i].ID + } + return h.results[i].SeriesList[h.results[i].i].ID < h.results[j].SeriesList[h.results[j].i].ID + } + + // Handle potential index out of bounds for sortedValues + if h.results[i].i >= len(h.results[i].sortedValues) || h.results[j].i >= len(h.results[j].sortedValues) { + if h.sortDesc { + return h.results[i].SeriesList[h.results[i].i].ID > h.results[j].SeriesList[h.results[j].i].ID + } + return h.results[i].SeriesList[h.results[i].i].ID < h.results[j].SeriesList[h.results[j].i].ID + } -func (h segResultHeap) Len() int { return len(h) } -func (h segResultHeap) Less(i, j int) bool { - if h[i].sortedValues 
== nil { - return h[i].SeriesList[h[i].i].ID < h[j].SeriesList[h[j].i].ID + cmp := bytes.Compare(h.results[i].sortedValues[h.results[i].i], h.results[j].sortedValues[h.results[j].i]) + if h.sortDesc { + return cmp > 0 } - return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0 + return cmp < 0 } -func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *segResultHeap) Swap(i, j int) { h.results[i], h.results[j] = h.results[j], h.results[i] } func (h *segResultHeap) Push(x interface{}) { - *h = append(*h, x.(*segResult)) + h.results = append(h.results, x.(*segResult)) } func (h *segResultHeap) Pop() interface{} { - old := *h + old := h.results n := len(old) x := old[n-1] - *h = old[0 : n-1] + h.results = old[0 : n-1] return x } diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index 1f2d24a8..85d1af5f 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -34,6 +34,7 @@ import ( itest "github.com/apache/skywalking-banyandb/banyand/internal/test" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" @@ -1515,3 +1516,344 @@ func TestQueryResult_QuotaExceeded(t *testing.T) { }) } } + +func TestSegResultHeap_Sorting(t *testing.T) { + tests := []struct { + name string + segResults []*segResult + expectOrder []int + sortDesc bool + }{ + { + name: "Sort ascending by SeriesID (no sortedValues)", + sortDesc: false, + segResults: []*segResult{ + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(30)}, + }, + }, + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(10)}, + }, + }, + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(20)}, + }, + }, + i: 0, + }, + }, + expectOrder: []int{1, 0, 2}, // SeriesID order: 10, 30, 20 + }, + { + name: "Sort descending by SeriesID (no sortedValues)", + sortDesc: true, + segResults: []*segResult{ + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(10)}, + }, + }, + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(30)}, + }, + }, + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(20)}, + }, + }, + i: 0, + }, + }, + expectOrder: []int{1, 0, 2}, // SeriesID order: 30, 10, 20 + }, + { + name: "Sort ascending by sortedValues", + sortDesc: false, + segResults: []*segResult{ + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(1)}, + }, + }, + sortedValues: [][]byte{[]byte("charlie")}, + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(2)}, + }, + }, + sortedValues: [][]byte{[]byte("alpha")}, + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(3)}, + }, + }, + sortedValues: [][]byte{[]byte("beta")}, + i: 0, + }, + }, + expectOrder: []int{1, 0, 2}, // alpha, charlie, beta + }, + { + name: "Sort descending by sortedValues", + sortDesc: true, + segResults: []*segResult{ + { + SeriesData: 
storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(1)}, + }, + }, + sortedValues: [][]byte{[]byte("alpha")}, + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(2)}, + }, + }, + sortedValues: [][]byte{[]byte("charlie")}, + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(3)}, + }, + }, + sortedValues: [][]byte{[]byte("beta")}, + i: 0, + }, + }, + expectOrder: []int{1, 0, 2}, // charlie, alpha, beta + }, + { + name: "Mixed sortedValues and nil sortedValues ascending", + sortDesc: false, + segResults: []*segResult{ + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(30)}, + }, + }, + sortedValues: nil, // Will use SeriesID for sorting + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(10)}, + }, + }, + sortedValues: [][]byte{[]byte("zzz")}, // Should come after nil sortedValues when sorted by SeriesID + i: 0, + }, + { + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(20)}, + }, + }, + sortedValues: nil, // Will use SeriesID for sorting + i: 0, + }, + }, + expectOrder: []int{1, 0, 2}, // SeriesID 10, 30, 20 (nil sortedValues use SeriesID) + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create and initialize heap + heap := segResultHeap{ + results: make([]*segResult, 0), + sortDesc: tt.sortDesc, + } + + // Add all results to heap + for _, sr := range tt.segResults { + heap.results = append(heap.results, sr) + } + + // Initialize heap + require.Equal(t, len(tt.segResults), heap.Len()) + + // Sort using Go's heap + heapImpl := &heap + heap2 := make([]*segResult, len(tt.segResults)) + copy(heap2, heap.results) + + // Sort manually to get expected order + sort.Slice(heap2, func(i, j int) bool { + return heapImpl.Less(i, j) + }) + + // Verify the order matches expectation + for i, expectedIdx := range tt.expectOrder { + actual := heap2[i] + expected := tt.segResults[expectedIdx] + require.Equal(t, expected.SeriesList[expected.i].ID, actual.SeriesList[actual.i].ID, + "Position %d: expected SeriesID %d, got %d", i, expected.SeriesList[expected.i].ID, actual.SeriesList[actual.i].ID) + } + }) + } +} + +func TestSegResultHeap_NPE_Prevention(t *testing.T) { + tests := []struct { + name string + segResults []*segResult + i, j int + expectLess bool + }{ + { + name: "Out of bounds indices", + segResults: []*segResult{{SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 0}}, + i: 0, + j: 5, // Out of bounds + expectLess: false, + }, + { + name: "Nil segResult", + segResults: []*segResult{nil, {SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 0}}, + i: 0, + j: 1, + expectLess: false, + }, + { + name: "Index out of bounds for SeriesList", + segResults: []*segResult{ + {SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 5}, // i is out of bounds + {SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 2}}}, i: 0}, + }, + i: 0, + j: 1, + expectLess: false, + }, + { + name: "Index out of bounds for sortedValues", + segResults: []*segResult{ + { + SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, + sortedValues: [][]byte{[]byte("test")}, + i: 5, // i is out of bounds for sortedValues + }, + { + SeriesData: storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 
2}}}, + sortedValues: [][]byte{[]byte("test2")}, + i: 0, + }, + }, + i: 0, + j: 1, + expectLess: false, // Should fallback to SeriesID comparison + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + heap := segResultHeap{ + results: tt.segResults, + sortDesc: false, + } + + // This should not panic due to NPE prevention + result := heap.Less(tt.i, tt.j) + require.Equal(t, tt.expectLess, result) + }) + } +} + +func TestIndexSortResult_OrderBySortDesc(t *testing.T) { + tests := []struct { + name string + sortOrder modelv1.Sort + expectDesc bool + }{ + { + name: "SORT_ASC should be ascending", + sortOrder: modelv1.Sort_SORT_ASC, + expectDesc: false, + }, + { + name: "SORT_UNSPECIFIED should be ascending", + sortOrder: modelv1.Sort_SORT_UNSPECIFIED, + expectDesc: false, + }, + { + name: "SORT_DESC should be descending", + sortOrder: modelv1.Sort_SORT_DESC, + expectDesc: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock segments and measure data + mqo := model.MeasureQueryOptions{ + Order: &index.OrderBy{ + Sort: tt.sortOrder, + }, + } + + // Create a simple segResult + sr := &segResult{ + SeriesData: storage.SeriesData{ + SeriesList: pbv1.SeriesList{ + &pbv1.Series{ID: common.SeriesID(1)}, + &pbv1.Series{ID: common.SeriesID(2)}, + }, + Timestamps: []int64{1000, 2000}, + Versions: []int64{1, 2}, + }, + sortedValues: [][]byte{[]byte("test1"), []byte("test2")}, + i: 0, + } + + // Create index sort result + r := &indexSortResult{ + tfl: []tagFamilyLocation{}, + segResults: segResultHeap{ + results: []*segResult{sr}, + sortDesc: false, // This should be set by buildIndexQueryResult + }, + } + + // Simulate the logic from buildIndexQueryResult + if mqo.Order != nil && mqo.Order.Sort == modelv1.Sort_SORT_DESC { + r.segResults.sortDesc = true + } + + // Verify the sort order was set correctly + require.Equal(t, tt.expectDesc, r.segResults.sortDesc) + }) + } +} diff --git a/banyand/measure/syncer.go b/banyand/measure/syncer.go index f91841b3..791a97e0 100644 --- a/banyand/measure/syncer.go +++ b/banyand/measure/syncer.go @@ -20,6 +20,7 @@ package measure import ( "context" "fmt" + "strings" "time" "github.com/apache/skywalking-banyandb/api/data" @@ -230,7 +231,28 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu // Create streaming reader for the part. 
files, release := createPartFileReaders(part) releaseFuncs = append(releaseFuncs, release) - + builder := strings.Builder{} + for i := range part.primaryBlockMetadata { + offset := part.primaryBlockMetadata[i].offset + size := part.primaryBlockMetadata[i].size + buf := make([]byte, size) + part.primary.Read(int64(offset), buf) + uncompressedBuf, err := zstd.Decompress(nil, buf) + if err != nil { + return fmt.Errorf("cannot decompress block metadata: %w", err) + } + blockMetadata, err := unmarshalBlockMetadata(nil, uncompressedBuf) + if err != nil { + return fmt.Errorf("cannot unmarshal block metadata: %w", err) + } + for _, block := range blockMetadata { + builder.WriteString(fmt.Sprintf("%v", block.seriesID)) + builder.WriteString(",") + } + } + timeStart := time.Unix(0, part.partMetadata.MinTimestamp) + timeEnd := time.Unix(0, part.partMetadata.MaxTimestamp) + fmt.Printf("snp %v primary block metadata: %v total count: %v time range: %v-%v group: %v shard: %v - %v\n", curSnapshot.epoch, builder.String(), part.partMetadata.TotalCount, timeStart, timeEnd, tst.group, tst.shardID, time.Now().Format(time.StampNano)) // Create streaming part sync data. streamingParts = append(streamingParts, queue.StreamingPartData{ ID: part.partMetadata.ID, diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go index 7b287ff5..6bae9dc4 100644 --- a/banyand/measure/write_liaison.go +++ b/banyand/measure/write_liaison.go @@ -108,6 +108,9 @@ func (w *writeQueueCallback) Rev(ctx context.Context, message bus.Message) (resp for j := range g.tables { es := g.tables[j] if es.tsTable != nil && es.dataPoints != nil { + for i := range es.dataPoints.timestamps { + fmt.Printf("series id: %v timestamp: %v time range: %v\n", es.dataPoints.seriesIDs[i], es.dataPoints.timestamps[i], es.timeRange) + } es.tsTable.mustAddDataPoints(es.dataPoints) releaseDataPoints(es.dataPoints) } diff --git a/banyand/measure/write_standalone.go b/banyand/measure/write_standalone.go index dc7fd8b1..759af0e2 100644 --- a/banyand/measure/write_standalone.go +++ b/banyand/measure/write_standalone.go @@ -80,6 +80,9 @@ func processDataPoint(dpt *dataPointsInTable, req *measurev1.WriteRequest, write if err := series.Marshal(); err != nil { return 0, fmt.Errorf("cannot marshal series: %w", err) } + if req.Metadata.Name == "service_cpm_minute" { + fmt.Printf("entity values: %v time range: %v series id: %v\n", writeEvent.EntityValues, dpt.timeRange, series.ID) + } if stm.schema.IndexMode { fields := handleIndexMode(stm.schema, req, is.indexRuleLocators) diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go index 77274d08..1e8bb2b2 100644 --- a/test/cases/measure/data/data.go +++ b/test/cases/measure/data/data.go @@ -23,6 +23,7 @@ import ( "embed" "encoding/json" "io" + "slices" "time" "github.com/google/go-cmp/cmp" @@ -75,6 +76,26 @@ func verifyWithContext(ctx context.Context, innerGm gm.Gomega, sharedContext hel innerGm.Expect(err).NotTo(gm.HaveOccurred()) want := &measurev1.QueryResponse{} helpers.UnmarshalYAML(ww, want) + if args.DisOrder { + slices.SortFunc(want.DataPoints, func(a, b *measurev1.DataPoint) int { + if a.Sid != b.Sid { + if a.Sid < b.Sid { + return -1 + } + return 1 + } + return a.Timestamp.AsTime().Compare(b.Timestamp.AsTime()) + }) + slices.SortFunc(resp.DataPoints, func(a, b *measurev1.DataPoint) int { + if a.Sid != b.Sid { + if a.Sid < b.Sid { + return -1 + } + return 1 + } + return a.Timestamp.AsTime().Compare(b.Timestamp.AsTime()) + }) + } for i := range resp.DataPoints { if 
resp.DataPoints[i].Timestamp != nil { innerGm.Expect(resp.DataPoints[i].Version).Should(gm.BeNumerically(">", 0)) diff --git a/test/cases/measure/data/input/linked_or.yaml b/test/cases/measure/data/input/linked_or.yaml index d2662a84..898a02a2 100644 --- a/test/cases/measure/data/input/linked_or.yaml +++ b/test/cases/measure/data/input/linked_or.yaml @@ -16,6 +16,7 @@ # under the License. name: "service_cpm_minute" +trace: true groups: ["sw_metric"] tagProjection: tagFamilies: diff --git a/test/cases/measure/data/want/entity_in.yaml b/test/cases/measure/data/want/entity_in.yaml index 73ba584e..125eb6aa 100644 --- a/test/cases/measure/data/want/entity_in.yaml +++ b/test/cases/measure/data/want/entity_in.yaml @@ -22,19 +22,19 @@ dataPoints: - key: name value: str: - value: service_name_1 + value: service_name_2 - key: short_name value: str: - value: service_short_name_1 + value: service_short_name_2 - tagFamilies: - name: default tags: - key: name value: str: - value: service_name_2 + value: service_name_1 - key: short_name value: str: - value: service_short_name_2 + value: service_short_name_1 diff --git a/test/cases/measure/data/want/tag_filter_int.yaml b/test/cases/measure/data/want/tag_filter_int.yaml index 5a726b9d..3a6ffc22 100644 --- a/test/cases/measure/data/want/tag_filter_int.yaml +++ b/test/cases/measure/data/want/tag_filter_int.yaml @@ -22,7 +22,7 @@ dataPoints: - key: name value: str: - value: service_name_1 + value: service_name_3 - key: layer value: int: @@ -33,7 +33,7 @@ dataPoints: - key: name value: str: - value: service_name_3 + value: service_name_1 - key: layer value: int: diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index aeacde4f..12a693ca 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -44,7 +44,7 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("all only fields", helpers.Args{Input: "all_only_fields", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("the max limit", helpers.Args{Input: "all_max_limit", Want: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("filter by a integer tag", helpers.Args{Input: "tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("filter by a integer tag", helpers.Args{Input: "tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), g.Entry("filter by an unknown tag", helpers.Args{Input: "tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantEmpty: true}), g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), @@ -62,24 +62,24 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("filter by several entity ids", helpers.Args{Input: "entity_in", Duration: 25 * time.Minute, Offset: -20 * 
time.Minute}), - g.Entry("filter by entity id and service id", helpers.Args{Input: "entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), + g.Entry("filter by several entity ids", helpers.Args{Input: "entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), + g.Entry("filter by entity id and service id", helpers.Args{Input: "entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), + g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), g.Entry("invalid logical expression", helpers.Args{Input: "err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}), - g.Entry("linked or expressions", helpers.Args{Input: "linked_or", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.FEntry("linked or expressions", helpers.Args{Input: "linked_or", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("In and not In expressions", helpers.Args{Input: "in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("match a tag belongs to the entity", helpers.Args{Input: "entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), g.Entry("all of index mode in a larger time range", - helpers.Args{Input: "index_mode_all", Want: "index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour}), - g.Entry("all in all segments of index mode", helpers.Args{Input: "index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour}), + helpers.Args{Input: "index_mode_all", Want: "index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour, DisOrder: true}), + g.Entry("all in all segments of index mode", helpers.Args{Input: "index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour, DisOrder: true}), g.Entry("order by desc of index mode", helpers.Args{Input: "index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}), g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("query by id in index mode", helpers.Args{Input: "index_mode_by_id", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("multi groups: unchanged", helpers.Args{Input: "multi_group_unchanged", Duration: 35 * time.Minute, Offset: -20 * time.Minute}), diff --git a/test/integration/distributed/multi_segments/multi_segments_suite_test.go b/test/integration/distributed/multi_segments/multi_segments_suite_test.go new file mode 100644 index 00000000..bb1af63d --- /dev/null +++ b/test/integration/distributed/multi_segments/multi_segments_suite_test.go @@ -0,0 +1,154 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_multi_segments_test + +import ( + "context" + "fmt" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" + "github.com/apache/skywalking-banyandb/pkg/test/setup" + test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream" + "github.com/apache/skywalking-banyandb/pkg/timestamp" + test_cases "github.com/apache/skywalking-banyandb/test/cases" + casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" + casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" + casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" +) + +func TestDistributedMultiSegments(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Distributed Multi Segments Suite") +} + +var ( + connection *grpc.ClientConn + now time.Time + baseTime time.Time + deferFunc func() + goods []gleak.Goroutine +) + +var _ = SynchronizedBeforeSuite(func() []byte { + goods = gleak.Goroutines() + Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(Succeed()) + + By("Starting etcd server") + ports, err := test.AllocateFreePorts(2) + Expect(err).NotTo(HaveOccurred()) + dir, spaceDef, err := test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0]) + server, err := embeddedetcd.NewServer( + embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + 
embeddedetcd.RootDir(dir), + embeddedetcd.AutoCompactionMode("periodic"), + embeddedetcd.AutoCompactionRetention("1h"), + embeddedetcd.QuotaBackendBytes(2*1024*1024*1024), + ) + Expect(err).ShouldNot(HaveOccurred()) + <-server.ReadyNotify() + + By("Loading schema") + schemaRegistry, err := schema.NewEtcdSchemaRegistry( + schema.Namespace(metadata.DefaultNamespace), + schema.ConfigureServerEndpoints([]string{ep}), + ) + Expect(err).NotTo(HaveOccurred()) + defer schemaRegistry.Close() + ctx := context.Background() + test_stream.PreloadSchema(ctx, schemaRegistry) + test_measure.PreloadSchema(ctx, schemaRegistry) + + By("Starting data node 0") + closeDataNode0 := setup.DataNode(ep) + By("Starting data node 1") + closeDataNode1 := setup.DataNode(ep) + By("Starting liaison node") + liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep) + + By("Initializing test cases") + ns := timestamp.NowMilli().UnixNano() + now = time.Unix(0, ns-ns%int64(time.Minute)) + baseTime = time.Date(now.Year(), now.Month(), now.Day(), 00, 02, 0, 0, now.Location()) + test_cases.Initialize(liaisonAddr, baseTime) + + deferFunc = func() { + closerLiaisonNode() + closeDataNode0() + closeDataNode1() + _ = server.Close() + <-server.StopNotify() + spaceDef() + } + return []byte(liaisonAddr) +}, func(address []byte) { + var err error + connection, err = grpchelper.Conn(string(address), 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + casesstream.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: baseTime, + } + casesmeasure.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: baseTime, + } + casestopn.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: baseTime, + } + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = SynchronizedAfterSuite(func() { + if connection != nil { + Expect(connection.Close()).To(Succeed()) + } +}, func() {}) + +var _ = ReportAfterSuite("Distributed Multi Segments Suite", func(report Report) { + if report.SuiteSucceeded { + if deferFunc != nil { + deferFunc() + } + Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) + } +}) diff --git a/test/integration/standalone/multi_segments/multi_segments_suite_test.go b/test/integration/standalone/multi_segments/multi_segments_suite_test.go new file mode 100644 index 00000000..b3ea9011 --- /dev/null +++ b/test/integration/standalone/multi_segments/multi_segments_suite_test.go @@ -0,0 +1,109 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_query_test + +import ( + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + "github.com/apache/skywalking-banyandb/pkg/test/setup" + "github.com/apache/skywalking-banyandb/pkg/timestamp" + test_cases "github.com/apache/skywalking-banyandb/test/cases" + casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" + casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" + casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" + casestrace "github.com/apache/skywalking-banyandb/test/cases/trace" + integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" +) + +func TestIntegrationMultiSegments(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Integration Multi Segments Suite", Label(integration_standalone.Labels...)) +} + +var ( + connection *grpc.ClientConn + now time.Time + baseTime time.Time + deferFunc func() + goods []gleak.Goroutine +) + +var _ = SynchronizedBeforeSuite(func() []byte { + goods = gleak.Goroutines() + Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(Succeed()) + var addr string + addr, _, deferFunc = setup.Standalone() + ns := timestamp.NowMilli().UnixNano() + now = time.Unix(0, ns-ns%int64(time.Minute)) + baseTime = time.Date(now.Year(), now.Month(), now.Day(), 00, 02, 0, 0, now.Location()) + test_cases.Initialize(addr, baseTime) + return []byte(addr) +}, func(address []byte) { + var err error + connection, err = grpchelper.Conn(string(address), 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + casesstream.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: baseTime, + } + casesmeasure.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: baseTime, + } + casestopn.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: baseTime, + } + casestrace.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: baseTime, + } + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = SynchronizedAfterSuite(func() { + if connection != nil { + Expect(connection.Close()).To(Succeed()) + } +}, func() {}) + +var _ = ReportAfterSuite("Integration Query Suite", func(report Report) { + if report.SuiteSucceeded { + if deferFunc != nil { + deferFunc() + } + Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) + } +})
