This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 51399612 Fix the sorting timestamps issue of the measure model (#781)
51399612 is described below

commit 513996126898e78b8207ccd8f714e8d6306c0c47
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Sep 23 10:42:15 2025 +0800

    Fix the sorting timestamps issue of the measure model (#781)
---
 CHANGES.md                                         |   1 +
 banyand/internal/sidx/interfaces.go                |   4 +-
 banyand/internal/sidx/metadata.go                  |   3 +-
 banyand/internal/sidx/multi_sidx_query_test.go     |   4 +-
 banyand/internal/sidx/sidx.go                      |   3 +-
 banyand/internal/sidx/sidx_test.go                 |  26 +-
 banyand/internal/sidx/snapshot_test.go             |  10 +-
 banyand/internal/sidx/sync.go                      |   6 +-
 banyand/measure/flusher.go                         |  85 ++++--
 banyand/measure/metadata.go                        |   9 +-
 banyand/measure/part.go                            |   1 +
 banyand/measure/query.go                           | 217 ++++++++++---
 banyand/measure/query_test.go                      | 340 +++++++++++++++++++++
 banyand/measure/syncer.go                          |   1 -
 banyand/measure/tstable.go                         |   8 +
 banyand/measure/write_liaison.go                   |   2 +-
 banyand/stream/flusher.go                          |  85 ++++--
 banyand/stream/part.go                             |   2 +
 banyand/stream/tstable.go                          |   8 +
 banyand/stream/write_liaison.go                    |   2 +-
 banyand/trace/flusher.go                           | 138 ++++++---
 banyand/trace/part.go                              |   2 +
 banyand/trace/syncer.go                            |  14 +-
 banyand/trace/tstable.go                           |   8 +
 banyand/trace/write_liaison.go                     |   4 +-
 banyand/trace/write_standalone.go                  |   2 +-
 test/cases/measure/data/data.go                    |  21 ++
 test/cases/measure/data/want/entity_in.yaml        |   8 +-
 test/cases/measure/data/want/tag_filter_int.yaml   |   4 +-
 test/cases/measure/measure.go                      |  19 +-
 .../multi_segments/multi_segments_suite_test.go    | 161 ++++++++++
 .../multi_segments/multi_segments_suite_test.go    | 109 +++++++
 32 files changed, 1124 insertions(+), 183 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ab84e0d0..79e28c3a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@ Release Notes.
 - Fix returning empty result when using IN operatior on the array type tags.
 - Fix memory leaks and OOM issues in streaming processing by implementing 
deduplication logic in priority queues and improving sliding window memory 
management.
 - Fix etcd prefix matching any key that starts with this prefix.
+- Fix the sorting timestamps issue of the measure model when there is more 
than one segment.
 
 ### Document
 
diff --git a/banyand/internal/sidx/interfaces.go 
b/banyand/internal/sidx/interfaces.go
index bad91fb1..94cf6d29 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -40,7 +40,7 @@ type SIDX interface {
        MustAddMemPart(ctx context.Context, mp *memPart)
        // Write performs batch write operations. All writes must be submitted 
as batches.
        // Elements within each batch should be pre-sorted by the caller for 
optimal performance.
-       Write(ctx context.Context, reqs []WriteRequest) error
+       Write(ctx context.Context, reqs []WriteRequest, segmentID int64) error
        // Query executes a query with key range and tag filtering.
        // Returns a QueryResponse directly with all results loaded.
        // Both setup/validation errors and execution errors are returned via 
the error return value.
@@ -56,7 +56,7 @@ type SIDX interface {
        // PartsToSync returns the parts to sync.
        PartsToSync() []*part
        // StreamingParts returns the streaming parts.
-       StreamingParts(partsToSync []*part, group string, shardID uint32, name 
string, minTimestamps []int64) ([]queue.StreamingPartData, []func())
+       StreamingParts(partsToSync []*part, group string, shardID uint32, name 
string) ([]queue.StreamingPartData, []func())
        // SyncCh returns the sync channel for external synchronization.
        SyncCh() chan<- *SyncIntroduction
 }
diff --git a/banyand/internal/sidx/metadata.go 
b/banyand/internal/sidx/metadata.go
index deb8eb06..b60197ae 100644
--- a/banyand/internal/sidx/metadata.go
+++ b/banyand/internal/sidx/metadata.go
@@ -47,7 +47,8 @@ type partMetadata struct {
        MaxKey int64 `json:"maxKey"` // Maximum user key in part
 
        // Identity
-       ID uint64 `json:"id"` // Unique part identifier
+       ID        uint64 `json:"id"`        // Unique part identifier
+       SegmentID int64  `json:"segmentID"` // Segment identifier
 }
 
 func validatePartMetadata(fileSystem fs.FileSystem, partPath string) error {
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go 
b/banyand/internal/sidx/multi_sidx_query_test.go
index 87d57c28..b15553b7 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -40,7 +40,7 @@ type mockSIDX struct {
 
 func (m *mockSIDX) MustAddMemPart(_ context.Context, _ *memPart) {}
 
-func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest) error {
+func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest, _ int64) error {
        return nil // Not implemented for tests
 }
 
@@ -71,7 +71,7 @@ func (m *mockSIDX) PartsToSync() []*part {
        return nil
 }
 
-func (m *mockSIDX) StreamingParts(_ []*part, _ string, _ uint32, _ string, _ 
[]int64) ([]queue.StreamingPartData, []func()) {
+func (m *mockSIDX) StreamingParts(_ []*part, _ string, _ uint32, _ string) 
([]queue.StreamingPartData, []func()) {
        return nil, nil
 }
 
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 84ba4280..b2aa29df 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -130,7 +130,7 @@ func (s *sidx) MustAddMemPart(ctx context.Context, mp 
*memPart) {
 }
 
 // Write implements SIDX interface.
-func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) error {
+func (s *sidx) Write(ctx context.Context, reqs []WriteRequest, segmentID 
int64) error {
        // Validate requests
        for _, req := range reqs {
                if err := req.Validate(); err != nil {
@@ -160,6 +160,7 @@ func (s *sidx) Write(ctx context.Context, reqs 
[]WriteRequest) error {
        intro := generateIntroduction()
        intro.memPart = mp
        intro.memPart.partMetadata.ID = partID
+       intro.memPart.partMetadata.SegmentID = segmentID
        intro.applied = make(chan struct{})
 
        // Send to introducer loop
diff --git a/banyand/internal/sidx/sidx_test.go 
b/banyand/internal/sidx/sidx_test.go
index 47d82f6e..95aef2b0 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -100,7 +100,7 @@ func TestSIDX_Write_SingleRequest(t *testing.T) {
                createTestWriteRequest(1, 100, "data1", createTestTag("tag1", 
"value1")),
        }
 
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        assert.NoError(t, err)
 
        // Verify stats
@@ -124,7 +124,7 @@ func TestSIDX_Write_BatchRequest(t *testing.T) {
                createTestWriteRequest(2, 200, "data3", createTestTag("tag2", 
"value3")),
        }
 
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        assert.NoError(t, err)
 
        // Verify stats
@@ -170,7 +170,7 @@ func TestSIDX_Write_Validation(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       err := sidx.Write(ctx, []WriteRequest{tt.req})
+                       err := sidx.Write(ctx, []WriteRequest{tt.req}, 1)
                        if tt.expectErr {
                                assert.Error(t, err)
                        } else {
@@ -199,7 +199,7 @@ func TestSIDX_Write_WithTags(t *testing.T) {
                createTestWriteRequest(1, 100, "trace-data", tags...),
        }
 
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        assert.NoError(t, err)
 }
 
@@ -218,7 +218,7 @@ func TestSIDX_Query_BasicQuery(t *testing.T) {
                createTestWriteRequest(1, 100, "data1"),
                createTestWriteRequest(1, 101, "data2"),
        }
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        require.NoError(t, err)
 
        // Wait for introducer loop to process
@@ -268,7 +268,7 @@ func TestSIDX_Query_KeyRangeFilter(t *testing.T) {
                createTestWriteRequest(1, 150, "data150"),
                createTestWriteRequest(1, 200, "data200"),
        }
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        require.NoError(t, err)
 
        // Wait for introducer loop to process
@@ -322,7 +322,7 @@ func TestSIDX_Query_Ordering(t *testing.T) {
                createTestWriteRequest(3, 75, "series3-data75"),
                createTestWriteRequest(3, 175, "series3-data175"),
        }
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        require.NoError(t, err)
 
        // Wait for introducer loop to process
@@ -479,7 +479,7 @@ func TestSIDX_WriteQueryIntegration(t *testing.T) {
                createTestWriteRequest(2, 180, "series2-data2", 
createTestTag("env", "dev")),
        }
 
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        require.NoError(t, err)
 
        // Test 1: Query single series
@@ -540,7 +540,7 @@ func TestSIDX_DataConsistency(t *testing.T) {
                reqs = append(reqs, createTestWriteRequest(1, key, data))
        }
 
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        require.NoError(t, err)
 
        // Query back and verify data integrity
@@ -587,7 +587,7 @@ func TestSIDX_LargeDataset(t *testing.T) {
                ))
        }
 
-       err := sidx.Write(ctx, reqs)
+       err := sidx.Write(ctx, reqs, 1)
        require.NoError(t, err)
 
        // Query back and verify we can handle large result sets
@@ -634,7 +634,7 @@ func TestSIDX_ConcurrentWrites(t *testing.T) {
                                reqs = append(reqs, 
createTestWriteRequest(seriesID, key, data))
                        }
 
-                       if err := sidx.Write(ctx, reqs); err != nil {
+                       if err := sidx.Write(ctx, reqs, 1); err != nil {
                                errors <- err
                        }
                }(g)
@@ -666,7 +666,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
        initialReqs := []WriteRequest{
                createTestWriteRequest(1, 100, "initial-data"),
        }
-       err := sidx.Write(ctx, initialReqs)
+       err := sidx.Write(ctx, initialReqs, 1)
        require.NoError(t, err)
 
        var wg sync.WaitGroup
@@ -708,7 +708,7 @@ func TestSIDX_ConcurrentReadsWrites(t *testing.T) {
                                        int64(writeCount),
                                        fmt.Sprintf("writer-%d-data-%d", 
writerID, writeCount),
                                )
-                               sidx.Write(ctx, []WriteRequest{req}) // Ignore 
errors during concurrent stress
+                               sidx.Write(ctx, []WriteRequest{req}, 1) // 
Ignore errors during concurrent stress
                                writeCount++
                        }
                }(i)
diff --git a/banyand/internal/sidx/snapshot_test.go 
b/banyand/internal/sidx/snapshot_test.go
index 3322a459..fd4d5abe 100644
--- a/banyand/internal/sidx/snapshot_test.go
+++ b/banyand/internal/sidx/snapshot_test.go
@@ -413,7 +413,7 @@ func TestSnapshotReplacement_Basic(t *testing.T) {
                        Tags:     []Tag{{Name: "test", Value: 
[]byte("snapshot-replacement")}},
                }
 
-               if err := sidx.Write(ctx, []WriteRequest{req}); err != nil {
+               if err := sidx.Write(ctx, []WriteRequest{req}, 1); err != nil {
                        t.Errorf("write %d failed: %v", i, err)
                }
 
@@ -504,7 +504,7 @@ func 
TestSnapshotReplacement_ConcurrentReadsConsistentData(t *testing.T) {
                        },
                }
 
-               if err := sidx.Write(ctx, reqs); err != nil {
+               if err := sidx.Write(ctx, reqs, 1); err != nil {
                        t.Errorf("write %d failed: %v", i, err)
                }
 
@@ -593,7 +593,7 @@ func TestSnapshotReplacement_NoDataRacesDuringReplacement(t 
*testing.T) {
                                                        },
                                                },
                                        }
-                                       sidx.Write(ctx, reqs)
+                                       sidx.Write(ctx, reqs, 1)
                                case 1:
                                        // Stats operation - accesses current 
snapshot
                                        sidx.Stats(ctx)
@@ -653,7 +653,7 @@ func TestSnapshotReplacement_MemoryLeaksPrevention(t 
*testing.T) {
                                },
                        }
 
-                       if writeErr := sidx.Write(ctx, reqs); writeErr != nil {
+                       if writeErr := sidx.Write(ctx, reqs, 1); writeErr != 
nil {
                                t.Errorf("batch %d write %d failed: %v", i, j, 
writeErr)
                        }
                }
@@ -714,7 +714,7 @@ func TestSnapshotReplacement_MemoryLeaksPrevention(t 
*testing.T) {
                                        },
                                }
 
-                               if writeErr := sidx.Write(ctx, reqs); writeErr 
!= nil {
+                               if writeErr := sidx.Write(ctx, reqs, 1); 
writeErr != nil {
                                        t.Errorf("concurrent writer %d write %d 
failed: %v", writerID, j, writeErr)
                                }
 
diff --git a/banyand/internal/sidx/sync.go b/banyand/internal/sidx/sync.go
index 7d4f1c65..0a671ebb 100644
--- a/banyand/internal/sidx/sync.go
+++ b/banyand/internal/sidx/sync.go
@@ -62,10 +62,10 @@ func (s *sidx) PartsToSync() []*part {
 }
 
 // StreamingParts returns the streaming parts.
-func (s *sidx) StreamingParts(partsToSync []*part, group string, shardID 
uint32, name string, minTimestamps []int64) ([]queue.StreamingPartData, 
[]func()) {
+func (s *sidx) StreamingParts(partsToSync []*part, group string, shardID 
uint32, name string) ([]queue.StreamingPartData, []func()) {
        var streamingParts []queue.StreamingPartData
        var releaseFuncs []func()
-       for i, part := range partsToSync {
+       for _, part := range partsToSync {
                // Create streaming reader for the part
                files, release := createPartFileReaders(part)
                releaseFuncs = append(releaseFuncs, release)
@@ -80,7 +80,7 @@ func (s *sidx) StreamingParts(partsToSync []*part, group 
string, shardID uint32,
                        UncompressedSizeBytes: 
part.partMetadata.UncompressedSizeBytes,
                        TotalCount:            part.partMetadata.TotalCount,
                        BlocksCount:           part.partMetadata.BlocksCount,
-                       MinTimestamp:          minTimestamps[i],
+                       MinTimestamp:          part.partMetadata.SegmentID,
                        MinKey:                part.partMetadata.MinKey,
                        MaxKey:                part.partMetadata.MaxKey,
                        PartType:              name,
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
index 7c2b0bba..c0390f50 100644
--- a/banyand/measure/flusher.go
+++ b/banyand/measure/flusher.go
@@ -99,33 +99,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch 
uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan 
*mergerIntroduction) (bool, error) {
+       var merged bool
+       var currentSegmentID int64
        var memParts []*partWrapper
        mergedIDs := make(map[uint64]struct{})
+
+       // Helper function to merge current segment's parts
+       mergeCurrentSegment := func() (bool, error) {
+               if len(memParts) < 2 {
+                       return false, nil
+               }
+
+               // Create a copy of mergedIDs for this merge operation
+               currentMergedIDs := make(map[uint64]struct{}, len(mergedIDs))
+               for id := range mergedIDs {
+                       currentMergedIDs[id] = struct{}{}
+               }
+
+               // merge memory must not be closed by the tsTable.close
+               closeCh := make(chan struct{})
+               newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+                       currentMergedIDs, mergeCh, closeCh, "mem")
+               close(closeCh)
+               if err != nil {
+                       if errors.Is(err, errClosed) {
+                               return true, nil
+                       }
+                       return false, err
+               }
+               return newPart != nil, nil
+       }
+
+       // Process parts grouped by segmentID
        for i := range snp.parts {
-               if snp.parts[i].mp != nil {
-                       memParts = append(memParts, snp.parts[i])
-                       mergedIDs[snp.parts[i].ID()] = struct{}{}
+               if snp.parts[i].mp == nil {
                        continue
                }
-       }
-       if len(memParts) < 2 {
-               return false, nil
-       }
-       // merge memory must not be closed by the tsTable.close
-       closeCh := make(chan struct{})
-       newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-               mergedIDs, mergeCh, closeCh, "mem")
-       close(closeCh)
-       if err != nil {
-               if errors.Is(err, errClosed) {
-                       return true, nil
+
+               segID := snp.parts[i].mp.segmentID
+
+               // If this is a new segment, merge the previous segment first
+               if currentSegmentID != 0 && currentSegmentID != segID {
+                       m, err := mergeCurrentSegment()
+                       if err != nil {
+                               return false, err
+                       }
+                       if m {
+                               merged = true
+                       }
+
+                       // Reset for next segment
+                       memParts = memParts[:0]
+                       for id := range mergedIDs {
+                               delete(mergedIDs, id)
+                       }
                }
-               return false, err
+
+               // Add part to current segment
+               currentSegmentID = segID
+               memParts = append(memParts, snp.parts[i])
+               mergedIDs[snp.parts[i].ID()] = struct{}{}
        }
-       if newPart == nil {
-               return false, nil
+
+       // Merge the last segment if it has parts
+       if len(memParts) >= 2 {
+               m, err := mergeCurrentSegment()
+               if err != nil {
+                       return false, err
+               }
+               if m {
+                       merged = true
+               }
        }
-       return true, nil
+
+       return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) {
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index f9c64eb2..53475c5a 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -96,7 +96,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels 
map[string]string, n
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
                svc.l,
-               newSupplier(path, svc, sr, nodeLabels, nodeID),
+               newSupplier(path, svc, sr, nodeLabels),
                resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
        )
        sr.start()
@@ -372,8 +372,9 @@ func (sr *schemaRepo) 
stopAllProcessorsWithGroupPrefix(groupName string) {
                        manager := v.(*topNProcessorManager)
                        if err := manager.Close(); err != nil {
                                sr.l.Error().Err(err).Str("key", 
key).Msg("failed to close topN processor manager")
+                       } else {
+                               sr.topNProcessorMap.Delete(key)
                        }
-                       sr.topNProcessorMap.Delete(key)
                }
        }
 
@@ -392,12 +393,11 @@ type supplier struct {
        l          *logger.Logger
        schemaRepo *schemaRepo
        nodeLabels map[string]string
-       nodeID     string
        path       string
        option     option
 }
 
-func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels 
map[string]string, nodeID string) *supplier {
+func newSupplier(path string, svc *standalone, sr *schemaRepo, nodeLabels 
map[string]string) *supplier {
        if svc.pm == nil {
                svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier")
        }
@@ -418,7 +418,6 @@ func newSupplier(path string, svc *standalone, sr 
*schemaRepo, nodeLabels map[st
                pm:         svc.pm,
                schemaRepo: sr,
                nodeLabels: nodeLabels,
-               nodeID:     nodeID,
        }
 }
 
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index d7d639ea..e47bfbc6 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -109,6 +109,7 @@ type memPart struct {
        timestamps        bytes.Buffer
        fieldValues       bytes.Buffer
        partMetadata      partMetadata
+       segmentID         int64
 }
 
 func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, 
fs.Writer) {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 066f732a..8e0fceeb 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -226,9 +226,14 @@ func (m *measure) searchSeriesList(ctx context.Context, 
series []*pbv1.Series, m
                        newTagProjection = append(newTagProjection, 
tagProjection)
                }
        }
+
+       // Collect all search results first
+       var segResults []*segResult
+       var needsSorting bool
        seriesFilter := roaring.NewPostingList()
+
        for i := range segments {
-               sd, _, err := segments[i].IndexDB().Search(ctx, series, 
storage.IndexSearchOpts{
+               sd, sortedValues, err := segments[i].IndexDB().Search(ctx, 
series, storage.IndexSearchOpts{
                        Query:       mqo.Query,
                        Order:       mqo.Order,
                        PreloadSize: preloadSize,
@@ -241,44 +246,135 @@ func (m *measure) searchSeriesList(ctx context.Context, 
series []*pbv1.Series, m
                        tt, cc := segments[i].Tables()
                        tables = append(tables, tt...)
                        caches = append(caches, cc...)
+
+                       // Create segResult for this segment
+                       sr := &segResult{
+                               SeriesData:   sd,
+                               sortedValues: sortedValues,
+                               i:            0,
+                       }
+                       segResults = append(segResults, sr)
+
+                       // Check if we need sorting
+                       if mqo.Order != nil && sortedValues != nil {
+                               needsSorting = true
+                       }
                }
-               for j := range sd.SeriesList {
-                       if seriesFilter.Contains(uint64(sd.SeriesList[j].ID)) {
+       }
+
+       // Sort if needed, otherwise use original order
+       if needsSorting && len(segResults) > 0 {
+               // Use segResultHeap to sort
+               segHeap := &segResultHeap{
+                       results:  segResults,
+                       sortDesc: mqo.Order.Sort == modelv1.Sort_SORT_DESC,
+               }
+               heap.Init(segHeap)
+
+               // Extract sorted series IDs
+               for segHeap.Len() > 0 {
+                       top := heap.Pop(segHeap).(*segResult)
+                       series := top.SeriesList[top.i]
+
+                       if seriesFilter.Contains(uint64(series.ID)) {
+                               // Move to next in this segment
+                               top.i++
+                               if top.i < len(top.SeriesList) {
+                                       heap.Push(segHeap, top)
+                               }
                                continue
                        }
-                       seriesFilter.Insert(uint64(sd.SeriesList[j].ID))
-                       sl = append(sl, sd.SeriesList[j].ID)
-                       if projectedEntityOffsets == nil && sd.Fields == nil {
-                               continue
+
+                       seriesFilter.Insert(uint64(series.ID))
+                       sl = append(sl, series.ID)
+
+                       // Build storedIndexValue for this series
+                       var fieldResult map[string][]byte
+                       if top.Fields != nil && top.i < len(top.Fields) {
+                               fieldResult = top.Fields[top.i]
                        }
-                       if storedIndexValue == nil {
-                               storedIndexValue = 
make(map[common.SeriesID]map[string]*modelv1.TagValue)
+                       storedIndexValue = m.buildStoredIndexValue(
+                               series.ID,
+                               series.EntityValues,
+                               fieldResult,
+                               projectedEntityOffsets,
+                               fieldToValueType,
+                               storedIndexValue,
+                       )
+
+                       // Move to next in this segment
+                       top.i++
+                       if top.i < len(top.SeriesList) {
+                               heap.Push(segHeap, top)
                        }
-                       tagValues := make(map[string]*modelv1.TagValue)
-                       storedIndexValue[sd.SeriesList[j].ID] = tagValues
-                       for name, offset := range projectedEntityOffsets {
-                               if offset < 0 || offset >= 
len(sd.SeriesList[j].EntityValues) {
-                                       logger.Warningf("offset %d for tag %s 
is out of range for series ID %v", offset, name, sd.SeriesList[j].ID)
-                                       tagValues[name] = pbv1.NullTagValue
+               }
+       } else {
+               // Original logic when no sorting is needed
+               for _, sr := range segResults {
+                       for j := range sr.SeriesList {
+                               if 
seriesFilter.Contains(uint64(sr.SeriesList[j].ID)) {
                                        continue
                                }
-                               tagValues[name] = 
sd.SeriesList[j].EntityValues[offset]
-                       }
-                       if sd.Fields == nil {
-                               continue
-                       }
-                       for f, v := range sd.Fields[j] {
-                               if tnt, ok := fieldToValueType[f]; ok {
-                                       tagValues[tnt.fieldName] = 
mustDecodeTagValue(tnt.typ, v)
-                               } else {
-                                       logger.Panicf("unknown field %s not 
found in fieldToValueType", f)
+                               seriesFilter.Insert(uint64(sr.SeriesList[j].ID))
+                               sl = append(sl, sr.SeriesList[j].ID)
+
+                               var fieldResult map[string][]byte
+                               if sr.Fields != nil && j < len(sr.Fields) {
+                                       fieldResult = sr.Fields[j]
                                }
+                               storedIndexValue = m.buildStoredIndexValue(
+                                       sr.SeriesList[j].ID,
+                                       sr.SeriesList[j].EntityValues,
+                                       fieldResult,
+                                       projectedEntityOffsets,
+                                       fieldToValueType,
+                                       storedIndexValue,
+                               )
                        }
                }
        }
+
        return sl, tables, caches, storedIndexValue, newTagProjection, nil
 }
 
+func (m *measure) buildStoredIndexValue(
+       seriesID common.SeriesID,
+       entityValues []*modelv1.TagValue,
+       fieldResult map[string][]byte,
+       projectedEntityOffsets map[string]int,
+       fieldToValueType map[string]tagNameWithType,
+       storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+) map[common.SeriesID]map[string]*modelv1.TagValue {
+       if projectedEntityOffsets == nil && fieldResult == nil {
+               return storedIndexValue
+       }
+
+       if storedIndexValue == nil {
+               storedIndexValue = 
make(map[common.SeriesID]map[string]*modelv1.TagValue)
+       }
+       tagValues := make(map[string]*modelv1.TagValue)
+       storedIndexValue[seriesID] = tagValues
+
+       for name, offset := range projectedEntityOffsets {
+               if offset < 0 || offset >= len(entityValues) {
+                       logger.Warningf("offset %d for tag %s is out of range 
for series ID %v", offset, name, seriesID)
+                       tagValues[name] = pbv1.NullTagValue
+                       continue
+               }
+               tagValues[name] = entityValues[offset]
+       }
+
+       for f, v := range fieldResult {
+               if tnt, ok := fieldToValueType[f]; ok {
+                       tagValues[tnt.fieldName] = mustDecodeTagValue(tnt.typ, 
v)
+               } else {
+                       logger.Panicf("unknown field %s not found in 
fieldToValueType", f)
+               }
+       }
+
+       return storedIndexValue
+}
+
 func (m *measure) buildIndexQueryResult(ctx context.Context, mqo 
model.MeasureQueryOptions,
        segments []storage.Segment[*tsTable, option],
 ) (model.MeasureQueryResult, error) {
@@ -351,11 +447,17 @@ func (m *measure) buildIndexQueryResult(ctx 
context.Context, mqo model.MeasureQu
                if len(sr.SeriesList) < 1 {
                        continue
                }
-               r.segResults = append(r.segResults, sr)
+               r.segResults.results = append(r.segResults.results, sr)
        }
-       if len(r.segResults) < 1 {
+       if len(r.segResults.results) < 1 {
                return nilResult, nil
        }
+
+       // Set sort order based on mqo.Order.Sort
+       if mqo.Order != nil && mqo.Order.Sort == modelv1.Sort_SORT_DESC {
+               r.segResults.sortDesc = true
+       }
+
        heap.Init(&r.segResults)
        return r, nil
 }
@@ -770,18 +872,18 @@ type indexSortResult struct {
 
 // Pull implements model.MeasureQueryResult.
 func (iqr *indexSortResult) Pull() *model.MeasureResult {
-       if len(iqr.segResults) < 1 {
+       if len(iqr.segResults.results) < 1 {
                return nil
        }
-       if len(iqr.segResults) == 1 {
-               if iqr.segResults[0].i >= len(iqr.segResults[0].SeriesList) {
+       if len(iqr.segResults.results) == 1 {
+               if iqr.segResults.results[0].i >= 
len(iqr.segResults.results[0].SeriesList) {
                        return nil
                }
-               sr := iqr.segResults[0]
+               sr := iqr.segResults.results[0]
                r := iqr.copyTo(sr)
                sr.i++
                if sr.i >= len(sr.SeriesList) {
-                       iqr.segResults = iqr.segResults[:0]
+                       iqr.segResults.results = iqr.segResults.results[:0]
                }
                return r
        }
@@ -861,25 +963,56 @@ func (sr *segResult) remove(i int) {
        }
 }
 
-type segResultHeap []*segResult
+type segResultHeap struct {
+       results  []*segResult
+       sortDesc bool
+}
+
+func (h *segResultHeap) Len() int { return len(h.results) }
+func (h *segResultHeap) Less(i, j int) bool {
+       // Handle NPE - check for nil results or invalid indices
+       if i >= len(h.results) || j >= len(h.results) {
+               return false
+       }
+       if h.results[i] == nil || h.results[j] == nil {
+               return false
+       }
+       if h.results[i].i >= len(h.results[i].SeriesList) || h.results[j].i >= 
len(h.results[j].SeriesList) {
+               return false
+       }
+
+       // If no sortedValues, compare by SeriesID
+       if h.results[i].sortedValues == nil || h.results[j].sortedValues == nil 
{
+               if h.sortDesc {
+                       return h.results[i].SeriesList[h.results[i].i].ID > 
h.results[j].SeriesList[h.results[j].i].ID
+               }
+               return h.results[i].SeriesList[h.results[i].i].ID < 
h.results[j].SeriesList[h.results[j].i].ID
+       }
+
+       // Handle potential index out of bounds for sortedValues
+       if h.results[i].i >= len(h.results[i].sortedValues) || h.results[j].i 
>= len(h.results[j].sortedValues) {
+               if h.sortDesc {
+                       return h.results[i].SeriesList[h.results[i].i].ID > 
h.results[j].SeriesList[h.results[j].i].ID
+               }
+               return h.results[i].SeriesList[h.results[i].i].ID < 
h.results[j].SeriesList[h.results[j].i].ID
+       }
 
-func (h segResultHeap) Len() int { return len(h) }
-func (h segResultHeap) Less(i, j int) bool {
-       if h[i].sortedValues == nil {
-               return h[i].SeriesList[h[i].i].ID < h[j].SeriesList[h[j].i].ID
+       cmp := bytes.Compare(h.results[i].sortedValues[h.results[i].i], 
h.results[j].sortedValues[h.results[j].i])
+       if h.sortDesc {
+               return cmp > 0
        }
-       return bytes.Compare(h[i].sortedValues[h[i].i], 
h[j].sortedValues[h[j].i]) < 0
+       return cmp < 0
 }
-func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *segResultHeap) Swap(i, j int) { h.results[i], h.results[j] = 
h.results[j], h.results[i] }
 
 func (h *segResultHeap) Push(x interface{}) {
-       *h = append(*h, x.(*segResult))
+       h.results = append(h.results, x.(*segResult))
 }
 
 func (h *segResultHeap) Pop() interface{} {
-       old := *h
+       old := h.results
        n := len(old)
        x := old[n-1]
-       *h = old[0 : n-1]
+       h.results = old[0 : n-1]
        return x
 }
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 1f2d24a8..17ca9c37 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -34,6 +34,7 @@ import (
        itest "github.com/apache/skywalking-banyandb/banyand/internal/test"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -1515,3 +1516,342 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
                })
        }
 }
+
+func TestSegResultHeap_Sorting(t *testing.T) {
+       tests := []struct {
+               name        string
+               segResults  []*segResult
+               expectOrder []int
+               sortDesc    bool
+       }{
+               {
+                       name:     "Sort ascending by SeriesID (no 
sortedValues)",
+                       sortDesc: false,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(30)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(10)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(20)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // SeriesID order: 10, 30, 
20
+               },
+               {
+                       name:     "Sort descending by SeriesID (no 
sortedValues)",
+                       sortDesc: true,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(10)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(30)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(20)},
+                                               },
+                                       },
+                                       i: 0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // SeriesID order: 30, 10, 
20
+               },
+               {
+                       name:     "Sort ascending by sortedValues",
+                       sortDesc: false,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(1)},
+                                               },
+                                       },
+                                       sortedValues: 
[][]byte{[]byte("charlie")},
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(2)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("alpha")},
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(3)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("beta")},
+                                       i:            0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // alpha, charlie, beta
+               },
+               {
+                       name:     "Sort descending by sortedValues",
+                       sortDesc: true,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(1)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("alpha")},
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(2)},
+                                               },
+                                       },
+                                       sortedValues: 
[][]byte{[]byte("charlie")},
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(3)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("beta")},
+                                       i:            0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // charlie, alpha, beta
+               },
+               {
+                       name:     "Mixed sortedValues and nil sortedValues 
ascending",
+                       sortDesc: false,
+                       segResults: []*segResult{
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(30)},
+                                               },
+                                       },
+                                       sortedValues: nil, // Will use SeriesID 
for sorting
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(10)},
+                                               },
+                                       },
+                                       sortedValues: [][]byte{[]byte("zzz")}, 
// Other entries have nil sortedValues, so Less compares by SeriesID
+                                       i:            0,
+                               },
+                               {
+                                       SeriesData: storage.SeriesData{
+                                               SeriesList: pbv1.SeriesList{
+                                                       &pbv1.Series{ID: 
common.SeriesID(20)},
+                                               },
+                                       },
+                                       sortedValues: nil, // Will use SeriesID 
for sorting
+                                       i:            0,
+                               },
+                       },
+                       expectOrder: []int{1, 0, 2}, // SeriesID 10, 30, 20 
(nil sortedValues use SeriesID)
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Create and initialize heap
+                       heap := segResultHeap{
+                               results:  make([]*segResult, 0),
+                               sortDesc: tt.sortDesc,
+                       }
+
+                       // Add all results to heap
+                       heap.results = append(heap.results, tt.segResults...)
+
+                       // Verify that every result was added
+                       require.Equal(t, len(tt.segResults), heap.Len())
+
+                       // Copy the results so the copy can be reordered
+                       heapImpl := &heap
+                       heap2 := make([]*segResult, len(tt.segResults))
+                       copy(heap2, heap.results)
+
+                       // Reorder the copy; Less indexes heap.results, not heap2
+                       sort.Slice(heap2, func(i, j int) bool {
+                               return heapImpl.Less(i, j)
+                       })
+
+                       // Verify the order matches expectation
+                       for i, expectedIdx := range tt.expectOrder {
+                               actual := heap2[i]
+                               expected := tt.segResults[expectedIdx]
+                               require.Equal(t, 
expected.SeriesList[expected.i].ID, actual.SeriesList[actual.i].ID,
+                                       "Position %d: expected SeriesID %d, got 
%d", i, expected.SeriesList[expected.i].ID, actual.SeriesList[actual.i].ID)
+                       }
+               })
+       }
+}
+
+func TestSegResultHeap_NPE_Prevention(t *testing.T) {
+       tests := []struct {
+               name       string
+               segResults []*segResult
+               i, j       int
+               expectLess bool
+       }{
+               {
+                       name:       "Out of bounds indices",
+                       segResults: []*segResult{{SeriesData: 
storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 0}},
+                       i:          0,
+                       j:          5, // Out of bounds
+                       expectLess: false,
+               },
+               {
+                       name:       "Nil segResult",
+                       segResults: []*segResult{nil, {SeriesData: 
storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}}, i: 0}},
+                       i:          0,
+                       j:          1,
+                       expectLess: false,
+               },
+               {
+                       name: "Index out of bounds for SeriesList",
+                       segResults: []*segResult{
+                               {SeriesData: storage.SeriesData{SeriesList: 
pbv1.SeriesList{{ID: 1}}}, i: 5}, // i is out of bounds
+                               {SeriesData: storage.SeriesData{SeriesList: 
pbv1.SeriesList{{ID: 2}}}, i: 0},
+                       },
+                       i:          0,
+                       j:          1,
+                       expectLess: false,
+               },
+               {
+                       name: "Index out of bounds for sortedValues",
+                       segResults: []*segResult{
+                               {
+                                       SeriesData:   
storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 1}}},
+                                       sortedValues: [][]byte{[]byte("test")},
+                                       i:            5, // i is out of bounds 
for sortedValues
+                               },
+                               {
+                                       SeriesData:   
storage.SeriesData{SeriesList: pbv1.SeriesList{{ID: 2}}},
+                                       sortedValues: [][]byte{[]byte("test2")},
+                                       i:            0,
+                               },
+                       },
+                       i:          0,
+                       j:          1,
+                       expectLess: false, // i is also past SeriesList, so 
Less returns false
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       heap := segResultHeap{
+                               results:  tt.segResults,
+                               sortDesc: false,
+                       }
+
+                       // This should not panic due to NPE prevention
+                       result := heap.Less(tt.i, tt.j)
+                       require.Equal(t, tt.expectLess, result)
+               })
+       }
+}
+
+func TestIndexSortResult_OrderBySortDesc(t *testing.T) {
+       tests := []struct {
+               name       string
+               sortOrder  modelv1.Sort
+               expectDesc bool
+       }{
+               {
+                       name:       "SORT_ASC should be ascending",
+                       sortOrder:  modelv1.Sort_SORT_ASC,
+                       expectDesc: false,
+               },
+               {
+                       name:       "SORT_UNSPECIFIED should be ascending",
+                       sortOrder:  modelv1.Sort_SORT_UNSPECIFIED,
+                       expectDesc: false,
+               },
+               {
+                       name:       "SORT_DESC should be descending",
+                       sortOrder:  modelv1.Sort_SORT_DESC,
+                       expectDesc: true,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Build query options carrying the sort order under test
+                       mqo := model.MeasureQueryOptions{
+                               Order: &index.OrderBy{
+                                       Sort: tt.sortOrder,
+                               },
+                       }
+
+                       // Create a simple segResult
+                       sr := &segResult{
+                               SeriesData: storage.SeriesData{
+                                       SeriesList: pbv1.SeriesList{
+                                               &pbv1.Series{ID: 
common.SeriesID(1)},
+                                               &pbv1.Series{ID: 
common.SeriesID(2)},
+                                       },
+                                       Timestamps: []int64{1000, 2000},
+                                       Versions:   []int64{1, 2},
+                               },
+                               sortedValues: [][]byte{[]byte("test1"), 
[]byte("test2")},
+                               i:            0,
+                       }
+
+                       // Create index sort result
+                       r := &indexSortResult{
+                               tfl: []tagFamilyLocation{},
+                               segResults: segResultHeap{
+                                       results:  []*segResult{sr},
+                                       sortDesc: false, // This should be set 
by buildIndexQueryResult
+                               },
+                       }
+
+                       // Simulate the logic from buildIndexQueryResult
+                       if mqo.Order != nil && mqo.Order.Sort == 
modelv1.Sort_SORT_DESC {
+                               r.segResults.sortDesc = true
+                       }
+
+                       // Verify the sort order was set correctly
+                       require.Equal(t, tt.expectDesc, r.segResults.sortDesc)
+               })
+       }
+}
diff --git a/banyand/measure/syncer.go b/banyand/measure/syncer.go
index f91841b3..b3b7f3a4 100644
--- a/banyand/measure/syncer.go
+++ b/banyand/measure/syncer.go
@@ -230,7 +230,6 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, 
syncCh chan *syncIntrodu
                        // Create streaming reader for the part.
                        files, release := createPartFileReaders(part)
                        releaseFuncs = append(releaseFuncs, release)
-
                        // Create streaming part sync data.
                        streamingParts = append(streamingParts, 
queue.StreamingPartData{
                                ID:                    part.partMetadata.ID,
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 2f71eddd..5ddab3ea 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -106,6 +106,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, 
p common.Position,
                fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
        }
        if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+               for _, id := range loadedSnapshots {
+                       fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
+               }
                return &tst, uint64(time.Now().UnixNano())
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -279,12 +282,17 @@ func (tst *tsTable) Close() error {
 }
 
 func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
+       tst.mustAddDataPointsWithSegmentID(dps, 0)
+}
+
+func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID 
int64) {
        if len(dps.seriesIDs) == 0 {
                return
        }
 
        mp := generateMemPart()
        mp.mustInitFromDataPoints(dps)
+       mp.segmentID = segmentID
        tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 7b287ff5..e64cad88 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -108,7 +108,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                for j := range g.tables {
                        es := g.tables[j]
                        if es.tsTable != nil && es.dataPoints != nil {
-                               es.tsTable.mustAddDataPoints(es.dataPoints)
+                               
es.tsTable.mustAddDataPointsWithSegmentID(es.dataPoints, 
es.timeRange.Start.UnixNano())
                                releaseDataPoints(es.dataPoints)
                        }
                        nodes := g.queue.GetNodes(es.shardID)
diff --git a/banyand/stream/flusher.go b/banyand/stream/flusher.go
index d8d4cc2f..04788af0 100644
--- a/banyand/stream/flusher.go
+++ b/banyand/stream/flusher.go
@@ -129,33 +129,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch 
uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan 
*mergerIntroduction) (bool, error) {
+       var merged bool
+       var currentSegmentID int64
        var memParts []*partWrapper
        mergedIDs := make(map[uint64]struct{})
+
+       // Helper function to merge current segment's parts
+       mergeCurrentSegment := func() (bool, error) {
+               if len(memParts) < 2 {
+                       return false, nil
+               }
+
+               // Create a copy of mergedIDs for this merge operation
+               currentMergedIDs := make(map[uint64]struct{}, len(mergedIDs))
+               for id := range mergedIDs {
+                       currentMergedIDs[id] = struct{}{}
+               }
+
+               // merge memory must not be closed by the tsTable.close
+               closeCh := make(chan struct{})
+               newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+                       currentMergedIDs, mergeCh, closeCh, "mem")
+               close(closeCh)
+               if err != nil {
+                       if errors.Is(err, errClosed) {
+                               return true, nil
+                       }
+                       return false, err
+               }
+               return newPart != nil, nil
+       }
+
+       // Process parts grouped by segmentID
        for i := range snp.parts {
-               if snp.parts[i].mp != nil {
-                       memParts = append(memParts, snp.parts[i])
-                       mergedIDs[snp.parts[i].ID()] = struct{}{}
+               if snp.parts[i].mp == nil {
                        continue
                }
-       }
-       if len(memParts) < 2 {
-               return false, nil
-       }
-       // merge memory must not be closed by the tsTable.close
-       closeCh := make(chan struct{})
-       newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-               mergedIDs, mergeCh, closeCh, "mem")
-       close(closeCh)
-       if err != nil {
-               if errors.Is(err, errClosed) {
-                       return true, nil
+
+               segID := snp.parts[i].mp.segmentID
+
+               // If this is a new segment, merge the previous segment first
+               if currentSegmentID != 0 && currentSegmentID != segID {
+                       m, err := mergeCurrentSegment()
+                       if err != nil {
+                               return false, err
+                       }
+                       if m {
+                               merged = true
+                       }
+
+                       // Reset for next segment
+                       memParts = memParts[:0]
+                       for id := range mergedIDs {
+                               delete(mergedIDs, id)
+                       }
                }
-               return false, err
+
+               // Add part to current segment
+               currentSegmentID = segID
+               memParts = append(memParts, snp.parts[i])
+               mergedIDs[snp.parts[i].ID()] = struct{}{}
        }
-       if newPart == nil {
-               return false, nil
+
+       // Merge the last segment if it has parts
+       if len(memParts) >= 2 {
+               m, err := mergeCurrentSegment()
+               if err != nil {
+                       return false, err
+               }
+               if m {
+                       merged = true
+               }
        }
-       return true, nil
+
+       return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) {
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 07e5a22f..39cb2145 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -112,6 +112,7 @@ type memPart struct {
        primary           bytes.Buffer
        timestamps        bytes.Buffer
        partMetadata      partMetadata
+       segmentID         int64
 }
 
 func (mp *memPart) mustCreateMemTagFamilyWriters(name string) (fs.Writer, 
fs.Writer, fs.Writer) {
@@ -140,6 +141,7 @@ func (mp *memPart) reset() {
        mp.meta.Reset()
        mp.primary.Reset()
        mp.timestamps.Reset()
+       mp.segmentID = 0
        if mp.tagFamilies != nil {
                for k, tf := range mp.tagFamilies {
                        tf.Reset()
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index bd521ce4..9384c4b4 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -261,6 +261,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, 
p common.Position,
                fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
        }
        if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+               for _, id := range loadedSnapshots {
+                       fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
+               }
                return &tst, uint64(time.Now().UnixNano()), nil
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -336,12 +339,17 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
 }
 
 func (tst *tsTable) mustAddElements(es *elements) {
+       tst.mustAddElementsWithSegmentID(es, 0)
+}
+
+func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID 
int64) {
        if len(es.seriesIDs) == 0 {
                return
        }
 
        mp := generateMemPart()
        mp.mustInitFromElements(es)
+       mp.segmentID = segmentID
        tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index c78d1a37..be566f09 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -181,7 +181,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                g := groups[i]
                for j := range g.tables {
                        es := g.tables[j]
-                       es.tsTable.mustAddElements(es.elements)
+                       es.tsTable.mustAddElementsWithSegmentID(es.elements, 
es.timeRange.Start.UnixNano())
                        releaseElements(es.elements)
                        // Get nodes for this shard
                        nodes := g.queue.GetNodes(es.shardID)
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 56f81c32..727cad6c 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -48,33 +48,20 @@ func (tst *tsTable) flusherLoop(flushCh chan 
*flusherIntroduction, mergeCh chan
                                        
tst.incTotalFlushLatency(time.Since(start).Seconds())
                                }()
                                curSnapshot := tst.currentSnapshot()
-                               if curSnapshot != nil {
-                                       flusherWatchers = 
tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
-                                       curSnapshot.decRef()
-                                       curSnapshot = nil
-                               }
-                               tst.RLock()
-                               if tst.snapshot != nil && tst.snapshot.epoch > 
epoch {
-                                       curSnapshot = tst.snapshot
-                                       curSnapshot.incRef()
-                               }
-                               tst.RUnlock()
+                               var ok bool
+                               var merged bool
+                               curSnapshot, flusherWatchers, ok, merged = 
tst.pauseFlusherToPileupMemPartsWithMerge(curSnapshot, flusherWatcher, 
flusherWatchers, epoch, mergeCh)
                                if curSnapshot != nil {
                                        defer curSnapshot.decRef()
-                                       merged, err := 
tst.mergeMemParts(curSnapshot, mergeCh)
-                                       if err != nil {
-                                               
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", 
curSnapshot.epoch)
-                                               tst.incTotalFlushLoopErr(1)
+                                       if !ok {
                                                return false
                                        }
                                        if !merged {
                                                tst.flush(curSnapshot, flushCh)
                                        }
-                                       epoch = curSnapshot.epoch
-                                       // Notify merger to start a new round 
of merge.
-                                       // This round might have be triggered 
in pauseFlusherToPileupMemParts.
                                        flusherWatchers.Notify(math.MaxUint64)
                                        flusherWatchers = nil
+                                       epoch = curSnapshot.epoch
                                        if tst.currentEpoch() != epoch {
                                                tst.incTotalFlushLoopProgress(1)
                                                return false
@@ -132,6 +119,36 @@ func (tst *tsTable) flusherLoopNoMerger(flushCh chan 
*flusherIntroduction, intro
        }
 }
 
+func (tst *tsTable) pauseFlusherToPileupMemPartsWithMerge(
+       curSnapshot *snapshot, flusherWatcher watcher.Channel, flusherWatchers 
watcher.Epochs,
+       epoch uint64, mergeCh chan *mergerIntroduction,
+) (*snapshot, watcher.Epochs, bool, bool) {
+       if tst.option.flushTimeout < 1 {
+               return curSnapshot, flusherWatchers, true, false
+       }
+       if curSnapshot != nil {
+               flusherWatchers = tst.pauseFlusherToPileupMemParts(epoch, 
flusherWatcher, flusherWatchers)
+               curSnapshot.decRef()
+               curSnapshot = nil
+       }
+       tst.RLock()
+       if tst.snapshot != nil && tst.snapshot.epoch > epoch {
+               curSnapshot = tst.snapshot
+               curSnapshot.incRef()
+       }
+       tst.RUnlock()
+       if curSnapshot == nil {
+               return nil, flusherWatchers, false, false
+       }
+       merged, err := tst.mergeMemParts(curSnapshot, mergeCh)
+       if err != nil {
+               tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", 
curSnapshot.epoch)
+               tst.incTotalFlushLoopErr(1)
+               return curSnapshot, flusherWatchers, false, false
+       }
+       return curSnapshot, flusherWatchers, true, merged
+}
+
 // pauseFlusherToPileupMemParts takes a pause to wait for in-memory parts to 
pile up.
 // If there is no in-memory part, we can skip the pause.
 // When a merging is finished, we can skip the pause.
@@ -155,33 +172,80 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch 
uint64, flushWatcher watc
 }
 
 func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan 
*mergerIntroduction) (bool, error) {
+       var merged bool
+       var currentSegmentID int64
        var memParts []*partWrapper
        mergedIDs := make(map[partHandle]struct{})
+
+       // Helper function to merge current segment's parts
+       mergeCurrentSegment := func() (bool, error) {
+               if len(memParts) < 2 {
+                       return false, nil
+               }
+
+               // Create a copy of mergedIDs for this merge operation
+               currentMergedIDs := make(map[partHandle]struct{}, 
len(mergedIDs))
+               for id := range mergedIDs {
+                       currentMergedIDs[id] = struct{}{}
+               }
+
+               // merge memory must not be closed by the tsTable.close
+               closeCh := make(chan struct{})
+               newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+                       currentMergedIDs, mergeCh, closeCh, "mem")
+               close(closeCh)
+               if err != nil {
+                       if errors.Is(err, errClosed) {
+                               return true, nil
+                       }
+                       return false, err
+               }
+               return newPart != nil, nil
+       }
+
+       // Process parts grouped by segmentID
        for i := range snp.parts {
-               if snp.parts[i].mp != nil {
-                       memParts = append(memParts, snp.parts[i])
-                       mergedIDs[partHandle{partID: snp.parts[i].ID(), 
partType: PartTypeCore}] = struct{}{}
+               if snp.parts[i].mp == nil {
                        continue
                }
-       }
-       if len(memParts) < 2 {
-               return false, nil
-       }
-       // merge memory must not be closed by the tsTable.close
-       closeCh := make(chan struct{})
-       newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
-               mergedIDs, mergeCh, closeCh, "mem")
-       close(closeCh)
-       if err != nil {
-               if errors.Is(err, errClosed) {
-                       return true, nil
+
+               segID := snp.parts[i].mp.segmentID
+
+               // If this is a new segment, merge the previous segment first
+               if currentSegmentID != 0 && currentSegmentID != segID {
+                       m, err := mergeCurrentSegment()
+                       if err != nil {
+                               return false, err
+                       }
+                       if m {
+                               merged = true
+                       }
+
+                       // Reset for next segment
+                       memParts = memParts[:0]
+                       for id := range mergedIDs {
+                               delete(mergedIDs, id)
+                       }
                }
-               return false, err
+
+               // Add part to current segment
+               currentSegmentID = segID
+               memParts = append(memParts, snp.parts[i])
+               mergedIDs[partHandle{partID: snp.parts[i].ID(), partType: 
PartTypeCore}] = struct{}{}
        }
-       if newPart == nil {
-               return false, nil
+
+       // Merge the last segment if it has parts
+       if len(memParts) >= 2 {
+               m, err := mergeCurrentSegment()
+               if err != nil {
+                       return false, err
+               }
+               if m {
+                       merged = true
+               }
        }
-       return true, nil
+
+       return merged, nil
 }
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) {
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index d8ba721d..b76caab9 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -113,6 +113,7 @@ type memPart struct {
        meta          bytes.Buffer
        primary       bytes.Buffer
        partMetadata  partMetadata
+       segmentID     int64
 }
 
 func (mp *memPart) mustCreateMemTagWriters(name string) (fs.Writer, fs.Writer) 
{
@@ -143,6 +144,7 @@ func (mp *memPart) reset() {
        mp.meta.Reset()
        mp.primary.Reset()
        mp.spans.Reset()
+       mp.segmentID = 0
        if mp.tags != nil {
                for k, t := range mp.tags {
                        t.Reset()
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index 8e9b024c..a95c9e8c 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -296,21 +296,9 @@ func (tst *tsTable) syncStreamingPartsToNodes(ctx 
context.Context, nodes []strin
                }
                // Prepare all streaming parts data
                streamingParts := make([]queue.StreamingPartData, 0)
-               snapshot := tst.currentSnapshot()
                // Add sidx streaming parts
                for name, sidxParts := range sidxPartsToSync {
-                       // TODO: minTimestmaps should be read only once
-                       minTimestamps := make([]int64, 0)
-                       for _, part := range sidxParts {
-                               partID := part.ID()
-                               for _, pw := range snapshot.parts {
-                                       if pw.p.partMetadata.ID == partID {
-                                               minTimestamps = 
append(minTimestamps, pw.p.partMetadata.MinTimestamp)
-                                               break
-                                       }
-                               }
-                       }
-                       sidxStreamingParts, sidxReleaseFuncs := 
tst.sidxMap[name].StreamingParts(sidxParts, tst.group, uint32(tst.shardID), 
name, minTimestamps)
+                       sidxStreamingParts, sidxReleaseFuncs := 
tst.sidxMap[name].StreamingParts(sidxParts, tst.group, uint32(tst.shardID), 
name)
                        streamingParts = append(streamingParts, 
sidxStreamingParts...)
                        *releaseFuncs = append(*releaseFuncs, 
sidxReleaseFuncs...)
                }
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 8e3be10f..645dde18 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -248,6 +248,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, 
p common.Position,
                fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
        }
        if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+               for _, id := range loadedSnapshots {
+                       fileSystem.MustRMAll(filepath.Join(rootPath, 
snapshotName(id)))
+               }
                return &tst, uint64(time.Now().UnixNano())
        }
        sort.Slice(loadedSnapshots, func(i, j int) bool {
@@ -412,12 +415,17 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
 }
 
 func (tst *tsTable) mustAddTraces(ts *traces) {
+       tst.mustAddTracesWithSegmentID(ts, 0)
+}
+
+func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64) {
        if len(ts.traceIDs) == 0 {
                return
        }
 
        mp := generateMemPart()
        mp.mustInitFromTraces(ts)
+       mp.segmentID = segmentID
        tst.mustAddMemPart(mp)
 }
 
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index 58a37f60..d4741f50 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -191,7 +191,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                g := groups[i]
                for j := range g.tables {
                        es := g.tables[j]
-                       es.tsTable.mustAddTraces(es.traces)
+                       es.tsTable.mustAddTracesWithSegmentID(es.traces, 
es.timeRange.Start.UnixNano())
                        releaseTraces(es.traces)
 
                        for sidxName, sidxReqs := range es.sidxReqsMap {
@@ -201,7 +201,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx 
instance")
                                                continue
                                        }
-                                       if err := sidxInstance.Write(ctx, 
sidxReqs); err != nil {
+                                       if err := sidxInstance.Write(ctx, 
sidxReqs, es.timeRange.Start.UnixNano()); err != nil {
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary 
index")
                                        }
                                }
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index 653ffbd4..6ef873fd 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -362,7 +362,7 @@ func (w *writeCallback) Rev(ctx context.Context, message 
bus.Message) (resp bus.
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx 
instance")
                                                continue
                                        }
-                                       if err := sidxInstance.Write(ctx, 
sidxReqs); err != nil {
+                                       if err := sidxInstance.Write(ctx, 
sidxReqs, es.timeRange.Start.UnixNano()); err != nil {
                                                
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary 
index")
                                        }
                                }
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 77274d08..1e8bb2b2 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -23,6 +23,7 @@ import (
        "embed"
        "encoding/json"
        "io"
+       "slices"
        "time"
 
        "github.com/google/go-cmp/cmp"
@@ -75,6 +76,26 @@ func verifyWithContext(ctx context.Context, innerGm 
gm.Gomega, sharedContext hel
        innerGm.Expect(err).NotTo(gm.HaveOccurred())
        want := &measurev1.QueryResponse{}
        helpers.UnmarshalYAML(ww, want)
+       if args.DisOrder {
+               slices.SortFunc(want.DataPoints, func(a, b 
*measurev1.DataPoint) int {
+                       if a.Sid != b.Sid {
+                               if a.Sid < b.Sid {
+                                       return -1
+                               }
+                               return 1
+                       }
+                       return 
a.Timestamp.AsTime().Compare(b.Timestamp.AsTime())
+               })
+               slices.SortFunc(resp.DataPoints, func(a, b 
*measurev1.DataPoint) int {
+                       if a.Sid != b.Sid {
+                               if a.Sid < b.Sid {
+                                       return -1
+                               }
+                               return 1
+                       }
+                       return 
a.Timestamp.AsTime().Compare(b.Timestamp.AsTime())
+               })
+       }
        for i := range resp.DataPoints {
                if resp.DataPoints[i].Timestamp != nil {
                        
innerGm.Expect(resp.DataPoints[i].Version).Should(gm.BeNumerically(">", 0))
diff --git a/test/cases/measure/data/want/entity_in.yaml 
b/test/cases/measure/data/want/entity_in.yaml
index 73ba584e..125eb6aa 100644
--- a/test/cases/measure/data/want/entity_in.yaml
+++ b/test/cases/measure/data/want/entity_in.yaml
@@ -22,19 +22,19 @@ dataPoints:
     - key: name
       value:
         str:
-          value: service_name_1
+          value: service_name_2
     - key: short_name
       value:
         str:
-          value: service_short_name_1
+          value: service_short_name_2
 - tagFamilies:
   - name: default
     tags:
     - key: name
       value:
         str:
-          value: service_name_2
+          value: service_name_1
     - key: short_name
       value:
         str:
-          value: service_short_name_2
+          value: service_short_name_1
diff --git a/test/cases/measure/data/want/tag_filter_int.yaml 
b/test/cases/measure/data/want/tag_filter_int.yaml
index 5a726b9d..3a6ffc22 100644
--- a/test/cases/measure/data/want/tag_filter_int.yaml
+++ b/test/cases/measure/data/want/tag_filter_int.yaml
@@ -22,7 +22,7 @@ dataPoints:
           - key: name
             value:
               str:
-                value: service_name_1
+                value: service_name_3
           - key: layer
             value:
               int:
@@ -33,7 +33,7 @@ dataPoints:
           - key: name
             value:
               str:
-                value: service_name_3
+                value: service_name_1
           - key: layer
             value:
               int:
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index aeacde4f..274dd3cb 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -44,7 +44,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("all only fields", helpers.Args{Input: "all_only_fields", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("the max limit", helpers.Args{Input: "all_max_limit", Want: 
"all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("filter by a integer tag", helpers.Args{Input: 
"tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("filter by a integer tag", helpers.Args{Input: 
"tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
DisOrder: true}),
        g.Entry("filter by an unknown tag", helpers.Args{Input: 
"tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantEmpty: true}),
        g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
@@ -62,10 +62,10 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 
25 * time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("filter by several entity ids", helpers.Args{Input: 
"entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("filter by entity id and service id", helpers.Args{Input: 
"entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 
25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
+       g.Entry("filter by several entity ids", helpers.Args{Input: 
"entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: 
true}),
+       g.Entry("filter by entity id and service id", helpers.Args{Input: 
"entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
DisOrder: true}),
+       g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("invalid logical expression", helpers.Args{Input: 
"err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantErr: true}),
        g.Entry("linked or expressions", helpers.Args{Input: "linked_or", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("In and not In expressions", helpers.Args{Input: "in", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
@@ -74,12 +74,13 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("match a tag belongs to the entity", helpers.Args{Input: 
"entity_match", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("all of index mode", helpers.Args{Input: "index_mode_all", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("all of index mode in a larger time range",
-               helpers.Args{Input: "index_mode_all", Want: 
"index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour}),
-       g.Entry("all in all segments of index mode", helpers.Args{Input: 
"index_mode_all", Want: "index_mode_all_segs", Duration: 96 * time.Hour, 
Offset: -72 * time.Hour}),
+               helpers.Args{Input: "index_mode_all", Want: 
"index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour, 
DisOrder: true}),
+       g.Entry("all in all segments of index mode",
+               helpers.Args{Input: "index_mode_all", Want: 
"index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour, 
DisOrder: true}),
        g.Entry("order by desc of index mode", helpers.Args{Input: 
"index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * 
time.Minute}),
-       g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", 
WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("query by id in index mode", helpers.Args{Input: 
"index_mode_by_id", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("multi groups: unchanged", helpers.Args{Input: 
"multi_group_unchanged", Duration: 35 * time.Minute, Offset: -20 * 
time.Minute}),
diff --git 
a/test/integration/distributed/multi_segments/multi_segments_suite_test.go 
b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
new file mode 100644
index 00000000..5a8fa9fe
--- /dev/null
+++ b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
@@ -0,0 +1,161 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_multi_segments_test
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
+       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+       casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
+)
+
+func TestDistributedMultiSegments(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Distributed Multi Segments Suite")
+}
+
+var (
+       connection *grpc.ClientConn
+       now        time.Time
+       baseTime   time.Time
+       deferFunc  func()
+       goods      []gleak.Goroutine
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       goods = gleak.Goroutines()
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+
+       By("Loading schema")
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{ep}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer schemaRegistry.Close()
+       ctx := context.Background()
+       test_stream.PreloadSchema(ctx, schemaRegistry)
+       test_measure.PreloadSchema(ctx, schemaRegistry)
+       test_trace.PreloadSchema(ctx, schemaRegistry)
+
+       By("Starting data node 0")
+       closeDataNode0 := setup.DataNode(ep)
+       By("Starting data node 1")
+       closeDataNode1 := setup.DataNode(ep)
+       By("Starting liaison node")
+       liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+
+       By("Initializing test cases")
+       ns := timestamp.NowMilli().UnixNano()
+       now = time.Unix(0, ns-ns%int64(time.Minute))
+       baseTime = time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 
0, now.Location())
+       test_cases.Initialize(liaisonAddr, baseTime)
+
+       deferFunc = func() {
+               closerLiaisonNode()
+               closeDataNode0()
+               closeDataNode1()
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+       return []byte(liaisonAddr)
+}, func(address []byte) {
+       var err error
+       connection, err = grpchelper.Conn(string(address), 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       casesstream.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casesmeasure.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casestopn.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casestrace.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {})
+
+var _ = ReportAfterSuite("Distributed Multi Segments Suite", func(report 
Report) {
+       if report.SuiteSucceeded {
+               if deferFunc != nil {
+                       deferFunc()
+               }
+               Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+       }
+})
diff --git 
a/test/integration/standalone/multi_segments/multi_segments_suite_test.go 
b/test/integration/standalone/multi_segments/multi_segments_suite_test.go
new file mode 100644
index 00000000..5c8e1c61
--- /dev/null
+++ b/test/integration/standalone/multi_segments/multi_segments_suite_test.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_query_test
+
+import (
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
+       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+       casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
+       integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
+func TestIntegrationMultiSegments(t *testing.T) { // Go test entry point that hands control to Ginkgo.
+       RegisterFailHandler(Fail) // Route Gomega assertion failures to Ginkgo's Fail.
+       RunSpecs(t, "Integration Multi Segments Suite", 
Label(integration_standalone.Labels...))
+}
+
+var (
+       connection *grpc.ClientConn // gRPC client connection dialed in SynchronizedBeforeSuite and closed in SynchronizedAfterSuite.
+       now        time.Time        // Current time truncated to a whole minute.
+       baseTime   time.Time        // Today's date (in now's location) at 00:02:00; passed to the test cases as their base time.
+       deferFunc  func()           // Teardown callback returned by setup.Standalone; invoked in ReportAfterSuite.
+       goods      []gleak.Goroutine // Goroutines captured before setup; handed to gleak.HaveLeaked after the suite.
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte { // First function: starts the server and returns its address for the second function.
+       goods = gleak.Goroutines() // Snapshot goroutines before anything starts, for the leak check after the suite.
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       var addr string
+       addr, _, deferFunc = setup.Standalone() // Keep the returned teardown callback for ReportAfterSuite.
+       ns := timestamp.NowMilli().UnixNano()
+       now = time.Unix(0, ns-ns%int64(time.Minute)) // Drop the sub-minute remainder.
+       baseTime = time.Date(now.Year(), now.Month(), now.Day(), 0o0, 0o2, 0, 
0, now.Location())
+       test_cases.Initialize(addr, baseTime)
+       return []byte(addr)
+}, func(address []byte) { // Second function: dials the address and shares the connection with every case package.
+       var err error
+       connection, err = grpchelper.Conn(string(address), 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred()) // Fail fast: never publish a connection whose dial failed.
+       casesstream.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casesmeasure.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casestopn.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+       casestrace.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   baseTime,
+       }
+})
+
+var _ = SynchronizedAfterSuite(func() { // First function: closes the shared gRPC client connection.
+       if connection != nil { // Nothing to close if the connection was never assigned.
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {}) // Second function is intentionally a no-op.
+
+var _ = ReportAfterSuite("Integration Multi Segments Suite", func(report Report) { // Post-suite hook, named after the suite passed to RunSpecs.
+       if report.SuiteSucceeded { // Teardown and leak checks are skipped when the suite failed.
+               if deferFunc != nil {
+                       deferFunc() // Teardown callback returned by setup.Standalone.
+               }
+               Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) // Poll until no goroutines beyond those in goods remain.
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) // Poll until pool.AllRefsCount satisfies HaveZeroRef.
+       }
+})

Reply via email to