This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a528a5d9 Fix duplicated data points in a writing batch (#536)
a528a5d9 is described below

commit a528a5d99745c8fa978c135b8bda2685c50cda95
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Sep 13 14:15:10 2024 +0800

    Fix duplicated data points in a writing batch (#536)
---
 banyand/measure/block.go             |   2 +-
 banyand/measure/block_reader_test.go |  32 ++++----
 banyand/measure/merger_policy.go     |   2 +-
 banyand/measure/merger_test.go       |  38 +++++-----
 banyand/measure/part.go              |   4 +-
 banyand/measure/tstable_test.go      | 143 +++++++++++++++++++++++------------
 banyand/stream/block.go              |   2 +-
 banyand/stream/block_reader_test.go  |  38 +++++-----
 banyand/stream/merger_policy.go      |   2 +-
 banyand/stream/merger_test.go        |  32 ++++----
 banyand/stream/tstable_test.go       | 107 +++++++++++++++-----------
 11 files changed, 230 insertions(+), 172 deletions(-)

diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index b95329a1..d6808bd3 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -258,7 +258,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec
 func (b *block) uncompressedSizeBytes() uint64 {
        dataPointsCount := uint64(b.Len())
 
-       n := dataPointsCount * 8
+       n := dataPointsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for version
 
        tff := b.tagFamilies
        for i := range tff {
diff --git a/banyand/measure/block_reader_test.go b/banyand/measure/block_reader_test.go
index cd576b78..0cd0cf9d 100644
--- a/banyand/measure/block_reader_test.go
+++ b/banyand/measure/block_reader_test.go
@@ -45,40 +45,40 @@ func Test_blockReader_nextBlock(t *testing.T) {
                        name:    "Test with single part",
                        dpsList: []*dataPoints{dpsTS1},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1676},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1684},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
32},
                        },
                },
                {
                        name:    "Test with multiple parts with different ts",
                        dpsList: []*dataPoints{dpsTS1, dpsTS2},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1676},
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1676},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1684},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1684},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
32},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
32},
                        },
                },
                {
                        name:    "Test with a single part with same ts",
                        dpsList: []*dataPoints{duplicatedDps},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
24},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
32},
                        },
                },
                {
                        name:    "Test with multiple parts with same ts",
                        dpsList: []*dataPoints{dpsTS1, dpsTS1},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1676},
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1676},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1684},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1684},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
32},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
32},
                        },
                },
        }
diff --git a/banyand/measure/merger_policy.go b/banyand/measure/merger_policy.go
index 750122ef..d9d07678 100644
--- a/banyand/measure/merger_policy.go
+++ b/banyand/measure/merger_policy.go
@@ -38,7 +38,7 @@ func newDefaultMergePolicy() *mergePolicy {
 }
 
 func newDefaultMergePolicyForTesting() *mergePolicy {
-       return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64))
+       return newMergePolicy(3, 1, run.Bytes(math.MaxInt64))
 }
 
 // NewMergePolicy creates a MergePolicy with given parameters.
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index b3983407..b8a8ff50 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -260,43 +260,43 @@ func Test_mergeParts(t *testing.T) {
                        name:    "Test with single part",
                        dpsList: []*dataPoints{dpsTS1},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1676},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1684},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
32},
                        },
                },
                {
                        name:    "Test with multiple parts with different ts",
                        dpsList: []*dataPoints{dpsTS1, dpsTS2, dpsTS2},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 2, uncompressedSizeBytes: 
3352},
-                               {seriesID: 2, count: 2, uncompressedSizeBytes: 
110},
-                               {seriesID: 3, count: 2, uncompressedSizeBytes: 
48},
+                               {seriesID: 1, count: 2, uncompressedSizeBytes: 
3368},
+                               {seriesID: 2, count: 2, uncompressedSizeBytes: 
126},
+                               {seriesID: 3, count: 2, uncompressedSizeBytes: 
64},
                        },
                },
                {
                        name:    "Test with multiple parts with same ts",
                        dpsList: []*dataPoints{dpsTS11, dpsTS1},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1676},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
24},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
1684},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
32},
                        },
                },
                {
                        name:    "Test with multiple parts with a large quantity of different ts",
                        dpsList: []*dataPoints{generateHugeDps(1, 5000, 1), generateHugeDps(5001, 10000, 2)},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2120140},
-                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2120140},
-                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2120140},
-                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4139720},
-                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4139720},
-                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4139720},
-                               {seriesID: 1, count: 2410, 
uncompressedSizeBytes: 4039160},
-                               {seriesID: 1, count: 1205, 
uncompressedSizeBytes: 2019580},
-                               {seriesID: 2, count: 2, uncompressedSizeBytes: 
110},
-                               {seriesID: 3, count: 2, uncompressedSizeBytes: 
48},
+                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2130260},
+                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2130260},
+                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2130260},
+                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4159480},
+                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4159480},
+                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4159480},
+                               {seriesID: 1, count: 2410, 
uncompressedSizeBytes: 4058440},
+                               {seriesID: 1, count: 1205, 
uncompressedSizeBytes: 2029220},
+                               {seriesID: 2, count: 2, uncompressedSizeBytes: 
126},
+                               {seriesID: 3, count: 2, uncompressedSizeBytes: 
64},
                        },
                },
        }
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 36a72eb8..1e89b1e9 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -163,16 +163,14 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) {
                                continue
                        }
                        tsPrev = dps.timestamps[i]
-               } else {
-                       tsPrev = 0
                }
 
                if uncompressedBlockSizeBytes >= maxUncompressedBlockSize ||
                        (i-indexPrev) > maxBlockLength || sid != sidPrev {
                        bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i])
                        sidPrev = sid
-                       tsPrev = 0
                        indexPrev = i
+                       tsPrev = dps.timestamps[indexPrev]
                        uncompressedBlockSizeBytes = 0
                }
                uncompressedBlockSizeBytes += uncompressedDataPointSizeBytes(i, dps)
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index eff1cc28..2f24d5a6 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -35,6 +35,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
@@ -194,9 +195,9 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 1,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1676},
-                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1684},
+                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 32},
                                },
                        },
                        {
@@ -206,22 +207,24 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1676},
-                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1676},
-                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 55},
-                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
-                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1684},
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1684},
+                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 63},
+                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 32},
+                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 32},
                                },
                        },
                        {
                                name:         "Test with a single part with 
same ts",
-                               dpsList:      []*dataPoints{duplicatedDps},
-                               sids:         []common.SeriesID{1},
+                               dpsList:      []*dataPoints{duplicatedDps1},
+                               sids:         []common.SeriesID{1, 2, 3},
                                minTimestamp: 1,
                                maxTimestamp: 1,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 24},
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 16},
+                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 16},
+                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 16},
                                },
                        },
                        {
@@ -231,12 +234,12 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1676},
-                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1676},
-                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 55},
-                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
-                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1684},
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1684},
+                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 63},
+                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 32},
+                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 32},
                                },
                        },
                }
@@ -280,9 +283,9 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 1,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1676},
-                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1684},
+                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 32},
                                },
                        },
                        {
@@ -292,9 +295,9 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 2, 
uncompressedSizeBytes: 3352},
-                                       {seriesID: 2, count: 2, 
uncompressedSizeBytes: 110},
-                                       {seriesID: 3, count: 2, 
uncompressedSizeBytes: 48},
+                                       {seriesID: 1, count: 2, 
uncompressedSizeBytes: 3368},
+                                       {seriesID: 2, count: 2, 
uncompressedSizeBytes: 126},
+                                       {seriesID: 3, count: 2, 
uncompressedSizeBytes: 64},
                                },
                        },
                        {
@@ -304,9 +307,9 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1676},
-                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 24},
+                                       {seriesID: 1, count: 1, 
uncompressedSizeBytes: 1684},
+                                       {seriesID: 2, count: 1, 
uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, 
uncompressedSizeBytes: 32},
                                },
                        },
                }
@@ -322,38 +325,52 @@ func Test_tstIter(t *testing.T) {
                                        require.NoError(t, err)
                                        for i, dps := range tt.dpsList {
                                                tst.mustAddDataPoints(dps)
+                                               timeout := 
time.After(flags.EventuallyTimeout) // Set the timeout duration
+                                       OUTER:
                                                for {
-                                                       snp := 
tst.currentSnapshot()
-                                                       if snp == nil {
-                                                               t.Logf("waiting 
for snapshot %d to be introduced", i)
-                                                               time.Sleep(100 
* time.Millisecond)
-                                                               continue
-                                                       }
-                                                       if snp.creator != 
snapshotCreatorMemPart {
+                                                       select {
+                                                       case <-timeout:
+                                                               
t.Fatalf("timeout waiting for snapshot %d to be introduced", i)
+                                                       default:
+                                                               snp := 
tst.currentSnapshot()
+                                                               if snp == nil {
+                                                                       
t.Logf("waiting for snapshot %d to be introduced", i)
+                                                                       
time.Sleep(100 * time.Millisecond)
+                                                                       continue
+                                                               }
+                                                               if snp.creator 
!= snapshotCreatorMemPart {
+                                                                       
snp.decRef()
+                                                                       break 
OUTER
+                                                               }
+                                                               t.Logf("waiting 
for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
+                                                                       i, 
snp.creator, snp.parts)
                                                                snp.decRef()
-                                                               break
+                                                               time.Sleep(100 
* time.Millisecond)
                                                        }
-                                                       t.Logf("waiting for 
snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
-                                                               i, snp.creator, 
snp.parts)
-                                                       snp.decRef()
-                                                       time.Sleep(100 * 
time.Millisecond)
                                                }
                                        }
                                        // wait until some parts are merged
                                        if len(tt.dpsList) > 0 {
+                                               timeout := 
time.After(flags.EventuallyTimeout) // Set the timeout duration
+                                       OUTER1:
                                                for {
-                                                       snp := 
tst.currentSnapshot()
-                                                       if snp == nil {
-                                                               time.Sleep(100 
* time.Millisecond)
-                                                               continue
-                                                       }
-                                                       if len(snp.parts) == 1 {
+                                                       select {
+                                                       case <-timeout:
+                                                               
t.Fatalf("timeout waiting for snapshot to be merged")
+                                                       default:
+                                                               snp := 
tst.currentSnapshot()
+                                                               if snp == nil {
+                                                                       
time.Sleep(100 * time.Millisecond)
+                                                                       continue
+                                                               }
+                                                               if 
len(snp.parts) == 1 {
+                                                                       
snp.decRef()
+                                                                       break 
OUTER1
+                                                               }
+                                                               t.Logf("waiting 
for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, 
snp.parts)
                                                                snp.decRef()
-                                                               break
+                                                               time.Sleep(100 
* time.Millisecond)
                                                        }
-                                                       t.Logf("waiting for 
snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
-                                                       snp.decRef()
-                                                       time.Sleep(100 * 
time.Millisecond)
                                                }
                                        }
                                        verify(t, tt, tst)
@@ -630,6 +647,34 @@ var duplicatedDps = &dataPoints{
        },
 }
 
+var duplicatedDps1 = &dataPoints{
+       seriesIDs:  []common.SeriesID{2, 2, 2, 1, 1, 1, 3, 3, 3},
+       timestamps: []int64{1, 1, 1, 1, 1, 1, 1, 1, 1},
+       versions:   []int64{1, 2, 3, 3, 2, 1, 2, 1, 3},
+       tagFamilies: [][]nameValues{
+               {},
+               {},
+               {},
+               {},
+               {},
+               {},
+               {},
+               {},
+               {},
+       },
+       fields: []nameValues{
+               {},
+               {},
+               {},
+               {},
+               {},
+               {},
+               {},
+               {},
+               {},
+       },
+}
+
 func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints {
        hugeDps := &dataPoints{
                seriesIDs:   []common.SeriesID{},
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index e6e7b964..3b65e88e 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -233,7 +233,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec
 func (b *block) uncompressedSizeBytes() uint64 {
        elementsCount := uint64(b.Len())
 
-       n := elementsCount * 8
+       n := elementsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for elementID
 
        tff := b.tagFamilies
        for i := range tff {
diff --git a/banyand/stream/block_reader_test.go b/banyand/stream/block_reader_test.go
index 154fdd21..ae9a3fc1 100644
--- a/banyand/stream/block_reader_test.go
+++ b/banyand/stream/block_reader_test.go
@@ -45,39 +45,39 @@ func Test_blockReader_nextBlock(t *testing.T) {
                        name:   "Test with single part",
                        esList: []*elements{esTS1},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
881},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
8},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
889},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
16},
                        },
                },
                {
                        name:   "Test with multiple parts with different ts",
                        esList: []*elements{esTS1, esTS2},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
881},
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
881},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
8},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
8},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
889},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
889},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
16},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
16},
                        },
                },
                {
                        name:   "Test with multiple parts with same ts",
                        esList: []*elements{esTS1, esTS1},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
881},
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
881},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
8},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
8},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
889},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 
889},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 
63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
16},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 
16},
                        },
                },
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       verify := func(pp []*part) {
+                       verify := func(t *testing.T, pp []*part) {
                                var pii []*partMergeIter
                                for _, p := range pp {
                                        pmi := &partMergeIter{}
@@ -116,7 +116,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                }
                        }
 
-                       t.Run("memory parts", func(_ *testing.T) {
+                       t.Run("memory parts", func(t *testing.T) {
                                var mpp []*memPart
                                defer func() {
                                        for _, mp := range mpp {
@@ -130,7 +130,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                        mp.mustInitFromElements(es)
                                        pp = append(pp, openMemPart(mp))
                                }
-                               verify(pp)
+                               verify(t, pp)
                        })
 
                        t.Run("file parts", func(t *testing.T) {
@@ -158,7 +158,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
                                        fpp = append(fpp, filePW)
                                        pp = append(pp, filePW.p)
                                }
-                               verify(pp)
+                               verify(t, pp)
                        })
                })
        }
diff --git a/banyand/stream/merger_policy.go b/banyand/stream/merger_policy.go
index e694f804..ad881e58 100644
--- a/banyand/stream/merger_policy.go
+++ b/banyand/stream/merger_policy.go
@@ -38,7 +38,7 @@ func newDefaultMergePolicy() *mergePolicy {
 }
 
 func newDefaultMergePolicyForTesting() *mergePolicy {
-       return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64))
+       return newMergePolicy(3, 1, run.Bytes(math.MaxInt64))
 }
 
 func newDisabledMergePolicyForTesting() *mergePolicy {
diff --git a/banyand/stream/merger_test.go b/banyand/stream/merger_test.go
index 310d164b..3128dece 100644
--- a/banyand/stream/merger_test.go
+++ b/banyand/stream/merger_test.go
@@ -248,40 +248,40 @@ func Test_mergeParts(t *testing.T) {
                        name:   "Test with single part",
                        esList: []*elements{esTS1},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
-                               {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-                               {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+                               {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+                               {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+                               {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
                        },
                },
                {
                        name:   "Test with multiple parts with different ts",
                        esList: []*elements{esTS1, esTS2, esTS2},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 3, uncompressedSizeBytes: 2643},
-                               {seriesID: 2, count: 3, uncompressedSizeBytes: 165},
-                               {seriesID: 3, count: 3, uncompressedSizeBytes: 24},
+                               {seriesID: 1, count: 3, uncompressedSizeBytes: 2667},
+                               {seriesID: 2, count: 3, uncompressedSizeBytes: 189},
+                               {seriesID: 3, count: 3, uncompressedSizeBytes: 48},
                        },
                },
                {
                        name:   "Test with multiple parts with same ts",
                        esList: []*elements{esTS1, esTS1, esTS1},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 3, uncompressedSizeBytes: 2643},
-                               {seriesID: 2, count: 3, uncompressedSizeBytes: 165},
-                               {seriesID: 3, count: 3, uncompressedSizeBytes: 24},
+                               {seriesID: 1, count: 3, uncompressedSizeBytes: 2667},
+                               {seriesID: 2, count: 3, uncompressedSizeBytes: 189},
+                               {seriesID: 3, count: 3, uncompressedSizeBytes: 48},
                        },
                },
                {
                        name:   "Test with multiple parts with a large quantity of different ts",
                        esList: []*elements{generateHugeEs(1, 5000, 1), generateHugeEs(5001, 10000, 2)},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688},
-                               {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688},
-                               {seriesID: 1, count: 2552, uncompressedSizeBytes: 2248312},
-                               {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688},
-                               {seriesID: 1, count: 104, uncompressedSizeBytes: 91624},
-                               {seriesID: 2, count: 2, uncompressedSizeBytes: 110},
-                               {seriesID: 3, count: 2, uncompressedSizeBytes: 16},
+                               {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272},
+                               {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272},
+                               {seriesID: 1, count: 2552, uncompressedSizeBytes: 2268728},
+                               {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272},
+                               {seriesID: 1, count: 104, uncompressedSizeBytes: 92456},
+                               {seriesID: 2, count: 2, uncompressedSizeBytes: 126},
+                               {seriesID: 3, count: 2, uncompressedSizeBytes: 32},
                        },
                },
        }
diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go
index c8759b9e..9224e552 100644
--- a/banyand/stream/tstable_test.go
+++ b/banyand/stream/tstable_test.go
@@ -36,6 +36,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
@@ -190,9 +191,9 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 1,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
-                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
                                },
                        },
                        {
@@ -202,12 +203,12 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
-                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
-                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
-                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
+                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
                                },
                        },
                        {
@@ -217,12 +218,12 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
-                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
-                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
-                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
+                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
                                },
                        },
                }
@@ -268,9 +269,9 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 1,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
-                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+                                       {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+                                       {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+                                       {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
                                },
                        },
                        {
@@ -280,9 +281,9 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 3, uncompressedSizeBytes: 2643},
-                                       {seriesID: 2, count: 3, uncompressedSizeBytes: 165},
-                                       {seriesID: 3, count: 3, uncompressedSizeBytes: 24},
+                                       {seriesID: 1, count: 3, uncompressedSizeBytes: 2667},
+                                       {seriesID: 2, count: 3, uncompressedSizeBytes: 189},
+                                       {seriesID: 3, count: 3, uncompressedSizeBytes: 48},
                                },
                        },
                        {
@@ -292,9 +293,9 @@ func Test_tstIter(t *testing.T) {
                                minTimestamp: 1,
                                maxTimestamp: 2,
                                want: []blockMetadata{
-                                       {seriesID: 1, count: 2, uncompressedSizeBytes: 1762},
-                                       {seriesID: 2, count: 2, uncompressedSizeBytes: 110},
-                                       {seriesID: 3, count: 2, uncompressedSizeBytes: 16},
+                                       {seriesID: 1, count: 2, uncompressedSizeBytes: 1778},
+                                       {seriesID: 2, count: 2, uncompressedSizeBytes: 126},
+                                       {seriesID: 3, count: 2, uncompressedSizeBytes: 32},
                                },
                        },
                }
@@ -310,38 +311,52 @@ func Test_tstIter(t *testing.T) {
                                        require.NoError(t, err)
                                        for i, es := range tt.esList {
                                                tst.mustAddElements(es)
+                                               timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
+                                       OUTER:
                                                for {
-                                                       snp := tst.currentSnapshot()
-                                                       if snp == nil {
-                                                               t.Logf("waiting for snapshot %d to be introduced", i)
-                                                               time.Sleep(100 * time.Millisecond)
-                                                               continue
-                                                       }
-                                                       if snp.creator != snapshotCreatorMemPart {
+                                                       select {
+                                                       case <-timeout:
+                                                               t.Fatalf("timeout waiting for snapshot %d to be introduced", i)
+                                                       default:
+                                                               snp := tst.currentSnapshot()
+                                                               if snp == nil {
+                                                                       t.Logf("waiting for snapshot %d to be introduced", i)
+                                                                       time.Sleep(100 * time.Millisecond)
+                                                                       continue
+                                                               }
+                                                               if snp.creator != snapshotCreatorMemPart {
+                                                                       snp.decRef()
+                                                                       break OUTER
+                                                               }
+                                                               t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
+                                                                       i, snp.creator, snp.parts)
                                                                snp.decRef()
-                                                               break
+                                                               time.Sleep(100 * time.Millisecond)
                                                        }
-                                                       t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
-                                                               i, snp.creator, snp.parts)
-                                                       snp.decRef()
-                                                       time.Sleep(100 * time.Millisecond)
                                                }
                                        }
                                        // wait until some parts are merged
                                        if len(tt.esList) > 0 {
+                                               timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
+                                       OUTER1:
                                                for {
-                                                       snp := tst.currentSnapshot()
-                                                       if snp == nil {
-                                                               time.Sleep(100 * time.Millisecond)
-                                                               continue
-                                                       }
-                                                       if len(snp.parts) == 1 || len(snp.parts) < len(tt.esList) {
+                                                       select {
+                                                       case <-timeout:
+                                                               t.Fatalf("timeout waiting for snapshot to be merged")
+                                                       default:
+                                                               snp := tst.currentSnapshot()
+                                                               if snp == nil {
+                                                                       time.Sleep(100 * time.Millisecond)
+                                                                       continue
+                                                               }
+                                                               if len(snp.parts) == 1 || len(snp.parts) < len(tt.esList) {
+                                                                       snp.decRef()
+                                                                       break OUTER1
+                                                               }
+                                                               t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
                                                                snp.decRef()
-                                                               break
+                                                               time.Sleep(100 * time.Millisecond)
                                                        }
-                                                       t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
-                                                       snp.decRef()
-                                                       time.Sleep(100 * time.Millisecond)
                                                }
                                        }
                                        verify(t, tt, tst)

Reply via email to