This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch kv
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c99420652e256111d0e5cdcf9385fcc4130c7a42
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Nov 21 05:20:09 2022 +0000

    Fix some flaws in kv

    * Correct int encoding disorder
    * Add a print context helper for debugging encoding issues

    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/kv/badger.go                        |  23 +++++
 banyand/kv/kv.go                            |   2 +
 banyand/measure/measure_query.go            |  12 ++-
 banyand/tsdb/block.go                       |   5 +
 banyand/tsdb/series.go                      |   7 +-
 banyand/tsdb/series_seek.go                 |  69 +++++++++++++
 banyand/tsdb/series_seek_sort.go            |  15 ++-
 pkg/encoding/encoding.go                    |   2 +
 pkg/encoding/int.go                         |  43 +++++---
 pkg/encoding/int_test.go                    | 110 ++++++++++++++-------
 pkg/encoding/plain.go                       |   9 ++
 pkg/index/iterator.go                       |   4 +
 pkg/pb/v1/write.go                          |   3 +-
 .../measure/measure_plan_indexscan_local.go |   2 +-
 14 files changed, 244 insertions(+), 62 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 4482721..9ef467b 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -48,6 +48,25 @@ type badgerTSS struct {
 	badger.TSet
 }
 
+func (b *badgerTSS) Context(key []byte, ts uint64, n int) (pre Iterator, next Iterator) {
+	preOpts := badger.DefaultIteratorOptions
+	preOpts.PrefetchSize = n
+	preOpts.PrefetchValues = false
+	preOpts.Prefix = key
+	preOpts.Reverse = false
+	nextOpts := badger.DefaultIteratorOptions
+	nextOpts.PrefetchSize = n
+	nextOpts.PrefetchValues = false
+	nextOpts.Prefix = key
+	nextOpts.Reverse = true
+	seekKey := y.KeyWithTs(key, ts)
+	preIter := b.db.NewIterator(preOpts)
+	preIter.Seek(seekKey)
+	nextIter := b.db.NewIterator(nextOpts)
+	nextIter.Seek(seekKey)
+	return &iterator{delegated: preIter}, &iterator{delegated: nextIter, reverse: true}
+}
+
 func (b *badgerTSS) Stats() (s observability.Statistics) {
 	return badgerStats(b.db)
 }
@@ -190,6 +209,10 @@ func (i *iterator) Key() []byte {
 	return y.ParseKey(i.delegated.Key())
 }
 
+func (i *iterator) RawKey() []byte {
+	return i.delegated.Key()
+}
+
 func (i *iterator) Val() []byte {
 	return y.Copy(i.delegated.Value().Value)
 }
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 90944a2..5dd9e53 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -80,6 +80,7 @@ type TimeSeriesWriter interface {
 type TimeSeriesReader interface {
 	// Get a value by its key and timestamp/version
 	Get(key []byte, ts uint64) ([]byte, error)
+	Context(key []byte, ts uint64, n int) (pre, next Iterator)
 }
 
 // TimeSeriesStore is time series storage
@@ -140,6 +141,7 @@ type Iterator interface {
 	Rewind()
 	Seek(key []byte)
 	Key() []byte
+	RawKey() []byte
 	Val() []byte
 	Valid() bool
 	Close() error
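For orientation, here is a minimal sketch (not part of the patch) of how a caller might use the new TimeSeriesReader.Context and Iterator.RawKey to look at the entries stored around a given key and timestamp. The package and function names are illustrative, and obtaining a concrete reader is assumed to happen elsewhere.

package kvdebug

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/banyand/kv"
)

// dumpNeighbors walks up to n entries on either side of (key, ts) using the
// Context helper added above. Purely illustrative.
func dumpNeighbors(reader kv.TimeSeriesReader, key []byte, ts uint64, n int) {
	pre, next := reader.Context(key, ts, n)
	defer pre.Close()
	defer next.Close()
	for i := 0; pre.Valid() && i < n; pre.Next() {
		fmt.Printf("pre  raw=%x val=%d bytes\n", pre.RawKey(), len(pre.Val()))
		i++
	}
	for i := 0; next.Valid() && i < n; next.Next() {
		fmt.Printf("next raw=%x val=%d bytes\n", next.RawKey(), len(next.Val()))
		i++
	}
}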
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..4581f3f 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -114,10 +114,14 @@ func (s *measure) Shard(id common.ShardID) (tsdb.Shard, error) {
 }
 
 func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
-	familyRawBytes, err := item.Family(familyIdentity(family, pbv1.TagFlag))
+	fid := familyIdentity(family, pbv1.TagFlag)
+	familyRawBytes, err := item.Family(fid)
 	if err != nil {
 		return nil, errors.Wrapf(err, "measure %s.%s parse family %s", s.name, s.group, family)
 	}
+	if len(familyRawBytes) < 1 {
+		item.PrintContext(s.l.Named("tag-family"), fid, 10)
+	}
 	tagFamily := &modelv1.TagFamilyForWrite{}
 	err = proto.Unmarshal(familyRawBytes, tagFamily)
 	if err != nil {
@@ -155,10 +159,14 @@ func (s *measure) ParseField(name string, item tsdb.Item) (*measurev1.DataPoint_
 			break
 		}
 	}
-	bytes, err := item.Family(familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval)))
+	fid := familyIdentity(name, pbv1.EncoderFieldFlag(fieldSpec, s.interval))
+	bytes, err := item.Family(fid)
 	if err != nil {
 		return nil, err
 	}
+	if len(bytes) < 1 {
+		item.PrintContext(s.l.Named("field"), fid, 10)
+	}
 	fieldValue, err := pbv1.DecodeFieldValue(bytes, fieldSpec)
 	if err != nil {
 		return nil, err
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 5ba3331..bdfde12 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -322,6 +322,7 @@ type BlockDelegate interface {
 	writeLSMIndex(fields []index.Field, id common.ItemID) error
 	writeInvertedIndex(fields []index.Field, id common.ItemID) error
 	dataReader() kv.TimeSeriesReader
+	decoderPool() encoding.SeriesDecoderPool
 	lsmIndexReader() index.Searcher
 	invertedIndexReader() index.Searcher
 	primaryIndexReader() index.FieldIterable
@@ -336,6 +337,10 @@ type bDelegate struct {
 	delegate *block
 }
 
+func (d *bDelegate) decoderPool() encoding.SeriesDecoderPool {
+	return d.delegate.encodingMethod.DecoderPool
+}
+
 func (d *bDelegate) dataReader() kv.TimeSeriesReader {
 	return d.delegate.store
 }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 09d62aa..4d44f30 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -104,9 +104,10 @@ func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, err
 		return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id)
 	}
 	return &item{
-		data:     b.dataReader(),
-		itemID:   id.ID,
-		seriesID: s.id,
+		data:        b.dataReader(),
+		itemID:      id.ID,
+		seriesID:    s.id,
+		decoderPool: b.decoderPool(),
 	}, b, nil
 }
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index b3f08e1..18db6e2 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,11 +18,19 @@
 package tsdb
 
 import (
+	"bytes"
+	"encoding/hex"
+	"time"
+
+	"github.com/dgraph-io/badger/v3/y"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type Iterator interface {
@@ -33,6 +41,7 @@ type Iterator interface {
 
 type Item interface {
 	Family(family []byte) ([]byte, error)
+	PrintContext(l *logger.Logger, family []byte, n int)
 	Val() ([]byte, error)
 	ID() common.ItemID
 	SortedField() []byte
@@ -59,6 +68,7 @@ type seekerBuilder struct {
 	order               modelv1.Sort
 	indexRuleForSorting *databasev1.IndexRule
 	rangeOptsForSorting index.RangeOpts
+	l                   *logger.Logger
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
@@ -75,6 +85,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
 func newSeekerBuilder(s *seriesSpan) SeekerBuilder {
 	return &seekerBuilder{
 		seriesSpan: s,
+		l:          logger.GetLogger("seeker-builder"),
 	}
 }
 
@@ -101,6 +112,7 @@ type item struct {
 	data        kv.TimeSeriesReader
 	seriesID    common.SeriesID
 	sortedField []byte
+	decoderPool encoding.SeriesDecoderPool
 }
 
 func (i *item) Time() uint64 {
@@ -119,6 +131,63 @@ func (i *item) Family(family []byte) ([]byte, error) {
 	return i.data.Get(d.marshal(), uint64(i.itemID))
 }
 
+func (i *item) PrintContext(l *logger.Logger, family []byte, n int) {
+	decoder := i.decoderPool.Get(family)
+	defer i.decoderPool.Put(decoder)
+	d := dataBucket{
+		seriesID: i.seriesID,
+		family:   family,
+	}
+	key := d.marshal()
+	pre, next := i.data.Context(key, uint64(i.itemID), n)
+	defer pre.Close()
+	defer next.Close()
+	j := 0
+	currentTS := uint64(i.itemID)
+
+	each := func(iter kv.Iterator) {
+		if !bytes.Equal(key, iter.Key()) {
+			return
+		}
+		j++
+
+		ts := y.ParseTs(iter.RawKey())
+
+		logEvent := l.Info().Int("i", j).
+			Time("ts", time.Unix(0, int64(y.ParseTs(iter.RawKey()))))
+		if err := decoder.Decode(family, iter.Val()); err != nil {
+			logEvent = logEvent.Str("loc", "mem")
+			if ts == currentTS {
+				logEvent = logEvent.Bool("at", true)
+			}
+		} else {
+			start, end := decoder.Range()
+			logEvent = logEvent.Time("start", time.Unix(0, int64(start))).
+				Time("end", time.Unix(0, int64(end))).Int("num", decoder.Len()).Str("loc", "table")
+			if start <= currentTS && currentTS <= end {
+				if dd, err := decoder.Get(currentTS); err == nil && len(dd) > 0 {
+					logEvent = logEvent.Bool("at", true)
+				}
+			}
+		}
+		logEvent.Send()
+	}
+
+	s := hex.EncodeToString(key)
+	if len(s) > 7 {
+		s = s[:7]
+	}
+	l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print previous lines")
+	for ; pre.Valid() && j < n; pre.Next() {
+		each(pre)
+	}
+	j = 0
+	l.Info().Str("prefix", s).Time("ts", time.Unix(0, int64(i.itemID))).Msg("print next lines")
+	for ; next.Valid() && j < n; next.Next() {
+		each(next)
+	}
+}
+
 func (i *item) Val() ([]byte, error) {
 	d := dataBucket{
 		seriesID: i.seriesID,
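As a usage note (again illustrative, not part of the patch), any consumer of tsdb.Item can now dump the neighbourhood of a missing family the same way measure_query.go does above. The package name, the helper name readFamilyOrDump, and the "debug" logger name are assumptions.

package tsdbdebug

import (
	"github.com/apache/skywalking-banyandb/banyand/tsdb"
	"github.com/apache/skywalking-banyandb/pkg/logger"
)

// readFamilyOrDump reads one family and, when the payload comes back empty,
// logs up to 10 neighbouring entries around the item's timestamp.
func readFamilyOrDump(l *logger.Logger, it tsdb.Item, family []byte) ([]byte, error) {
	raw, err := it.Family(family)
	if err != nil {
		return nil, err
	}
	if len(raw) < 1 {
		it.PrintContext(l.Named("debug"), family, 10)
	}
	return raw, nil
}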
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 76bc194..910499e 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -28,6 +28,7 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -88,7 +89,8 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) {
 			return nil, err
 		}
 		if inner != nil {
-			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
+			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+				s.seriesSpan.seriesID, filters))
 		}
 	}
 	return
@@ -134,9 +136,11 @@ func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, error) {
 			return nil, err
 		}
 		if filter == nil {
-			delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, emptyFilters))
+			delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+				s.seriesSpan.seriesID, emptyFilters))
 		} else {
-			delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, []filterFn{filter}))
+			delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), b.decoderPool(),
+				s.seriesSpan.seriesID, []filterFn{filter}))
 		}
 	}
 }
@@ -156,6 +160,7 @@ type searcherIterator struct {
 	curKey      []byte
 	cur         posting.Iterator
 	data        kv.TimeSeriesReader
+	decoderPool encoding.SeriesDecoderPool
 	seriesID    common.SeriesID
 	filters     []filterFn
 	l           *logger.Logger
@@ -193,6 +198,7 @@ func (s *searcherIterator) Val() Item {
 		itemID:   s.cur.Current(),
 		data:     s.data,
 		seriesID: s.seriesID,
+		decoderPool: s.decoderPool,
 	}
 }
 
@@ -201,7 +207,7 @@ func (s *searcherIterator) Close() error {
 }
 
 func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
-	seriesID common.SeriesID, filters []filterFn,
+	decoderPool encoding.SeriesDecoderPool, seriesID common.SeriesID, filters []filterFn,
 ) Iterator {
 	return &searcherIterator{
 		fieldIterator: fieldIterator,
@@ -209,6 +215,7 @@ func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, da
 		seriesID:      seriesID,
 		filters:       filters,
 		l:             l,
+		decoderPool:   decoderPool,
 	}
 }
diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go
index 0382ee2..efc1f6e 100644
--- a/pkg/encoding/encoding.go
+++ b/pkg/encoding/encoding.go
@@ -79,6 +79,8 @@ type SeriesDecoder interface {
 	Get(ts uint64) ([]byte, error)
 	// Iterator returns a SeriesIterator
 	Iterator() SeriesIterator
+	// Range returns the start and end time of this series
+	Range() (start, end uint64)
 }
 
 // SeriesIterator iterates time series data
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index c9e1991..c9a48a4 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -138,8 +138,10 @@ func (ie *intEncoder) Append(ts uint64, value []byte) {
 	if ie.startTime == 0 {
 		ie.startTime = ts
 		ie.prevTime = ts
+	} else if ie.startTime > ts {
+		ie.startTime = ts
 	}
-	gap := int(ts) - int(ie.prevTime)
+	gap := int(ie.prevTime) - int(ts)
 	if gap < 0 {
 		return
 	}
@@ -166,13 +168,15 @@ func (ie *intEncoder) Reset(key []byte) {
 	ie.interval = ie.fn(key)
 	ie.startTime = 0
 	ie.prevTime = 0
+	ie.num = 0
+	ie.values = NewXOREncoder(ie.bw)
 }
 
 func (ie *intEncoder) Encode() ([]byte, error) {
 	ie.bw.Flush()
 	buffWriter := buffer.NewBufferWriter(ie.buff)
 	buffWriter.PutUint64(ie.startTime)
-	buffWriter.PutUint16(uint16(ie.size))
+	buffWriter.PutUint16(uint16(ie.num))
 	bb := buffWriter.Bytes()
 	encodedSize.WithLabelValues(ie.name, "int").Add(float64(len(bb)))
 	return bb, nil
@@ -195,6 +199,9 @@ type intDecoder struct {
 }
 
 func (i *intDecoder) Decode(key, data []byte) error {
+	if len(data) < 10 {
+		return ErrInvalidValue
+	}
 	i.interval = i.fn(key)
 	i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2])
 	i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
@@ -219,29 +226,33 @@ func (i intDecoder) Get(ts uint64) ([]byte, error) {
 	return zeroBytes, nil
 }
 
+func (i intDecoder) Range() (start, end uint64) {
+	return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
+}
+
 func (i intDecoder) Iterator() SeriesIterator {
 	br := bit.NewReader(bytes.NewReader(i.area))
 	return &intIterator{
-		startTime: i.startTime,
-		interval:  int(i.interval),
-		br:        br,
-		values:    NewXORDecoder(br),
-		size:      i.size,
+		endTime:  i.startTime + uint64(i.num*int(i.interval)),
+		interval: int(i.interval),
+		br:       br,
+		values:   NewXORDecoder(br),
+		size:     i.num,
 	}
 }
 
 var (
 	_ SeriesIterator = (*intIterator)(nil)
 
-	zeroBytes = convert.Int64ToBytes(0)
-	Zero      = convert.BytesToUint64(zeroBytes)
+	zeroBytes = convert.Uint64ToBytes(zero)
+	zero      = convert.BytesToUint64(convert.Int64ToBytes(0))
 )
 
 type intIterator struct {
-	startTime uint64
-	interval  int
-	size      int
-	br        *bit.Reader
-	values    *XORDecoder
+	endTime  uint64
+	interval int
+	size     int
+	br       *bit.Reader
+	values   *XORDecoder
 
 	currVal  uint64
 	currTime uint64
@@ -266,10 +277,10 @@ func (i *intIterator) Next() bool {
 			i.currVal = i.values.Value()
 		}
 	} else {
-		i.currVal = Zero
+		i.currVal = zero
 	}
-	i.currTime = i.startTime + uint64(i.interval*i.index)
 	i.index++
+	i.currTime = i.endTime - uint64(i.interval*i.index)
 	return true
 }
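The measure write path feeds the int encoder newest-first, so the corrected bookkeeping above keeps startTime at the oldest timestamp and measures each gap as prevTime - ts. A standalone walkthrough of that arithmetic with concrete numbers (simplified: no XOR bit encoding, and prevTime is assumed to advance to every appended point, which the real encoder does outside the shown hunk):

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := uint64(time.Minute)
	// Newest-first input, mirroring the reworked tests.
	input := []uint64{4 * interval, 3 * interval, 2 * interval, interval}

	startTime, prevTime := uint64(0), uint64(0)
	for _, ts := range input {
		if startTime == 0 {
			startTime, prevTime = ts, ts
		} else if startTime > ts {
			startTime = ts // track the oldest timestamp seen so far
		}
		oldGap := int(ts) - int(prevTime) // pre-fix formula: negative for descending input
		newGap := int(prevTime) - int(ts) // post-fix formula: non-negative
		fmt.Printf("ts=%v oldGap=%v newGap=%v\n",
			time.Duration(ts), time.Duration(oldGap), time.Duration(newGap))
		prevTime = ts
	}
	num := uint64(len(input))
	// With num points, the decoder reports Range() = [startTime, startTime+(num-1)*interval].
	fmt.Printf("Range() => [%v, %v]\n",
		time.Duration(startTime), time.Duration(startTime+(num-1)*interval))
}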
diff --git a/pkg/encoding/int_test.go b/pkg/encoding/int_test.go
index 657bbe5..1f08278 100644
--- a/pkg/encoding/int_test.go
+++ b/pkg/encoding/int_test.go
@@ -28,8 +28,10 @@ import (
 
 func TestNewIntEncoderAndDecoder(t *testing.T) {
 	type tsData struct {
-		ts   []uint64
-		data []int64
+		ts    []uint64
+		data  []int64
+		start uint64
+		end   uint64
 	}
 	tests := []struct {
 		name string
@@ -39,45 +41,53 @@
 		{
 			name: "golden path",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "more than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9, 6},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "less than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 8, 7},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
-				data: []int64{7, 8, 7},
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []int64{7, 8, 7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
 			},
 		},
 		{
 			name: "empty slot in the middle",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 0, 0, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 0, 0, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 	}
@@ -93,36 +103,49 @@
 		t.Run(tt.name, func(t *testing.T) {
 			at := assert.New(t)
 			encoder := encoderPool.Get(key)
+			defer encoderPool.Put(encoder)
 			decoder := decoderPool.Get(key)
+			defer decoderPool.Put(decoder)
 			encoder.Reset(key)
+			isFull := false
 			for i, v := range tt.args.ts {
 				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
 				if encoder.IsFull() {
+					isFull = true
 					break
 				}
 			}
 			bb, err := encoder.Encode()
 			at.NoError(err)
+
+			at.Equal(tt.want.start, encoder.StartTime())
 			at.NoError(decoder.Decode(key, bb))
-			at.True(decoder.IsFull())
-			iter := decoder.Iterator()
-			for i, t := range tt.want.ts {
-				at.True(iter.Next())
+			start, end := decoder.Range()
+			at.Equal(tt.want.start, start)
+			at.Equal(tt.want.end, end)
+			if isFull {
+				at.True(decoder.IsFull())
+			}
+			i := 0
+			for iter := decoder.Iterator(); iter.Next(); i++ {
 				at.NoError(iter.Error())
 				at.Equal(tt.want.ts[i], iter.Time())
 				at.Equal(tt.want.data[i], convert.BytesToInt64(iter.Val()))
-				v, err := decoder.Get(t)
+				v, err := decoder.Get(iter.Time())
 				at.NoError(err)
 				at.Equal(tt.want.data[i], convert.BytesToInt64(v))
 			}
+			at.Equal(len(tt.want.ts), i)
 		})
 	}
 }
 
 func TestNewIntDecoderGet(t *testing.T) {
 	type tsData struct {
-		ts   []uint64
-		data []int64
+		ts    []uint64
+		data  []int64
+		start uint64
+		end   uint64
 	}
 	tests := []struct {
 		name string
@@ -132,45 +155,53 @@
 		{
 			name: "golden path",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 8, 7, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 8, 7, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "more than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)},
 				data: []int64{7, 8, 7, 9, 6},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute), uint64(5 * time.Minute)},
-				data: []int64{7, 8, 7, 9, 0},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
+				data:  []int64{7, 8, 7, 9, 0},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 		{
 			name: "less than the size",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
+				ts:   []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 8, 7},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute)},
-				data: []int64{7, 8, 7},
+				ts:    []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)},
+				data:  []int64{7, 8, 7},
+				start: uint64(time.Minute),
+				end:   uint64(3 * time.Minute),
 			},
 		},
 		{
 			name: "empty slot in the middle",
 			args: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(4 * time.Minute)},
+				ts:   []uint64{uint64(4 * time.Minute), uint64(time.Minute)},
 				data: []int64{7, 9},
 			},
 			want: tsData{
-				ts:   []uint64{uint64(time.Minute), uint64(2 * time.Minute), uint64(3 * time.Minute), uint64(4 * time.Minute)},
-				data: []int64{7, 0, 0, 9},
+				ts:    []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
+				data:  []int64{7, 0, 0, 9},
+				start: uint64(time.Minute),
+				end:   uint64(4 * time.Minute),
 			},
 		},
 	}
@@ -186,18 +217,29 @@
 		t.Run(tt.name, func(t *testing.T) {
 			at := assert.New(t)
 			encoder := encoderPool.Get(key)
+			defer encoderPool.Put(encoder)
 			decoder := decoderPool.Get(key)
+			defer decoderPool.Put(decoder)
 			encoder.Reset(key)
+			isFull := false
 			for i, v := range tt.args.ts {
 				encoder.Append(v, convert.Int64ToBytes(tt.args.data[i]))
 				if encoder.IsFull() {
+					isFull = true
 					break
 				}
 			}
 			bb, err := encoder.Encode()
 			at.NoError(err)
+
+			at.Equal(tt.want.start, encoder.StartTime())
 			at.NoError(decoder.Decode(key, bb))
-			at.True(decoder.IsFull())
+			start, end := decoder.Range()
+			at.Equal(tt.want.start, start)
+			at.Equal(tt.want.end, end)
+			if isFull {
+				at.True(decoder.IsFull())
+			}
 			for i, t := range tt.want.ts {
 				v, err := decoder.Get(t)
 				at.NoError(err)
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 2284273..388ee8c 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -202,6 +202,9 @@ func (t *plainDecoder) Len() int {
 }
 
 func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
+	if len(rawData) < 2 {
+		return ErrInvalidValue
+	}
 	var data []byte
 	size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
 	if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
@@ -242,6 +245,12 @@ func (t *plainDecoder) Get(ts uint64) ([]byte, error) {
 	return getVal(t.val, parseOffset(slot))
 }
 
+func (t *plainDecoder) Range() (start, end uint64) {
+	startSlot := getTSSlot(t.ts, int(t.num)-1)
+	endSlot := getTSSlot(t.ts, 0)
+	return parseTS(startSlot), parseTS(endSlot)
+}
+
 func (t *plainDecoder) Iterator() SeriesIterator {
 	return newBlockItemIterator(t)
 }
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index fe94b44..8259d7d 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -266,6 +266,10 @@ func (di *delegateIterator) Key() []byte {
 	return di.delegated.Key()
 }
 
+func (di *delegateIterator) RawKey() []byte {
+	return di.delegated.RawKey()
+}
+
 func (di *delegateIterator) Field() Field {
 	return di.curField
 }
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index f56a68b..3d22f92 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -32,14 +32,13 @@ import (
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/encoding"
 )
 
 type ID string
 
 const fieldFlagLength = 9
 
-var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(encoding.Zero)}}}
+var zeroFieldValue = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 0}}}
 
 var (
 	strDelimiter = []byte("\n")
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index d7722d4..3640595 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -153,7 +153,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
 		projectionTagsRefs:   i.projectionTagsRefs,
 		projectionFieldsRefs: i.projectionFieldsRefs,
 	}
-	if len(iters) == 1 || i.groupByEntity {
+	if i.groupByEntity {
 		return newSeriesMIterator(iters, transformContext), nil
 	}
 	c := logical.CreateComparator(i.Sort)
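To tie the encoding changes together, a short sketch of reading one encoded block back through the updated pkg/encoding interfaces: Range() for the covered time span and Iterator() for the newest-first walk. How the SeriesDecoder and the encoded bytes are obtained (e.g. from a SeriesDecoderPool) is assumed and elided here, and values are treated as int64 as they are for the int encoder.

package encodingdebug

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/convert"
	"github.com/apache/skywalking-banyandb/pkg/encoding"
)

// printSeries decodes one block and prints its points in iterator order,
// which is newest timestamp first after this change.
func printSeries(decoder encoding.SeriesDecoder, key, block []byte) error {
	if err := decoder.Decode(key, block); err != nil {
		return err
	}
	start, end := decoder.Range()
	fmt.Printf("range=[%d,%d] points=%d full=%v\n", start, end, decoder.Len(), decoder.IsFull())
	for iter := decoder.Iterator(); iter.Next(); {
		if err := iter.Error(); err != nil {
			return err
		}
		fmt.Printf("ts=%d val=%d\n", iter.Time(), convert.BytesToInt64(iter.Val()))
	}
	return nil
}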
