This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a528a5d9 Fix duplicated data points in a writing batch (#536)
a528a5d9 is described below
commit a528a5d99745c8fa978c135b8bda2685c50cda95
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Sep 13 14:15:10 2024 +0800
Fix duplicated data points in a writing batch (#536)
---
banyand/measure/block.go | 2 +-
banyand/measure/block_reader_test.go | 32 ++++----
banyand/measure/merger_policy.go | 2 +-
banyand/measure/merger_test.go | 38 +++++-----
banyand/measure/part.go | 4 +-
banyand/measure/tstable_test.go | 143 +++++++++++++++++++++++------------
banyand/stream/block.go | 2 +-
banyand/stream/block_reader_test.go | 38 +++++-----
banyand/stream/merger_policy.go | 2 +-
banyand/stream/merger_test.go | 32 ++++----
banyand/stream/tstable_test.go | 107 +++++++++++++++-----------
11 files changed, 230 insertions(+), 172 deletions(-)
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index b95329a1..d6808bd3 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -258,7 +258,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec
func (b *block) uncompressedSizeBytes() uint64 {
dataPointsCount := uint64(b.Len())
- n := dataPointsCount * 8
+ n := dataPointsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for version
tff := b.tagFamilies
for i := range tff {
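
Note: the fixed per-point cost doubles from 8 to 16 bytes here, which is why every expected uncompressedSizeBytes in the tests below grows by 8 bytes per data point (24 -> 32, 1676 -> 1684, and 2120140 -> 2130260 for a 1265-point block). A back-of-envelope check of that arithmetic as a standalone sketch; the function name is illustrative, not the project's API:

    package main

    import "fmt"

    // fixedSizeBytes mirrors the accounting in the hunk above: every data
    // point now charges 8 bytes for its timestamp plus 8 bytes for its
    // version, before tag families and fields are added on top.
    func fixedSizeBytes(dataPointsCount uint64) uint64 {
            return dataPointsCount * (8 + 8)
    }

    func main() {
            fmt.Println(fixedSizeBytes(1))    // 16: one point's fixed cost
            fmt.Println(fixedSizeBytes(1265)) // 20240: 10120 more than the old 8-byte rate
    }
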
diff --git a/banyand/measure/block_reader_test.go b/banyand/measure/block_reader_test.go
index cd576b78..0cd0cf9d 100644
--- a/banyand/measure/block_reader_test.go
+++ b/banyand/measure/block_reader_test.go
@@ -45,40 +45,40 @@ func Test_blockReader_nextBlock(t *testing.T) {
name: "Test with single part",
dpsList: []*dataPoints{dpsTS1},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with different ts",
dpsList: []*dataPoints{dpsTS1, dpsTS2},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with a single part with same ts",
dpsList: []*dataPoints{duplicatedDps},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with same ts",
dpsList: []*dataPoints{dpsTS1, dpsTS1},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
}
diff --git a/banyand/measure/merger_policy.go b/banyand/measure/merger_policy.go
index 750122ef..d9d07678 100644
--- a/banyand/measure/merger_policy.go
+++ b/banyand/measure/merger_policy.go
@@ -38,7 +38,7 @@ func newDefaultMergePolicy() *mergePolicy {
}
func newDefaultMergePolicyForTesting() *mergePolicy {
- return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64))
+ return newMergePolicy(3, 1, run.Bytes(math.MaxInt64))
}
// NewMergePolicy creates a MergePolicy with given parameters.
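
Note: the testing policy is tightened from four parts with a 1.7 growth factor to three parts with a factor of 1, so test merges fire eagerly and reliably collapse to a single part. The excerpt does not show newMergePolicy's body, so the following is only a hedged illustration of how such parameters typically gate a merge; the type and field names are hypothetical, not BanyanDB's actual implementation:

    package main

    import "fmt"

    // mergePolicy is a hypothetical reading of the two tuned parameters:
    // maxParts caps how many parts one merge may take, and minGrowthFactor
    // requires the merged output to be meaningfully larger than its largest
    // input before a merge is considered worthwhile.
    type mergePolicy struct {
            maxParts        int
            minGrowthFactor float64
    }

    func (p mergePolicy) shouldMerge(partSizes []uint64) bool {
            if len(partSizes) < 2 || len(partSizes) > p.maxParts {
                    return false
            }
            var total, largest uint64
            for _, s := range partSizes {
                    total += s
                    if s > largest {
                            largest = s
                    }
            }
            return float64(total) >= p.minGrowthFactor*float64(largest)
    }

    func main() {
            // With factor 1 and a 3-part cap, almost any pair of parts
            // qualifies, so tests reach the merged single-part state quickly.
            p := mergePolicy{maxParts: 3, minGrowthFactor: 1}
            fmt.Println(p.shouldMerge([]uint64{100, 100})) // true
    }
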
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index b3983407..b8a8ff50 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -260,43 +260,43 @@ func Test_mergeParts(t *testing.T) {
name: "Test with single part",
dpsList: []*dataPoints{dpsTS1},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with different ts",
dpsList: []*dataPoints{dpsTS1, dpsTS2, dpsTS2},
want: []blockMetadata{
- {seriesID: 1, count: 2, uncompressedSizeBytes: 3352},
- {seriesID: 2, count: 2, uncompressedSizeBytes: 110},
- {seriesID: 3, count: 2, uncompressedSizeBytes: 48},
+ {seriesID: 1, count: 2, uncompressedSizeBytes: 3368},
+ {seriesID: 2, count: 2, uncompressedSizeBytes: 126},
+ {seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
{
name: "Test with multiple parts with same ts",
dpsList: []*dataPoints{dpsTS11, dpsTS1},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with a large quantity of different ts",
dpsList: []*dataPoints{generateHugeDps(1, 5000, 1), generateHugeDps(5001, 10000, 2)},
want: []blockMetadata{
- {seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140},
- {seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140},
- {seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140},
- {seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720},
- {seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720},
- {seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720},
- {seriesID: 1, count: 2410, uncompressedSizeBytes: 4039160},
- {seriesID: 1, count: 1205, uncompressedSizeBytes: 2019580},
- {seriesID: 2, count: 2, uncompressedSizeBytes: 110},
- {seriesID: 3, count: 2, uncompressedSizeBytes: 48},
+ {seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
+ {seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
+ {seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
+ {seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
+ {seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
+ {seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
+ {seriesID: 1, count: 2410, uncompressedSizeBytes: 4058440},
+ {seriesID: 1, count: 1205, uncompressedSizeBytes: 2029220},
+ {seriesID: 2, count: 2, uncompressedSizeBytes: 126},
+ {seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
}
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 36a72eb8..1e89b1e9 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -163,16 +163,14 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) {
continue
}
tsPrev = dps.timestamps[i]
- } else {
- tsPrev = 0
}
if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || (i-indexPrev) > maxBlockLength || sid != sidPrev {
bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i])
sidPrev = sid
- tsPrev = 0
indexPrev = i
+ tsPrev = dps.timestamps[indexPrev]
uncompressedBlockSizeBytes = 0
}
uncompressedBlockSizeBytes += uncompressedDataPointSizeBytes(i, dps)
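
Note: this hunk is the heart of the fix. Previously tsPrev was reset to 0 both at the start of the batch and whenever a block was cut, so a duplicate (seriesID, timestamp) pair that landed exactly on a block boundary escaped the skip check above; now the tracker is seeded from the first timestamp of the new block. A self-contained sketch of the same idea, with illustrative names (point, dedupAndCut) rather than the project's types; it sidesteps the reset bookkeeping entirely by comparing each point with its predecessor in the already-sorted batch:

    package main

    import "fmt"

    type point struct {
            series, ts int64
    }

    // dedupAndCut walks a batch sorted by (series, ts), drops points that
    // duplicate their predecessor's (series, ts), and cuts a new block when
    // the series changes or the block reaches maxLen points.
    func dedupAndCut(pts []point, maxLen int, flush func(block []point)) {
            var block []point
            for i, p := range pts {
                    if i > 0 && p.series == pts[i-1].series && p.ts == pts[i-1].ts {
                            continue // duplicate inside the writing batch: skip it
                    }
                    if len(block) > 0 && (p.series != block[0].series || len(block) >= maxLen) {
                            flush(block)
                            block = nil
                    }
                    block = append(block, p)
            }
            if len(block) > 0 {
                    flush(block)
            }
    }

    func main() {
            pts := []point{{1, 1}, {1, 1}, {1, 2}, {2, 1}, {2, 1}}
            dedupAndCut(pts, 1000, func(b []point) { fmt.Println(b) })
            // prints [{1 1} {1 2}] then [{2 1}]: duplicates are dropped even
            // when they would have straddled a block cut.
    }
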
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index eff1cc28..2f24d5a6 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -35,6 +35,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
@@ -194,9 +195,9 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 1,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
@@ -206,22 +207,24 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with a single part with
same ts",
- dpsList: []*dataPoints{duplicatedDps},
- sids: []common.SeriesID{1},
+ dpsList: []*dataPoints{duplicatedDps1},
+ sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 1,
want: []blockMetadata{
- {seriesID: 1, count: 1,
uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1,
uncompressedSizeBytes: 16},
+ {seriesID: 2, count: 1,
uncompressedSizeBytes: 16},
+ {seriesID: 3, count: 1,
uncompressedSizeBytes: 16},
},
},
{
@@ -231,12 +234,12 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
}
@@ -280,9 +283,9 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 1,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
@@ -292,9 +295,9 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
- {seriesID: 1, count: 2, uncompressedSizeBytes: 3352},
- {seriesID: 2, count: 2, uncompressedSizeBytes: 110},
- {seriesID: 3, count: 2, uncompressedSizeBytes: 48},
+ {seriesID: 1, count: 2, uncompressedSizeBytes: 3368},
+ {seriesID: 2, count: 2, uncompressedSizeBytes: 126},
+ {seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
{
@@ -304,9 +307,9 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
}
@@ -322,38 +325,52 @@ func Test_tstIter(t *testing.T) {
require.NoError(t, err)
for i, dps := range tt.dpsList {
tst.mustAddDataPoints(dps)
+ timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
+ OUTER:
for {
- snp := tst.currentSnapshot()
- if snp == nil {
- t.Logf("waiting for snapshot %d to be introduced", i)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- if snp.creator != snapshotCreatorMemPart {
+ select {
+ case <-timeout:
+ t.Fatalf("timeout waiting for snapshot %d to be introduced", i)
+ default:
+ snp := tst.currentSnapshot()
+ if snp == nil {
+ t.Logf("waiting for snapshot %d to be introduced", i)
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ if snp.creator != snapshotCreatorMemPart {
+ snp.decRef()
+ break OUTER
+ }
+ t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
+ i, snp.creator, snp.parts)
snp.decRef()
- break
+ time.Sleep(100 * time.Millisecond)
}
- t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
- i, snp.creator, snp.parts)
- snp.decRef()
- time.Sleep(100 * time.Millisecond)
}
}
// wait until some parts are merged
if len(tt.dpsList) > 0 {
+ timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
+ OUTER1:
for {
- snp := tst.currentSnapshot()
- if snp == nil {
- time.Sleep(100 * time.Millisecond)
- continue
- }
- if len(snp.parts) == 1 {
+ select {
+ case <-timeout:
+ t.Fatalf("timeout waiting for snapshot to be merged")
+ default:
+ snp := tst.currentSnapshot()
+ if snp == nil {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ if len(snp.parts) == 1 {
+ snp.decRef()
+ break OUTER1
+ }
+ t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
snp.decRef()
- break
+ time.Sleep(100 * time.Millisecond)
}
- t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
- snp.decRef()
- time.Sleep(100 * time.Millisecond)
}
}
}
verify(t, tt, tst)
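
Note: the loops above replace unbounded polling with a deadline. time.After arms a timer once, and each poll either observes the desired snapshot state or sleeps and retries until the timer fires and t.Fatalf aborts the test. Distilled into a reusable helper (waitFor is a hypothetical name, not part of the codebase; flags.EventuallyTimeout would be passed in as the timeout):

    package example

    import (
            "testing"
            "time"
    )

    // waitFor polls cond every 100ms until it returns true, failing the test
    // if the deadline passes first -- the same select/time.After shape used
    // in the hunks above.
    func waitFor(t *testing.T, timeout time.Duration, what string, cond func() bool) {
            t.Helper()
            deadline := time.After(timeout)
            for {
                    select {
                    case <-deadline:
                            t.Fatalf("timeout waiting for %s", what)
                    default:
                            if cond() {
                                    return
                            }
                            time.Sleep(100 * time.Millisecond)
                    }
            }
    }
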
@@ -630,6 +647,34 @@ var duplicatedDps = &dataPoints{
},
}
+var duplicatedDps1 = &dataPoints{
+ seriesIDs: []common.SeriesID{2, 2, 2, 1, 1, 1, 3, 3, 3},
+ timestamps: []int64{1, 1, 1, 1, 1, 1, 1, 1, 1},
+ versions: []int64{1, 2, 3, 3, 2, 1, 2, 1, 3},
+ tagFamilies: [][]nameValues{
+ {},
+ {},
+ {},
+ {},
+ {},
+ {},
+ {},
+ {},
+ {},
+ },
+ fields: []nameValues{
+ {},
+ {},
+ {},
+ {},
+ {},
+ {},
+ {},
+ {},
+ {},
+ },
+}
+
func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints {
hugeDps := &dataPoints{
seriesIDs: []common.SeriesID{},
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index e6e7b964..3b65e88e 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -233,7 +233,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec
func (b *block) uncompressedSizeBytes() uint64 {
elementsCount := uint64(b.Len())
- n := elementsCount * 8
+ n := elementsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for elementID
tff := b.tagFamilies
for i := range tff {
diff --git a/banyand/stream/block_reader_test.go b/banyand/stream/block_reader_test.go
index 154fdd21..ae9a3fc1 100644
--- a/banyand/stream/block_reader_test.go
+++ b/banyand/stream/block_reader_test.go
@@ -45,39 +45,39 @@ func Test_blockReader_nextBlock(t *testing.T) {
name: "Test with single part",
esList: []*elements{esTS1},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
},
},
{
name: "Test with multiple parts with different ts",
esList: []*elements{esTS1, esTS2},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
},
},
{
name: "Test with multiple parts with same ts",
esList: []*elements{esTS1, esTS1},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- verify := func(pp []*part) {
+ verify := func(t *testing.T, pp []*part) {
var pii []*partMergeIter
for _, p := range pp {
pmi := &partMergeIter{}
@@ -116,7 +116,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
}
}
- t.Run("memory parts", func(_ *testing.T) {
+ t.Run("memory parts", func(t *testing.T) {
var mpp []*memPart
defer func() {
for _, mp := range mpp {
@@ -130,7 +130,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
mp.mustInitFromElements(es)
pp = append(pp, openMemPart(mp))
}
- verify(pp)
+ verify(t, pp)
})
t.Run("file parts", func(t *testing.T) {
@@ -158,7 +158,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
fpp = append(fpp, filePW)
pp = append(pp, filePW.p)
}
- verify(pp)
+ verify(t, pp)
})
})
}
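
Note on the verify signature change above: inside t.Run the closure must use the subtest's own *testing.T, or failures and logs get attributed to the parent test. Passing t explicitly (and shadowing it in each t.Run callback) keeps the attribution correct. A minimal illustration with placeholder names:

    package example

    import "testing"

    func TestExample(t *testing.T) {
            // verify takes the subtest's t, mirroring verify(t, pp) above.
            verify := func(t *testing.T, got int) {
                    if got != 42 {
                            t.Fatalf("got %d, want 42", got) // fails the subtest, not the parent
                    }
            }
            t.Run("memory parts", func(t *testing.T) { verify(t, 42) })
            t.Run("file parts", func(t *testing.T) { verify(t, 42) })
    }
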
diff --git a/banyand/stream/merger_policy.go b/banyand/stream/merger_policy.go
index e694f804..ad881e58 100644
--- a/banyand/stream/merger_policy.go
+++ b/banyand/stream/merger_policy.go
@@ -38,7 +38,7 @@ func newDefaultMergePolicy() *mergePolicy {
}
func newDefaultMergePolicyForTesting() *mergePolicy {
- return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64))
+ return newMergePolicy(3, 1, run.Bytes(math.MaxInt64))
}
func newDisabledMergePolicyForTesting() *mergePolicy {
diff --git a/banyand/stream/merger_test.go b/banyand/stream/merger_test.go
index 310d164b..3128dece 100644
--- a/banyand/stream/merger_test.go
+++ b/banyand/stream/merger_test.go
@@ -248,40 +248,40 @@ func Test_mergeParts(t *testing.T) {
name: "Test with single part",
esList: []*elements{esTS1},
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
},
},
{
name: "Test with multiple parts with different ts",
esList: []*elements{esTS1, esTS2, esTS2},
want: []blockMetadata{
- {seriesID: 1, count: 3, uncompressedSizeBytes: 2643},
- {seriesID: 2, count: 3, uncompressedSizeBytes: 165},
- {seriesID: 3, count: 3, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 3, uncompressedSizeBytes: 2667},
+ {seriesID: 2, count: 3, uncompressedSizeBytes: 189},
+ {seriesID: 3, count: 3, uncompressedSizeBytes: 48},
},
},
{
name: "Test with multiple parts with same ts",
esList: []*elements{esTS1, esTS1, esTS1},
want: []blockMetadata{
- {seriesID: 1, count: 3, uncompressedSizeBytes: 2643},
- {seriesID: 2, count: 3, uncompressedSizeBytes: 165},
- {seriesID: 3, count: 3, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 3, uncompressedSizeBytes: 2667},
+ {seriesID: 2, count: 3, uncompressedSizeBytes: 189},
+ {seriesID: 3, count: 3, uncompressedSizeBytes: 48},
},
},
{
name: "Test with multiple parts with a large quantity of different ts",
esList: []*elements{generateHugeEs(1, 5000, 1), generateHugeEs(5001, 10000, 2)},
want: []blockMetadata{
- {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688},
- {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688},
- {seriesID: 1, count: 2552, uncompressedSizeBytes: 2248312},
- {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688},
- {seriesID: 1, count: 104, uncompressedSizeBytes: 91624},
- {seriesID: 2, count: 2, uncompressedSizeBytes: 110},
- {seriesID: 3, count: 2, uncompressedSizeBytes: 16},
+ {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272},
+ {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272},
+ {seriesID: 1, count: 2552, uncompressedSizeBytes: 2268728},
+ {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272},
+ {seriesID: 1, count: 104, uncompressedSizeBytes: 92456},
+ {seriesID: 2, count: 2, uncompressedSizeBytes: 126},
+ {seriesID: 3, count: 2, uncompressedSizeBytes: 32},
},
},
}
diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go
index c8759b9e..9224e552 100644
--- a/banyand/stream/tstable_test.go
+++ b/banyand/stream/tstable_test.go
@@ -36,6 +36,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
@@ -190,9 +191,9 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 1,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
},
},
{
@@ -202,12 +203,12 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
},
},
{
@@ -217,12 +218,12 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
},
},
}
@@ -268,9 +269,9 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 1,
want: []blockMetadata{
- {seriesID: 1, count: 1, uncompressedSizeBytes: 881},
- {seriesID: 2, count: 1, uncompressedSizeBytes: 55},
- {seriesID: 3, count: 1, uncompressedSizeBytes: 8},
+ {seriesID: 1, count: 1, uncompressedSizeBytes: 889},
+ {seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+ {seriesID: 3, count: 1, uncompressedSizeBytes: 16},
},
},
{
@@ -280,9 +281,9 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
- {seriesID: 1, count: 3, uncompressedSizeBytes: 2643},
- {seriesID: 2, count: 3, uncompressedSizeBytes: 165},
- {seriesID: 3, count: 3, uncompressedSizeBytes: 24},
+ {seriesID: 1, count: 3, uncompressedSizeBytes: 2667},
+ {seriesID: 2, count: 3, uncompressedSizeBytes: 189},
+ {seriesID: 3, count: 3, uncompressedSizeBytes: 48},
},
},
{
@@ -292,9 +293,9 @@ func Test_tstIter(t *testing.T) {
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
- {seriesID: 1, count: 2, uncompressedSizeBytes: 1762},
- {seriesID: 2, count: 2, uncompressedSizeBytes: 110},
- {seriesID: 3, count: 2, uncompressedSizeBytes: 16},
+ {seriesID: 1, count: 2, uncompressedSizeBytes: 1778},
+ {seriesID: 2, count: 2, uncompressedSizeBytes: 126},
+ {seriesID: 3, count: 2, uncompressedSizeBytes: 32},
},
},
}
@@ -310,38 +311,52 @@ func Test_tstIter(t *testing.T) {
require.NoError(t, err)
for i, es := range tt.esList {
tst.mustAddElements(es)
+ timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
+ OUTER:
for {
- snp := tst.currentSnapshot()
- if snp == nil {
- t.Logf("waiting for snapshot %d to be introduced", i)
- time.Sleep(100 * time.Millisecond)
- continue
- }
- if snp.creator != snapshotCreatorMemPart {
+ select {
+ case <-timeout:
+ t.Fatalf("timeout waiting for snapshot %d to be introduced", i)
+ default:
+ snp := tst.currentSnapshot()
+ if snp == nil {
+ t.Logf("waiting for snapshot %d to be introduced", i)
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ if snp.creator != snapshotCreatorMemPart {
+ snp.decRef()
+ break OUTER
+ }
+ t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
+ i, snp.creator, snp.parts)
snp.decRef()
- break
+ time.Sleep(100 * time.Millisecond)
}
- t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
- i, snp.creator, snp.parts)
- snp.decRef()
- time.Sleep(100 * time.Millisecond)
}
}
// wait until some parts are merged
if len(tt.esList) > 0 {
+ timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
+ OUTER1:
for {
- snp := tst.currentSnapshot()
- if snp == nil {
- time.Sleep(100 * time.Millisecond)
- continue
- }
- if len(snp.parts) == 1 || len(snp.parts) < len(tt.esList) {
+ select {
+ case <-timeout:
+ t.Fatalf("timeout waiting for snapshot to be merged")
+ default:
+ snp := tst.currentSnapshot()
+ if snp == nil {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ if len(snp.parts) == 1 || len(snp.parts) < len(tt.esList) {
+ snp.decRef()
+ break OUTER1
+ }
+ t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
snp.decRef()
- break
+ time.Sleep(100 * time.Millisecond)
}
- t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
- snp.decRef()
- time.Sleep(100 * time.Millisecond)
}
}
verify(t, tt, tst)