This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 16561def Refactor readData method in block.go  (#782)
16561def is described below

commit 16561defde1d3d35ac73f60f298e9062f6d48329
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Sep 24 00:09:24 2025 +0800

    Refactor readData method in block.go  (#782)
---
 banyand/internal/sidx/block.go                     |  17 ++--
 banyand/internal/sidx/part.go                      |  14 +--
 banyand/internal/sidx/query_result.go              |   8 +-
 banyand/internal/sidx/snapshot.go                  |   1 +
 banyand/trace/flusher.go                           |   4 +-
 banyand/trace/merger.go                            |   4 +-
 banyand/trace/syncer.go                            | 105 ++++++++++++++++++---
 banyand/trace/tstable.go                           |  20 +---
 .../multi_segments/multi_segments_suite_test.go    |   5 -
 9 files changed, 110 insertions(+), 68 deletions(-)

diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index 85bd370e..524fdb78 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -25,7 +25,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
-       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -341,11 +340,9 @@ func mustWriteDataTo(db *dataBlock, data [][]byte, dataWriter *writer) {
        // Encode all data payloads as a block
        bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], data)
 
-       // Compress and write
-       compressedData := zstd.Compress(nil, bb.Buf, 1)
        db.offset = dataWriter.bytesWritten
-       db.size = uint64(len(compressedData))
-       dataWriter.MustWrite(compressedData)
+       db.size = uint64(len(bb.Buf))
+       dataWriter.MustWrite(bb.Buf)
 }
 
 type blockPointer struct {
@@ -536,13 +533,11 @@ func (b *block) readData(decoder *encoding.BytesBlockDecoder, sr *seqReaders, bm
                bb.Buf = bb.Buf[:0]
                bigValuePool.Put(bb)
        }()
-       bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(bm.dataBlock.size))
+       bb.Buf = bytes.ResizeOver(bb.Buf, int(bm.dataBlock.size))
        sr.data.mustReadFull(bb.Buf)
-       dataBuf, err := zstd.Decompress(bb.Buf[:0], bb.Buf)
-       if err != nil {
-               return fmt.Errorf("cannot decompress data: %w", err)
-       }
-       b.data, err = decoder.Decode(b.data[:0], dataBuf, bm.count)
+
+       var err error
+       b.data, err = decoder.Decode(b.data[:0], bb.Buf, bm.count)
        if err != nil {
                return fmt.Errorf("cannot decode data payloads: %w", err)
        }
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 6c7b12d9..94c6b7f7 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -297,7 +297,6 @@ func (p *part) readAll() ([]*elements, error) {
        result := make([]*elements, 0, len(p.primaryBlockMetadata))
        compressedPrimaryBuf := make([]byte, 0, 1024)
        primaryBuf := make([]byte, 0, 1024)
-       compressedDataBuf := make([]byte, 0, 1024)
        dataBuf := make([]byte, 0, 1024)
        compressedKeysBuf := make([]byte, 0, 1024)
 
@@ -348,17 +347,8 @@ func (p *part) readAll() ([]*elements, error) {
                        }
 
                        // Read data payloads
-                       compressedDataBuf = bytes.ResizeOver(compressedDataBuf, int(bm.dataBlock.size))
-                       fs.MustReadData(p.data, int64(bm.dataBlock.offset), compressedDataBuf)
-
-                       dataBuf, err = zstd.Decompress(dataBuf[:0], compressedDataBuf)
-                       if err != nil {
-                               releaseElements(elems)
-                               for _, e := range result {
-                                       releaseElements(e)
-                               }
-                               return nil, fmt.Errorf("cannot decompress data block: %w", err)
-                       }
+                       dataBuf = bytes.ResizeOver(dataBuf, int(bm.dataBlock.size))
+                       fs.MustReadData(p.data, int64(bm.dataBlock.offset), dataBuf)
 
                        // Decode data payloads - create a new decoder for each block to avoid state corruption
                        blockBytesDecoder := &encoding.BytesBlockDecoder{}
diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go
index d21c3b3c..09692a2f 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -24,7 +24,6 @@ import (
        internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
-       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -87,14 +86,9 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata
        bb2.Buf = bytes.ResizeOver(bb2.Buf[:0], int(bm.dataBlock.size))
        fs.MustReadData(p.data, int64(bm.dataBlock.offset), bb2.Buf)
 
-       dataBuf, err := zstd.Decompress(bb.Buf[:0], bb2.Buf)
-       if err != nil {
-               return false
-       }
-
        // Decode data payloads
        decoder := &encoding.BytesBlockDecoder{}
-       tmpBlock.data, err = decoder.Decode(tmpBlock.data[:0], dataBuf, bm.count)
+       tmpBlock.data, err = decoder.Decode(tmpBlock.data[:0], bb2.Buf, bm.count)
        if err != nil {
                return false
        }
diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go
index ce7cf6a6..809c0763 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -338,6 +338,7 @@ func (s *snapshot) remove(epoch uint64, toRemove map[uint64]struct{}) *snapshot
                if _, shouldRemove := toRemove[pw.ID()]; !shouldRemove {
                        if pw.acquire() {
                                result.parts = append(result.parts, pw)
+                               continue
                        }
                }
                pw.markForRemoval()
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 727cad6c..33eb4d1e 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -268,9 +268,9 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction)
                return
        }
        allSidx := tst.getAllSidx()
-       for _, sidxInstance := range allSidx {
+       for name, sidxInstance := range allSidx {
                if err := sidxInstance.Flush(); err != nil {
-                       tst.l.Warn().Err(err).Msg("sidx flush failed")
+                       tst.l.Warn().Err(err).Str("sidx", name).Msg("sidx flush failed")
                        return
                }
        }
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 2702e639..d96fad6f 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -112,9 +112,9 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, part
        if err != nil {
                return nil, err
        }
-       for sidxName, sidxInstance := range tst.sidxMap {
+       for _, sidxInstance := range tst.getAllSidx() {
                if err := sidxInstance.Merge(closeCh); err != nil {
-                       tst.l.Warn().Err(err).Str("sidx", sidxName).Msg("sidx merge failed")
+                       tst.l.Warn().Err(err).Msg("sidx merge failed")
                        return nil, err
                }
        }
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index a95c9e8c..a1ef98e0 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -183,6 +183,29 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
        if tst.loopCloser != nil && tst.loopCloser.Closed() {
                return errClosed
        }
+
+       // Collect parts to sync
+       partsToSync, sidxPartsToSync, err := tst.collectPartsToSync(curSnapshot)
+       if err != nil {
+               return err
+       }
+
+       // Validate sync preconditions
+       if err := tst.validateSyncPreconditions(partsToSync, sidxPartsToSync); err != nil {
+               return err
+       }
+
+       // Execute sync operation
+       if err := tst.executeSyncOperation(partsToSync, sidxPartsToSync); err != nil {
+               return err
+       }
+
+       // Handle sync introductions
+       return tst.handleSyncIntroductions(partsToSync, sidxPartsToSync, syncCh)
+}
+
+// collectPartsToSync collects both core and sidx parts that need to be synchronized.
+func (tst *tsTable) collectPartsToSync(curSnapshot *snapshot) ([]*part, map[string][]*sidx.Part, error) {
        // Get all parts from the current snapshot
        var partsToSync []*part
        for _, pw := range curSnapshot.parts {
@@ -190,14 +213,20 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
                        partsToSync = append(partsToSync, pw.p)
                }
        }
+
        sidxPartsToSync := sidx.NewPartsToSync()
-       for name, sidx := range tst.sidxMap {
+       for name, sidx := range tst.getAllSidx() {
                if tst.loopCloser != nil && tst.loopCloser.Closed() {
-                       return errClosed
+                       return nil, nil, errClosed
                }
                sidxPartsToSync[name] = sidx.PartsToSync()
        }
 
+       return partsToSync, sidxPartsToSync, nil
+}
+
+// validateSyncPreconditions validates that there are parts to sync and nodes available.
+func (tst *tsTable) validateSyncPreconditions(partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part) error {
        hasCoreParts := len(partsToSync) > 0
        hasSidxParts := false
        for _, parts := range sidxPartsToSync {
@@ -214,11 +243,16 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
        if len(nodes) == 0 {
                return fmt.Errorf("no nodes to sync parts")
        }
+
+       return nil
+}
+
+// executeSyncOperation performs the actual synchronization of parts to nodes.
+func (tst *tsTable) executeSyncOperation(partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part) error {
        sort.Slice(partsToSync, func(i, j int) bool {
                return partsToSync[i].partMetadata.ID < partsToSync[j].partMetadata.ID
        })
 
-       // Use chunked sync with streaming for better memory efficiency
        ctx := context.Background()
        releaseFuncs := make([]func(), 0, len(partsToSync))
        defer func() {
@@ -226,11 +260,14 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
                        release()
                }
        }()
-       if err := tst.syncStreamingPartsToNodes(ctx, nodes, partsToSync, sidxPartsToSync, &releaseFuncs); err != nil {
-               return err
-       }
 
-       // Construct syncIntroduction to remove synced parts from snapshot
+       nodes := tst.getNodes()
+       return tst.syncStreamingPartsToNodes(ctx, nodes, partsToSync, sidxPartsToSync, &releaseFuncs)
+}
+
+// handleSyncIntroductions creates and processes sync introductions for both core and sidx parts.
+func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part, syncCh chan *syncIntroduction) error {
+       // Create core sync introduction
        si := generateSyncIntroduction()
        defer releaseSyncIntroduction(si)
        si.applied = make(chan struct{})
@@ -241,7 +278,25 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
                }
                si.synced[ph] = struct{}{}
        }
-       // Construct sidx sync introductions for each sidx instance
+
+       // Create sidx sync introductions
+       sidxSyncIntroductions, err := tst.createSidxSyncIntroductions(sidxPartsToSync)
+       if err != nil {
+               return err
+       }
+       defer tst.releaseSidxSyncIntroductions(sidxSyncIntroductions)
+
+       // Send sync introductions
+       if err := tst.sendSyncIntroductions(si, sidxSyncIntroductions, syncCh); err != nil {
+               return err
+       }
+
+       // Wait for sync introductions to be applied
+       return tst.waitForSyncIntroductions(si, sidxSyncIntroductions)
+}
+
+// createSidxSyncIntroductions creates sync introductions for sidx parts.
+func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync map[string][]*sidx.Part) (map[string]*sidx.SyncIntroduction, error) {
        sidxSyncIntroductions := make(map[string]*sidx.SyncIntroduction)
        for name, sidxParts := range sidxPartsToSync {
                if len(sidxParts) > 0 {
@@ -253,32 +308,52 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
                        sidxSyncIntroductions[name] = ssi
                }
        }
-       // Send sync introductions
+       return sidxSyncIntroductions, nil
+}
+
+// releaseSidxSyncIntroductions releases sidx sync introductions.
+func (tst *tsTable) releaseSidxSyncIntroductions(sidxSyncIntroductions map[string]*sidx.SyncIntroduction) {
+       for _, ssi := range sidxSyncIntroductions {
+               sidx.ReleaseSyncIntroduction(ssi)
+       }
+}
+
+// sendSyncIntroductions sends sync introductions to their respective channels.
+func (tst *tsTable) sendSyncIntroductions(si *syncIntroduction, sidxSyncIntroductions map[string]*sidx.SyncIntroduction, syncCh chan *syncIntroduction) error {
        select {
        case syncCh <- si:
        case <-tst.loopCloser.CloseNotify():
                return errClosed
        }
+
        for name, ssi := range sidxSyncIntroductions {
+               sidx, ok := tst.getSidx(name)
+               if !ok {
+                       return fmt.Errorf("sidx %s not found", name)
+               }
                select {
-               case tst.sidxMap[name].SyncCh() <- ssi:
+               case sidx.SyncCh() <- ssi:
                case <-tst.loopCloser.CloseNotify():
                        return errClosed
                }
        }
-       // Wait for sync introductions to be applied
+       return nil
+}
+
+// waitForSyncIntroductions waits for all sync introductions to be applied.
+func (tst *tsTable) waitForSyncIntroductions(si *syncIntroduction, sidxSyncIntroductions map[string]*sidx.SyncIntroduction) error {
        select {
        case <-si.applied:
        case <-tst.loopCloser.CloseNotify():
                return errClosed
        }
+
        for _, ssi := range sidxSyncIntroductions {
                select {
                case <-ssi.Applied:
                case <-tst.loopCloser.CloseNotify():
                        return errClosed
                }
-               sidx.ReleaseSyncIntroduction(ssi)
        }
        return nil
 }
@@ -298,7 +373,11 @@ func (tst *tsTable) syncStreamingPartsToNodes(ctx context.Context, nodes []strin
                streamingParts := make([]queue.StreamingPartData, 0)
                // Add sidx streaming parts
                for name, sidxParts := range sidxPartsToSync {
-                       sidxStreamingParts, sidxReleaseFuncs := tst.sidxMap[name].StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name)
+                       sidx, ok := tst.getSidx(name)
+                       if !ok {
+                               return fmt.Errorf("sidx %s not found", name)
+                       }
+                       sidxStreamingParts, sidxReleaseFuncs := sidx.StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name)
                        streamingParts = append(streamingParts, sidxStreamingParts...)
                        *releaseFuncs = append(*releaseFuncs, sidxReleaseFuncs...)
                }
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 645dde18..86453916 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -306,25 +306,13 @@ func (tst *tsTable) getOrCreateSidx(name string) (sidx.SIDX, error) {
 }
 
 // getAllSidx returns all sidx instances for potential multi-sidx queries.
-func (tst *tsTable) getAllSidx() []sidx.SIDX {
+func (tst *tsTable) getAllSidx() map[string]sidx.SIDX {
        tst.RLock()
        defer tst.RUnlock()
 
-       result := make([]sidx.SIDX, 0, len(tst.sidxMap))
-       for _, sidxInstance := range tst.sidxMap {
-               result = append(result, sidxInstance)
-       }
-       return result
-}
-
-// getAllSidxNames returns all sidx names.
-func (tst *tsTable) getAllSidxNames() []string {
-       tst.RLock()
-       defer tst.RUnlock()
-
-       result := make([]string, 0, len(tst.sidxMap))
-       for sidxName := range tst.sidxMap {
-               result = append(result, sidxName)
+       result := make(map[string]sidx.SIDX, len(tst.sidxMap))
+       for name, sidxInstance := range tst.sidxMap {
+               result[name] = sidxInstance
        }
        return result
 }
diff --git a/test/integration/distributed/multi_segments/multi_segments_suite_test.go b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
index 5a8fa9fe..e331f6e6 100644
--- a/test/integration/distributed/multi_segments/multi_segments_suite_test.go
+++ b/test/integration/distributed/multi_segments/multi_segments_suite_test.go
@@ -48,7 +48,6 @@ import (
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
-       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
 )
 
 func TestDistributedMultiSegments(t *testing.T) {
@@ -137,10 +136,6 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                Connection: connection,
                BaseTime:   baseTime,
        }
-       casestrace.SharedContext = helpers.SharedContext{
-               Connection: connection,
-               BaseTime:   baseTime,
-       }
        Expect(err).NotTo(HaveOccurred())
 })
 

Reply via email to