This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new aa9fb38 Introduce encoding component (#59) aa9fb38 is described below commit aa9fb38fb59df1ca4f61c05440089e864bcec27f Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Nov 14 08:16:49 2021 +0800 Introduce encoding component (#59) * Encoding component helps tsdb module to customize how to encode data in a chunk * StreamChunkEncoder/Decoder handles stream module's encoding * Update badger to support encoding component Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/kv/badger.go | 20 ++++ banyand/kv/kv.go | 40 ++++--- banyand/stream/stream.go | 12 +++ banyand/tsdb/block.go | 22 ++-- banyand/tsdb/tsdb.go | 27 ++++- banyand/tsdb/tsdb_test.go | 9 ++ go.mod | 6 +- go.sum | 8 +- pkg/encoding/encoding.go | 66 ++++++++++++ pkg/encoding/stream_chunk.go | 250 +++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 419 insertions(+), 41 deletions(-) diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go index a0b0a1b..24820f0 100644 --- a/banyand/kv/badger.go +++ b/banyand/kv/badger.go @@ -24,8 +24,10 @@ import ( "time" "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v3/bydb" "github.com/dgraph-io/badger/v3/y" + "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -256,3 +258,21 @@ func (l *badgerLog) Infof(f string, v ...interface{}) { func (l *badgerLog) Debugf(f string, v ...interface{}) { l.delegated.Debug().Msgf(f, v...) } + +var _ bydb.TSetDecoder = (*decoderDelegate)(nil) + +type decoderDelegate struct { + encoding.SeriesDecoder +} + +func (d *decoderDelegate) Iterator() bydb.TSetIterator { + return &iterDelegate{ + SeriesIterator: d.SeriesDecoder.Iterator(), + } +} + +var _ bydb.TSetDecoder = (*decoderDelegate)(nil) + +type iterDelegate struct { + encoding.SeriesIterator +} diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go index fc829d3..ea92d1c 100644 --- a/banyand/kv/kv.go +++ b/banyand/kv/kv.go @@ -23,8 +23,10 @@ import ( "math" "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v3/bydb" "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -101,6 +103,20 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions { } } +func TSSWithEncoding(encoderFactory encoding.SeriesEncoderFactory, decoderFactory encoding.SeriesDecoderFactory) TimeSeriesOptions { + return func(store TimeSeriesStore) { + if btss, ok := store.(*badgerTSS); ok { + btss.dbOpts = btss.dbOpts.WithExternalCompactor(func() bydb.TSetEncoder { + return encoderFactory() + }, func() bydb.TSetDecoder { + return &decoderDelegate{ + SeriesDecoder: decoderFactory(), + } + }) + } + } +} + type Iterator interface { Next() Rewind() @@ -125,7 +141,7 @@ type IndexStore interface { } // OpenTimeSeriesStore creates a new TimeSeriesStore -func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize int, options ...TimeSeriesOptions) (TimeSeriesStore, error) { +func OpenTimeSeriesStore(shardID int, path string, options ...TimeSeriesOptions) (TimeSeriesStore, error) { btss := new(badgerTSS) btss.shardID = shardID btss.dbOpts = badger.DefaultOptions(path) @@ -139,7 +155,7 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize if err != nil { return nil, fmt.Errorf("failed to open time series store: %v", err) } - btss.TSet = *badger.NewTSet(btss.db, compressLevel, valueSize) + btss.TSet = *badger.NewTSet(btss.db) return btss, nil } @@ -161,26 +177,6 @@ func StoreWithNamedLogger(name string, l *logger.Logger) StoreOptions { } } -// StoreWithBufferSize sets a external logger into underlying Store -func StoreWithBufferSize(size int64) StoreOptions { - return func(store Store) { - if bdb, ok := store.(*badgerDB); ok { - bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size) - } - } -} - -type FlushCallback func() - -// StoreWithFlushCallback sets a callback function -func StoreWithFlushCallback(callback FlushCallback) StoreOptions { - return func(store Store) { - if bdb, ok := store.(*badgerDB); ok { - bdb.dbOpts.FlushCallBack = callback - } - } -} - // OpenStore creates a new Store func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) { bdb := new(badgerDB) diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 35ae2a5..760687d 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -23,10 +23,14 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" ) +// a chunk is 1MB +const chunkSize = 1 << 20 + type indexRule struct { rule *databasev1.IndexRule tagIndices []partition.TagLocator @@ -102,6 +106,14 @@ func openStream(root string, spec streamSpec, l *logger.Logger) (*stream, error) Location: root, ShardNum: sm.schema.GetOpts().GetShardNum(), IndexRules: spec.indexRules, + EncodingMethod: tsdb.EncodingMethod{ + EncoderFactory: func() encoding.SeriesEncoder { + return encoding.NewStreamChunkEncoder(chunkSize) + }, + DecoderFactory: func() encoding.SeriesDecoder { + return encoding.NewStreamChunkDecoder(chunkSize) + }, + }, }) if err != nil { return nil, err diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 231cef9..2f38308 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -23,6 +23,7 @@ import ( "time" "github.com/dgraph-io/ristretto/z" + "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -50,11 +51,9 @@ type block struct { } type blockOpts struct { - segID uint16 - blockID uint16 - path string - compressLevel int - valueSize int + segID uint16 + blockID uint16 + path string } func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { @@ -71,8 +70,17 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { b.l = pl.Named("block") } } - if b.store, err = kv.OpenTimeSeriesStore(0, b.path+"/store", opts.compressLevel, opts.valueSize, - kv.TSSWithLogger(b.l)); err != nil { + encodingMethodObject := ctx.Value(encodingMethodKey) + if encodingMethodObject == nil { + return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to create a block") + } + encodingMethod := encodingMethodObject.(EncodingMethod) + if b.store, err = kv.OpenTimeSeriesStore( + 0, + b.path+"/store", + kv.TSSWithEncoding(encodingMethod.EncoderFactory, encodingMethod.DecoderFactory), + kv.TSSWithLogger(b.l), + ); err != nil { return nil, err } if b.primaryIndex, err = lsm.NewStore(lsm.StoreOpts{ diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 80a84f5..8df233b 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -31,6 +31,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -47,10 +48,16 @@ const ( dirPerm = 0700 ) -var ErrInvalidShardID = errors.New("invalid shard id") -var indexRulesKey = contextIndexRulesKey{} +var ( + ErrInvalidShardID = errors.New("invalid shard id") + ErrEncodingMethodAbsent = errors.New("encoding method is absent") + + indexRulesKey = contextIndexRulesKey{} + encodingMethodKey = contextEncodingMethodKey{} +) type contextIndexRulesKey struct{} +type contextEncodingMethodKey struct{} type Database interface { io.Closer @@ -68,9 +75,15 @@ type Shard interface { var _ Database = (*database)(nil) type DatabaseOpts struct { - Location string - ShardNum uint32 - IndexRules []*databasev1.IndexRule + Location string + ShardNum uint32 + IndexRules []*databasev1.IndexRule + EncodingMethod EncodingMethod +} + +type EncodingMethod struct { + EncoderFactory encoding.SeriesEncoderFactory + DecoderFactory encoding.SeriesDecoderFactory } type database struct { @@ -111,6 +124,9 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { db.logger = pl.Named("tsdb") } } + if opts.EncodingMethod.EncoderFactory == nil || opts.EncodingMethod.DecoderFactory == nil { + return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database") + } if _, err := mkdir(opts.Location); err != nil { return nil, err } @@ -122,6 +138,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { } thisContext := context.WithValue(ctx, logger.ContextKey, db.logger) thisContext = context.WithValue(thisContext, indexRulesKey, opts.IndexRules) + thisContext = context.WithValue(thisContext, encodingMethodKey, opts.EncodingMethod) if len(entries) > 0 { return loadDatabase(thisContext, db) } diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go index 3f91e40..89c56d6 100644 --- a/banyand/tsdb/tsdb_test.go +++ b/banyand/tsdb/tsdb_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" ) @@ -56,6 +57,14 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database DatabaseOpts{ Location: tempDir, ShardNum: 1, + EncodingMethod: EncodingMethod{ + EncoderFactory: func() encoding.SeriesEncoder { + return nil + }, + DecoderFactory: func() encoding.SeriesDecoder { + return nil + }, + }, }) t.NoError(err) t.NotNil(db) diff --git a/go.mod b/go.mod index a1ba284..24e4ce0 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.6 github.com/google/uuid v1.3.0 - github.com/klauspost/compress v1.13.1 // indirect + github.com/klauspost/compress v1.13.1 github.com/oklog/run v1.1.0 github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.23.0 @@ -43,7 +43,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.3 // indirect github.com/google/btree v1.0.1 // indirect - github.com/google/flatbuffers v1.12.0 // indirect + github.com/google/flatbuffers v1.12.1 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect @@ -101,4 +101,4 @@ require ( sigs.k8s.io/yaml v1.2.0 // indirect ) -replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 +replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 diff --git a/go.sum b/go.sum index 4ed137e..957109b 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE= github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM= github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc= -github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 h1:csoTNiGUMtp4H1AchgaZWJ4WY4uJQ6s+pz3sXS93jAA= -github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E= +github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476 h1:MH/Jy2x3WF3RdD+WD25XepG4fzIz3qMOoIUM4Enn+GA= +github.com/SkyAPM/badger/v3 v3.0.0-20211111092400-7f8fa9a51476/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -192,8 +192,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= -github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w= -github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= +github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/pkg/encoding/encoding.go b/pkg/encoding/encoding.go new file mode 100644 index 0000000..5e04e72 --- /dev/null +++ b/pkg/encoding/encoding.go @@ -0,0 +1,66 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package encoding + +import "github.com/pkg/errors" + +var ErrEncodeEmpty = errors.New("encode an empty value") + +type SeriesEncoderFactory func() SeriesEncoder + +// SeriesEncoder encodes time series data point +type SeriesEncoder interface { + // Append a data point + Append(ts uint64, value []byte) + // IsFull returns whether the encoded data reached its capacity + IsFull() bool + // Reset the underlying buffer + Reset() + // Encode the time series data point to a binary + Encode() ([]byte, error) + // StartTime indicates the first entry's time + StartTime() uint64 +} + +type SeriesDecoderFactory func() SeriesDecoder + +// SeriesDecoder decodes encoded time series data +type SeriesDecoder interface { + // Decode the time series data + Decode(data []byte) error + // Len denotes the size of iterator + Len() int + // IsFull returns whether the encoded data reached its capacity + IsFull() bool + // Get the data point by its time + Get(ts uint64) ([]byte, error) + // Iterator returns a SeriesIterator + Iterator() SeriesIterator +} + +// SeriesIterator iterates time series data +type SeriesIterator interface { + // Next scroll the cursor to the next + Next() bool + // Val returns the value of the current data point + Val() []byte + // Time returns the time of the current data point + Time() uint64 + // Error might return an error indicates a decode failure + Error() error +} diff --git a/pkg/encoding/stream_chunk.go b/pkg/encoding/stream_chunk.go new file mode 100644 index 0000000..7ff5094 --- /dev/null +++ b/pkg/encoding/stream_chunk.go @@ -0,0 +1,250 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package encoding + +import ( + "bytes" + "encoding/binary" + "fmt" + "sort" + + "github.com/klauspost/compress/zstd" + "github.com/pkg/errors" +) + +var ( + decoder, _ = zstd.NewReader(nil) + encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) + _ SeriesEncoder = (*streamChunkEncoder)(nil) + _ SeriesDecoder = (*StreamChunkDecoder)(nil) +) + +//streamChunkEncoder backport to reduced value +type streamChunkEncoder struct { + tsBuff bytes.Buffer + valBuff bytes.Buffer + scratch [binary.MaxVarintLen64]byte + len uint32 + num uint32 + startTime uint64 + valueSize int +} + +func NewStreamChunkEncoder(size int) SeriesEncoder { + return &streamChunkEncoder{ + valueSize: size, + } +} + +func (t *streamChunkEncoder) Append(ts uint64, value []byte) { + if t.startTime == 0 { + t.startTime = ts + } else if t.startTime > ts { + t.startTime = ts + } + vLen := len(value) + offset := uint32(len(t.valBuff.Bytes())) + t.valBuff.Write(t.putUint32(uint32(vLen))) + t.valBuff.Write(value) + t.tsBuff.Write(t.putUint64(ts)) + t.tsBuff.Write(t.putUint32(offset)) + t.num = t.num + 1 +} + +func (t *streamChunkEncoder) IsFull() bool { + return t.valBuff.Len() >= t.valueSize +} + +func (t *streamChunkEncoder) Reset() { + t.tsBuff.Reset() + t.valBuff.Reset() + t.num = 0 + t.startTime = 0 +} + +func (t *streamChunkEncoder) Encode() ([]byte, error) { + if t.tsBuff.Len() < 1 { + return nil, ErrEncodeEmpty + } + val := t.valBuff.Bytes() + t.len = uint32(len(val)) + _, err := t.tsBuff.WriteTo(&t.valBuff) + if err != nil { + return nil, err + } + t.valBuff.Write(t.putUint32(t.num)) + t.valBuff.Write(t.putUint32(t.len)) + data := t.valBuff.Bytes() + l := len(data) + dst := make([]byte, 0, compressBound(l)) + dst = encoder.EncodeAll(data, dst) + result := make([]byte, len(dst)+2) + copy(result, dst) + copy(result[len(dst):], t.putUint16(uint16(l))) + return result, nil +} + +func compressBound(srcSize int) int { + return srcSize + (srcSize >> 8) +} + +func (t *streamChunkEncoder) StartTime() uint64 { + return t.startTime +} + +func (t *streamChunkEncoder) putUint16(v uint16) []byte { + binary.LittleEndian.PutUint16(t.scratch[:], v) + return t.scratch[:2] +} + +func (t *streamChunkEncoder) putUint32(v uint32) []byte { + binary.LittleEndian.PutUint32(t.scratch[:], v) + return t.scratch[:4] +} + +func (t *streamChunkEncoder) putUint64(v uint64) []byte { + binary.LittleEndian.PutUint64(t.scratch[:], v) + return t.scratch[:8] +} + +const ( + // TsLen equals ts(uint64) + data_offset(uint32) + TsLen = 8 + 4 +) + +var ErrInvalidValue = errors.New("invalid encoded value") + +//StreamChunkDecoder decodes encoded time index +type StreamChunkDecoder struct { + ts []byte + val []byte + len uint32 + num uint32 + valueSize int +} + +func NewStreamChunkDecoder(size int) SeriesDecoder { + return &StreamChunkDecoder{ + valueSize: size, + } +} + +func (t *StreamChunkDecoder) Len() int { + return int(t.num) +} + +func (t *StreamChunkDecoder) Decode(rawData []byte) (err error) { + var data []byte + size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:]) + if data, err = decoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil { + return err + } + l := uint32(len(data)) + if l <= 8 { + return ErrInvalidValue + } + lenOffset := len(data) - 4 + numOffset := lenOffset - 4 + t.num = binary.LittleEndian.Uint32(data[numOffset:lenOffset]) + t.len = binary.LittleEndian.Uint32(data[lenOffset:]) + if l <= t.len+8 { + return ErrInvalidValue + } + t.val = data[:t.len] + t.ts = data[t.len:numOffset] + return nil +} + +func (t *StreamChunkDecoder) IsFull() bool { + return int(t.len) >= t.valueSize +} + +func (t *StreamChunkDecoder) Get(ts uint64) ([]byte, error) { + i := sort.Search(int(t.num), func(i int) bool { + slot := getTSSlot(t.ts, i) + return parseTS(slot) <= ts + }) + if i >= int(t.num) { + return nil, fmt.Errorf("%d doesn't exist", ts) + } + slot := getTSSlot(t.ts, i) + if parseTS(slot) != ts { + return nil, fmt.Errorf("%d doesn't exist", ts) + } + return getVal(t.val, parseOffset(slot)) +} + +func (t *StreamChunkDecoder) Iterator() SeriesIterator { + return newBlockItemIterator(t) +} + +func getVal(buf []byte, offset uint32) ([]byte, error) { + if uint32(len(buf)) <= offset+4 { + return nil, ErrInvalidValue + } + dataLen := binary.LittleEndian.Uint32(buf[offset : offset+4]) + return buf[offset+4 : offset+4+dataLen], nil +} + +func getTSSlot(data []byte, index int) []byte { + return data[index*TsLen : (index+1)*TsLen] +} + +func parseTS(tsSlot []byte) uint64 { + return binary.LittleEndian.Uint64(tsSlot[:8]) +} + +func parseOffset(tsSlot []byte) uint32 { + return binary.LittleEndian.Uint32(tsSlot[8:]) +} + +var _ SeriesIterator = (*chunkIterator)(nil) + +type chunkIterator struct { + index []byte + data []byte + idx int + num int +} + +func newBlockItemIterator(decoder *StreamChunkDecoder) SeriesIterator { + return &chunkIterator{ + idx: -1, + index: decoder.ts, + data: decoder.val, + num: int(decoder.num), + } +} + +func (b *chunkIterator) Next() bool { + b.idx++ + return b.idx >= 0 && b.idx < b.num +} + +func (b *chunkIterator) Val() []byte { + v, _ := getVal(b.data, parseOffset(getTSSlot(b.index, b.idx))) + return v +} + +func (b *chunkIterator) Time() uint64 { + return parseTS(getTSSlot(b.index, b.idx)) +} + +func (b *chunkIterator) Error() error { + return nil +}