This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch stream-ele in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 672dd60279e14fc45cf540fce99c31612d937de0 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Aug 1 12:57:20 2024 +0800 Merge elements.bin and timestamp.bin Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 1 + banyand/measure/block.go | 28 ++++------ banyand/stream/block.go | 100 ++++++++-------------------------- banyand/stream/block_metadata.go | 20 +++++-- banyand/stream/block_metadata_test.go | 31 +++++++---- banyand/stream/block_reader.go | 3 - banyand/stream/block_test.go | 83 ++++++---------------------- banyand/stream/block_writer.go | 7 +-- banyand/stream/part.go | 8 --- banyand/stream/snapshot_test.go | 1 - banyand/stream/stream.go | 1 - pkg/encoding/int_list.go | 8 +-- 12 files changed, 92 insertions(+), 199 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 62f8929d..50884405 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ Release Notes. - Move the series index into segment. - Swap the segment and the shard. - Move indexed values in a measure from data files to index files. +- Merge elementIDs.bin and timestamps.bin into a single file. ### Features diff --git a/banyand/measure/block.go b/banyand/measure/block.go index 93376790..001940cc 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -380,25 +380,21 @@ func mustSeqReadTimestampsFrom(timestamps, versions []int64, tm *timestampsMetad func mustDecodeTimestampsWithVersions(timestamps, versions []int64, tm *timestampsMetadata, count int, path string, src []byte) ([]int64, []int64) { var err error - if t := encoding.GetCommonType(tm.encodeType); t != encoding.EncodeTypeUnknown { - if tm.size < tm.versionOffset { - logger.Panicf("size %d must be greater than versionOffset %d", tm.size, tm.versionOffset) - } - timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.versionOffset], t, tm.min, count) - if err != nil { - logger.Panicf("%s: cannot unmarshal timestamps with versions: %v", path, err) - } - versions, err = encoding.BytesToInt64List(versions, src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count) - if err != nil { - logger.Panicf("%s: cannot unmarshal versions: %v", path, err) - } - return timestamps, versions + t := encoding.GetCommonType(tm.encodeType) + if t == encoding.EncodeTypeUnknown { + logger.Panicf("unexpected encodeType %d", tm.encodeType) + } + if tm.size < tm.versionOffset { + logger.Panicf("size %d must be greater than versionOffset %d", tm.size, tm.versionOffset) + } + timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.versionOffset], t, tm.min, count) + if err != nil { + logger.Panicf("%s: cannot unmarshal timestamps with versions: %v", path, err) } - timestamps, err = encoding.BytesToInt64List(timestamps, src, tm.encodeType, tm.min, count) + versions, err = encoding.BytesToInt64List(versions, src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count) if err != nil { - logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err) + logger.Panicf("%s: cannot unmarshal versions: %v", path, err) } - versions = encoding.ExtendInt64ListCapacity(versions, count) return timestamps, versions } diff --git a/banyand/stream/block.go b/banyand/stream/block.go index 2f2fc3b5..bd0db065 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -26,7 +26,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/bytes" - "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index/posting" @@ -128,8 +127,7 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, ww *writers) bm.uncompressedSizeBytes = b.uncompressedSizeBytes() bm.count = uint64(b.Len()) - mustWriteTimestampsTo(&bm.timestamps, b.timestamps, &ww.timestampsWriter) - mustWriteElementIDsTo(&bm.elementIDs, b.elementIDs, &ww.elementIDsWriter) + mustWriteTimestampsTo(&bm.timestamps, b.timestamps, b.elementIDs, &ww.timestampsWriter) for ti := range b.tagFamilies { b.marshalTagFamily(b.tagFamilies[ti], bm, ww) @@ -256,8 +254,7 @@ func (b *block) uncompressedSizeBytes() uint64 { func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm blockMetadata) { b.reset() - b.timestamps = mustReadTimestampsFrom(b.timestamps, &bm.timestamps, int(bm.count), p.timestamps) - b.elementIDs = mustReadElementIDsFrom(b.elementIDs, &bm.elementIDs, int(bm.count), p.elementIDs) + b.timestamps, b.elementIDs = mustReadTimestampsFrom(b.timestamps, b.elementIDs, &bm.timestamps, int(bm.count), p.timestamps) _ = b.resizeTagFamilies(len(bm.tagProjection)) for i := range bm.tagProjection { @@ -275,8 +272,7 @@ func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm bl func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, seqReaders *seqReaders, bm blockMetadata) { b.reset() - b.timestamps = mustSeqReadTimestampsFrom(b.timestamps, &bm.timestamps, int(bm.count), &seqReaders.timestamps) - b.elementIDs = mustSeqReadElementIDsFrom(b.elementIDs, &bm.elementIDs, int(bm.count), &seqReaders.elementIDs) + b.timestamps, b.elementIDs = mustSeqReadTimestampsFrom(b.timestamps, b.elementIDs, &bm.timestamps, int(bm.count), &seqReaders.timestamps) _ = b.resizeTagFamilies(len(bm.tagFamilies)) keys := make([]string, 0, len(bm.tagFamilies)) @@ -298,71 +294,48 @@ func (b *block) sortTagFamilies() { }) } -func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, timestampsWriter *writer) { +func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, elementIDs []uint64, timestampsWriter *writer) { tm.reset() bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf, tm.encodeType, tm.min = encoding.Int64ListToBytes(bb.Buf[:0], timestamps) - if len(bb.Buf) > maxTimestampsBlockSize { - logger.Panicf("too big block with timestamps: %d bytes; the maximum supported size is %d bytes", len(bb.Buf), maxTimestampsBlockSize) - } tm.max = timestamps[len(timestamps)-1] tm.offset = timestampsWriter.bytesWritten - tm.size = uint64(len(bb.Buf)) + tm.elementIDsOffset = uint64(len(bb.Buf)) + timestampsWriter.MustWrite(bb.Buf) + bb.Buf = encoding.VarUint64sToBytes(bb.Buf[:0], elementIDs) + tm.size = tm.elementIDsOffset + uint64(len(bb.Buf)) timestampsWriter.MustWrite(bb.Buf) } -func mustReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, reader fs.Reader) []int64 { +func mustReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, reader fs.Reader) ([]int64, []uint64) { bb := bigValuePool.Generate() defer bigValuePool.Release(bb) bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size)) fs.MustReadData(reader, int64(tm.offset), bb.Buf) - var err error - dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType, tm.min, count) - if err != nil { - logger.Panicf("%s: cannot unmarshal timestamps: %v", reader.Path(), err) - } - return dst + return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, count, reader.Path(), bb.Buf) } -func mustWriteElementIDsTo(em *elementIDsMetadata, elementIDs []uint64, elementIDsWriter *writer) { - em.reset() - - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) - elementIDsByteSlice := make([][]byte, len(elementIDs)) - for i, elementID := range elementIDs { - elementIDsByteSlice[i] = convert.Uint64ToBytes(elementID) - } - bb.Buf = encoding.EncodeBytesBlock(bb.Buf, elementIDsByteSlice) - if len(bb.Buf) > maxElementIDsBlockSize { - logger.Panicf("too big block with elementIDs: %d bytes; the maximum supported size is %d bytes", len(bb.Buf), maxElementIDsBlockSize) +func mustDecodeTimestampsWithVersions(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, path string, src []byte) ([]int64, []uint64) { + if tm.size < tm.elementIDsOffset { + logger.Panicf("size %d must be greater than elementIDsOffset %d", tm.size, tm.elementIDsOffset) } - em.encodeType = encoding.EncodeTypeUnknown - em.offset = elementIDsWriter.bytesWritten - em.size = uint64(len(bb.Buf)) - elementIDsWriter.MustWrite(bb.Buf) -} - -func mustReadElementIDsFrom(dst []uint64, em *elementIDsMetadata, count int, reader fs.Reader) []uint64 { - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) - bb.Buf = bytes.ResizeExact(bb.Buf, int(em.size)) - fs.MustReadData(reader, int64(em.offset), bb.Buf) - decoder := encoding.BytesBlockDecoder{} - var elementIDsByteSlice [][]byte - elementIDsByteSlice, err := decoder.Decode(elementIDsByteSlice, bb.Buf, uint64(count)) + var err error + timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.elementIDsOffset], tm.encodeType, tm.min, count) if err != nil { - logger.Panicf("%s: cannot unmarshal elementIDs: %v", reader.Path(), err) + logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err) } - for _, elementID := range elementIDsByteSlice { - dst = append(dst, convert.BytesToUint64(elementID)) + elementIDs = encoding.ExtendListCapacity(elementIDs, count) + elementIDs = elementIDs[:count] + _, err = encoding.BytesToVarUint64s(elementIDs, src[tm.elementIDsOffset:]) + if err != nil { + logger.Panicf("%s: cannot unmarshal element ids: %v", path, err) } - return dst + return timestamps, elementIDs } -func mustSeqReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, reader *seqReader) []int64 { +func mustSeqReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, reader *seqReader) ([]int64, []uint64) { if tm.offset != reader.bytesRead { logger.Panicf("offset %d must be equal to bytesRead %d", tm.offset, reader.bytesRead) } @@ -370,32 +343,7 @@ func mustSeqReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, r defer bigValuePool.Release(bb) bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size)) reader.mustReadFull(bb.Buf) - var err error - dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType, tm.min, count) - if err != nil { - logger.Panicf("%s: cannot unmarshal timestamps: %v", reader.Path(), err) - } - return dst -} - -func mustSeqReadElementIDsFrom(dst []uint64, em *elementIDsMetadata, count int, reader *seqReader) []uint64 { - if em.offset != reader.bytesRead { - logger.Panicf("offset %d must be equal to bytesRead %d", em.offset, reader.bytesRead) - } - bb := bigValuePool.Generate() - defer bigValuePool.Release(bb) - bb.Buf = bytes.ResizeExact(bb.Buf, int(em.size)) - reader.mustReadFull(bb.Buf) - decoder := encoding.BytesBlockDecoder{} - var elementIDsByteSlice [][]byte - elementIDsByteSlice, err := decoder.Decode(elementIDsByteSlice, bb.Buf, uint64(count)) - if err != nil { - logger.Panicf("%s: cannot unmarshal elementIDs: %v", reader.Path(), err) - } - for _, elementID := range elementIDsByteSlice { - dst = append(dst, convert.BytesToUint64(elementID)) - } - return dst + return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, count, reader.Path(), bb.Buf) } func generateBlock() *block { diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go index 92aadda2..9f0091ee 100644 --- a/banyand/stream/block_metadata.go +++ b/banyand/stream/block_metadata.go @@ -240,9 +240,10 @@ func releaseBlockMetadataArray(bma *blockMetadataArray) { type timestampsMetadata struct { dataBlock - min int64 - max int64 - encodeType encoding.EncodeType + min int64 + max int64 + elementIDsOffset uint64 + encodeType encoding.EncodeType } func (tm *timestampsMetadata) reset() { @@ -250,6 +251,7 @@ func (tm *timestampsMetadata) reset() { tm.min = 0 tm.max = 0 tm.encodeType = 0 + tm.elementIDsOffset = 0 } func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) { @@ -257,6 +259,7 @@ func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) { tm.min = src.min tm.max = src.max tm.encodeType = src.encodeType + tm.elementIDsOffset = src.elementIDsOffset } func (tm *timestampsMetadata) marshal(dst []byte) []byte { @@ -264,20 +267,27 @@ func (tm *timestampsMetadata) marshal(dst []byte) []byte { dst = encoding.Uint64ToBytes(dst, uint64(tm.min)) dst = encoding.Uint64ToBytes(dst, uint64(tm.max)) dst = append(dst, byte(tm.encodeType)) + dst = encoding.VarUint64ToBytes(dst, tm.elementIDsOffset) return dst } func (tm *timestampsMetadata) unmarshal(src []byte) ([]byte, error) { src, err := tm.dataBlock.unmarshal(src) if err != nil { - return nil, fmt.Errorf("cannot unmarshal dataBlock: %w", err) + return nil, fmt.Errorf("cannot unmarshal ts blockData: %w", err) } tm.min = int64(encoding.BytesToUint64(src)) src = src[8:] tm.max = int64(encoding.BytesToUint64(src)) src = src[8:] tm.encodeType = encoding.EncodeType(src[0]) - return src[1:], nil + src = src[1:] + src, n, err := encoding.BytesToVarUint64(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal ts offset: %w", err) + } + tm.elementIDsOffset = n + return src, nil } type elementIDsMetadata struct { diff --git a/banyand/stream/block_metadata_test.go b/banyand/stream/block_metadata_test.go index 5c721b7d..00751859 100644 --- a/banyand/stream/block_metadata_test.go +++ b/banyand/stream/block_metadata_test.go @@ -79,9 +79,10 @@ func Test_timestampsMetadata_reset(t *testing.T) { offset: 1, size: 1, }, - min: 1, - max: 1, - encodeType: encoding.EncodeTypeConst, + min: 1, + max: 1, + encodeType: encoding.EncodeTypeConst, + elementIDsOffset: 1, } tm.reset() @@ -90,6 +91,7 @@ func Test_timestampsMetadata_reset(t *testing.T) { assert.Equal(t, uint64(0), tm.dataBlock.size) assert.Equal(t, int64(0), tm.min) assert.Equal(t, int64(0), tm.max) + assert.Equal(t, uint64(0), tm.elementIDsOffset) assert.Equal(t, encoding.EncodeTypeUnknown, tm.encodeType) } @@ -99,9 +101,10 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) { offset: 1, size: 1, }, - min: 1, - max: 1, - encodeType: encoding.EncodeTypeConst, + min: 1, + max: 1, + encodeType: encoding.EncodeTypeConst, + elementIDsOffset: 1, } dest := ×tampsMetadata{ @@ -109,9 +112,10 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) { offset: 2, size: 2, }, - min: 2, - max: 2, - encodeType: encoding.EncodeTypeDelta, + min: 2, + max: 2, + encodeType: encoding.EncodeTypeDelta, + elementIDsOffset: 2, } dest.copyFrom(src) @@ -121,6 +125,7 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) { assert.Equal(t, src.min, dest.min) assert.Equal(t, src.max, dest.max) assert.Equal(t, src.encodeType, dest.encodeType) + assert.Equal(t, src.elementIDsOffset, dest.elementIDsOffset) } func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) { @@ -129,9 +134,10 @@ func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) { offset: 1, size: 1, }, - min: 1, - max: 1, - encodeType: encoding.EncodeTypeConst, + min: 1, + max: 1, + encodeType: encoding.EncodeTypeConst, + elementIDsOffset: 1, } marshaled := original.marshal(nil) @@ -146,6 +152,7 @@ func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) { assert.Equal(t, original.min, unmarshaled.min) assert.Equal(t, original.max, unmarshaled.max) assert.Equal(t, original.encodeType, unmarshaled.encodeType) + assert.Equal(t, original.elementIDsOffset, unmarshaled.elementIDsOffset) } func Test_blockMetadata_marshal_unmarshal(t *testing.T) { diff --git a/banyand/stream/block_reader.go b/banyand/stream/block_reader.go index c952aa77..60701515 100644 --- a/banyand/stream/block_reader.go +++ b/banyand/stream/block_reader.go @@ -87,13 +87,11 @@ type seqReaders struct { tagFamilies map[string]*seqReader primary seqReader timestamps seqReader - elementIDs seqReader } func (sr *seqReaders) reset() { sr.primary.reset() sr.timestamps.reset() - sr.elementIDs.reset() if sr.tagFamilyMetadata != nil { for k, r := range sr.tagFamilyMetadata { releaseSeqReader(r) @@ -112,7 +110,6 @@ func (sr *seqReaders) init(p *part) { sr.reset() sr.primary.init(p.primary) sr.timestamps.init(p.timestamps) - sr.elementIDs.init(p.elementIDs) if sr.tagFamilies == nil { sr.tagFamilies = make(map[string]*seqReader) sr.tagFamilyMetadata = make(map[string]*seqReader) diff --git a/banyand/stream/block_test.go b/banyand/stream/block_test.go index 9c3bf7ed..ed57f48d 100644 --- a/banyand/stream/block_test.go +++ b/banyand/stream/block_test.go @@ -18,8 +18,6 @@ package stream import ( - "crypto/rand" - "encoding/binary" "reflect" "testing" @@ -215,20 +213,16 @@ func marshalIntArr(arr [][]byte) []byte { func Test_mustWriteAndReadTimestamps(t *testing.T) { tests := []struct { - name string - args []int64 - wantPanic bool - wantTM timestampsMetadata + name string + timestamps []int64 + elementIDs []uint64 + wantPanic bool + wantTM timestampsMetadata }{ { - name: "Test mustWriteAndReadTimestamps", - args: []int64{1, 2, 3, 4, 5}, - wantPanic: false, - }, - { - name: "Test mustWriteAndReadTimestamps with panic", - args: getBitInt64Arr(), - wantPanic: true, + name: "Test mustWriteAndReadTimestamps", + timestamps: []int64{1, 2, 3, 4, 5}, + elementIDs: []uint64{0, 1, 2, 3, 4}, }, } for _, tt := range tests { @@ -243,60 +237,18 @@ func Test_mustWriteAndReadTimestamps(t *testing.T) { b := &bytes.Buffer{} w := new(writer) w.init(b) - mustWriteTimestampsTo(tm, tt.args, w) - timestamps := mustReadTimestampsFrom(nil, tm, len(tt.args), b) - if !reflect.DeepEqual(timestamps, tt.args) { - t.Errorf("mustReadTimestampsFrom() = %v, want %v", timestamps, tt.args) + mustWriteTimestampsTo(tm, tt.timestamps, tt.elementIDs, w) + timestamps, elementIDs := mustReadTimestampsFrom(nil, nil, tm, len(tt.timestamps), b) + if !reflect.DeepEqual(timestamps, tt.timestamps) { + t.Errorf("mustReadTimestampsFrom() timestamps = %v, want %v", timestamps, tt.timestamps) } - }) - } -} - -func Test_mustWriteAndReadElementIDs(t *testing.T) { - tests := []struct { - name string - args []uint64 - wantPanic bool - wantTM elementIDsMetadata - }{ - { - name: "Test mustWriteAndReadElementIDs", - args: []uint64{0, 1, 2, 3, 4}, - wantPanic: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defer func() { - r := recover() - if (r != nil) != tt.wantPanic { - t.Errorf("mustWriteElementIDs() recover = %v, wantPanic = %v", r, tt.wantPanic) - } - }() - em := &elementIDsMetadata{} - b := &bytes.Buffer{} - w := new(writer) - w.init(b) - mustWriteElementIDsTo(em, tt.args, w) - elementIDs := mustReadElementIDsFrom(nil, em, len(tt.args), b) - if !reflect.DeepEqual(elementIDs, tt.args) { - t.Errorf("mustReadElementIDsFrom() = %v, want %v", elementIDs, tt.args) + if !reflect.DeepEqual(elementIDs, tt.elementIDs) { + t.Errorf("mustReadTimestampsFrom() elementIDs = %v, want %v", elementIDs, tt.elementIDs) } }) } } -func getBitInt64Arr() []int64 { - size := maxTimestampsBlockSize + 1 - randSlice := make([]int64, size) - for i := range randSlice { - b := make([]byte, 8) - _, _ = rand.Read(b) - randSlice[i] = int64(binary.BigEndian.Uint64(b)) - } - return randSlice -} - func Test_marshalAndUnmarshalTagFamily(t *testing.T) { metaBuffer, dataBuffer := &bytes.Buffer{}, &bytes.Buffer{} ww := &writers{ @@ -361,10 +313,9 @@ func Test_marshalAndUnmarshalTagFamily(t *testing.T) { } func Test_marshalAndUnmarshalBlock(t *testing.T) { - timestampBuffer, elementIDsBuffer := &bytes.Buffer{}, &bytes.Buffer{} - timestampWriter, elementIDsWriter := &writer{}, &writer{} + timestampBuffer := &bytes.Buffer{} + timestampWriter := &writer{} timestampWriter.init(timestampBuffer) - elementIDsWriter.init(elementIDsBuffer) ww := &writers{ mustCreateTagFamilyWriters: func(_ string) (fs.Writer, fs.Writer) { return &bytes.Buffer{}, &bytes.Buffer{} @@ -372,12 +323,10 @@ func Test_marshalAndUnmarshalBlock(t *testing.T) { tagFamilyMetadataWriters: make(map[string]*writer), tagFamilyWriters: make(map[string]*writer), timestampsWriter: *timestampWriter, - elementIDsWriter: *elementIDsWriter, } p := &part{ primary: &bytes.Buffer{}, timestamps: timestampBuffer, - elementIDs: elementIDsBuffer, } b := &conventionalBlock tagProjection := toTagProjection(*b) diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go index 8f51d743..5125bd85 100644 --- a/banyand/stream/block_writer.go +++ b/banyand/stream/block_writer.go @@ -65,7 +65,6 @@ type writers struct { tagFamilyMetadataWriters map[string]*writer tagFamilyWriters map[string]*writer timestampsWriter writer - elementIDsWriter writer } func (sw *writers) reset() { @@ -73,7 +72,6 @@ func (sw *writers) reset() { sw.metaWriter.reset() sw.primaryWriter.reset() sw.timestampsWriter.reset() - sw.elementIDsWriter.reset() for i, w := range sw.tagFamilyMetadataWriters { w.reset() @@ -87,7 +85,7 @@ func (sw *writers) reset() { func (sw *writers) totalBytesWritten() uint64 { n := sw.metaWriter.bytesWritten + sw.primaryWriter.bytesWritten + - sw.timestampsWriter.bytesWritten + sw.elementIDsWriter.bytesWritten + sw.timestampsWriter.bytesWritten for _, w := range sw.tagFamilyMetadataWriters { n += w.bytesWritten } @@ -101,7 +99,6 @@ func (sw *writers) MustClose() { sw.metaWriter.MustClose() sw.primaryWriter.MustClose() sw.timestampsWriter.MustClose() - sw.elementIDsWriter.MustClose() for _, w := range sw.tagFamilyMetadataWriters { w.MustClose() @@ -169,7 +166,6 @@ func (bw *blockWriter) MustInitForMemPart(mp *memPart) { bw.writers.metaWriter.init(&mp.meta) bw.writers.primaryWriter.init(&mp.primary) bw.writers.timestampsWriter.init(&mp.timestamps) - bw.writers.elementIDsWriter.init(&mp.elementIDs) } func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string) { @@ -182,7 +178,6 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string bw.writers.metaWriter.init(fs.MustCreateFile(fileSystem, filepath.Join(path, metaFilename), filePermission)) bw.writers.primaryWriter.init(fs.MustCreateFile(fileSystem, filepath.Join(path, primaryFilename), filePermission)) bw.writers.timestampsWriter.init(fs.MustCreateFile(fileSystem, filepath.Join(path, timestampsFilename), filePermission)) - bw.writers.elementIDsWriter.init(fs.MustCreateFile(fileSystem, filepath.Join(path, elementIDsFilename), filePermission)) } func (bw *blockWriter) MustWriteElements(sid common.SeriesID, timestamps []int64, elementIDs []uint64, tagFamilies [][]tagValues) { diff --git a/banyand/stream/part.go b/banyand/stream/part.go index a834e2e7..b6ad05c7 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -37,7 +37,6 @@ const ( primaryFilename = "primary.bin" metaFilename = "meta.bin" timestampsFilename = "timestamps.bin" - elementIDsFilename = "elementIDs.bin" elementIndexFilename = "idx" tagFamiliesMetadataFilenameExt = ".tfm" tagFamiliesFilenameExt = ".tf" @@ -46,7 +45,6 @@ const ( type part struct { primary fs.Reader timestamps fs.Reader - elementIDs fs.Reader fileSystem fs.FileSystem tagFamilyMetadata map[string]fs.Reader tagFamilies map[string]fs.Reader @@ -58,7 +56,6 @@ type part struct { func (p *part) close() { fs.MustClose(p.primary) fs.MustClose(p.timestamps) - fs.MustClose(p.elementIDs) for _, tf := range p.tagFamilies { fs.MustClose(tf) } @@ -80,7 +77,6 @@ func openMemPart(mp *memPart) *part { // Open data files p.primary = &mp.primary p.timestamps = &mp.timestamps - p.elementIDs = &mp.elementIDs if mp.tagFamilies != nil { p.tagFamilies = make(map[string]fs.Reader) p.tagFamilyMetadata = make(map[string]fs.Reader) @@ -98,7 +94,6 @@ type memPart struct { meta bytes.Buffer primary bytes.Buffer timestamps bytes.Buffer - elementIDs bytes.Buffer partMetadata partMetadata } @@ -124,7 +119,6 @@ func (mp *memPart) reset() { mp.meta.Reset() mp.primary.Reset() mp.timestamps.Reset() - mp.elementIDs.Reset() if mp.tagFamilies != nil { for _, tf := range mp.tagFamilies { tf.Reset() @@ -176,7 +170,6 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) { fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(path, metaFilename), filePermission) fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(path, primaryFilename), filePermission) fs.MustFlush(fileSystem, mp.timestamps.Buf, filepath.Join(path, timestampsFilename), filePermission) - fs.MustFlush(fileSystem, mp.elementIDs.Buf, filepath.Join(path, elementIDsFilename), filePermission) for name, tf := range mp.tagFamilies { fs.MustFlush(fileSystem, tf.Buf, filepath.Join(path, name+tagFamiliesFilenameExt), filePermission) } @@ -268,7 +261,6 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part { p.primary = mustOpenReader(path.Join(partPath, primaryFilename), fileSystem) p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename), fileSystem) - p.elementIDs = mustOpenReader(path.Join(partPath, elementIDsFilename), fileSystem) ee := fileSystem.ReadDir(partPath) for _, e := range ee { if e.IsDir() { diff --git a/banyand/stream/snapshot_test.go b/banyand/stream/snapshot_test.go index 0d8b80ce..fc2f3654 100644 --- a/banyand/stream/snapshot_test.go +++ b/banyand/stream/snapshot_test.go @@ -390,7 +390,6 @@ func TestSnapshotRemove(t *testing.T) { {p: &part{ partMetadata: partMetadata{ID: 1}, timestamps: &bytes.Buffer{}, - elementIDs: &bytes.Buffer{}, primary: &bytes.Buffer{}, }, ref: 1}, {p: &part{partMetadata: partMetadata{ID: 2}}, ref: 2}, diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index d3e901e4..60596f57 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -34,7 +34,6 @@ import ( const ( maxValuesBlockSize = 8 * 1024 * 1024 - maxTimestampsBlockSize = 8 * 1024 * 1024 maxElementIDsBlockSize = 8 * 1024 * 1024 maxTagFamiliesMetadataSize = 8 * 1024 * 1024 maxUncompressedBlockSize = 2 * 1024 * 1024 diff --git a/pkg/encoding/int_list.go b/pkg/encoding/int_list.go index 82b842c5..b0e68bc8 100644 --- a/pkg/encoding/int_list.go +++ b/pkg/encoding/int_list.go @@ -55,7 +55,7 @@ func Int64ListToBytes(dst []byte, a []int64) (result []byte, mt EncodeType, firs // BytesToInt64List decodes bytes into a list of int64. func BytesToInt64List(dst []int64, src []byte, mt EncodeType, firstValue int64, itemsCount int) ([]int64, error) { - dst = ExtendInt64ListCapacity(dst, itemsCount) + dst = ExtendListCapacity(dst, itemsCount) var err error switch mt { @@ -100,11 +100,11 @@ func BytesToInt64List(dst []int64, src []byte, mt EncodeType, firstValue int64, } } -// ExtendInt64ListCapacity extends the capacity of the int64 list. -func ExtendInt64ListCapacity(dst []int64, additionalItems int) []int64 { +// ExtendListCapacity extends the capacity of the given list. +func ExtendListCapacity[T any](dst []T, additionalItems int) []T { dstLen := len(dst) if n := dstLen + additionalItems - cap(dst); n > 0 { - dst = append(dst[:cap(dst)], make([]int64, n)...) + dst = append(dst[:cap(dst)], make([]T, n)...) } return dst[:dstLen] }