hanahmily commented on code in PR #801:
URL: 
https://github.com/apache/skywalking-banyandb/pull/801#discussion_r2416514017


##########
banyand/trace/syncer.go:
##########
@@ -299,129 +236,34 @@ func (tst *tsTable) executeSyncOperation(partsToSync 
[]*part, sidxPartsToSync ma
        }()
 
        nodes := tst.getNodes()
-       return tst.syncStreamingPartsToNodes(ctx, nodes, partsToSync, 
sidxPartsToSync, &releaseFuncs)
-}
-
-// handleSyncIntroductions creates and processes sync introductions for both 
core and sidx parts.
-func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, 
sidxPartsToSync map[string][]*sidx.Part, syncCh chan *syncIntroduction) error {
-       // Create core sync introduction
-       si := generateSyncIntroduction()
-       defer releaseSyncIntroduction(si)
-       si.applied = make(chan struct{})
-       for _, part := range partsToSync {
-               ph := partHandle{
-                       partID:   part.partMetadata.ID,
-                       partType: PartTypeCore,
-               }
-               si.synced[ph] = struct{}{}
-       }
-
-       // Create sidx sync introductions
-       sidxSyncIntroductions := 
tst.createSidxSyncIntroductions(sidxPartsToSync)
-       defer tst.releaseSidxSyncIntroductions(sidxSyncIntroductions)
-
-       // Send sync introductions
-       if err := tst.sendSyncIntroductions(si, sidxSyncIntroductions, syncCh); 
err != nil {
-               return err
-       }
-
-       // Wait for sync introductions to be applied
-       return tst.waitForSyncIntroductions(si, sidxSyncIntroductions)
-}
-
-// createSidxSyncIntroductions creates sync introductions for sidx parts.
-func (tst *tsTable) createSidxSyncIntroductions(sidxPartsToSync 
map[string][]*sidx.Part) map[string]*sidx.SyncIntroduction {
-       sidxSyncIntroductions := make(map[string]*sidx.SyncIntroduction)
-       for name, sidxParts := range sidxPartsToSync {
-               if len(sidxParts) > 0 {
-                       ssi := sidx.GenerateSyncIntroduction()
-                       ssi.Applied = make(chan struct{})
-                       for _, part := range sidxParts {
-                               ssi.Synced[part.ID()] = struct{}{}
-                       }
-                       sidxSyncIntroductions[name] = ssi
-               }
-       }
-       return sidxSyncIntroductions
-}
-
-// releaseSidxSyncIntroductions releases sidx sync introductions.
-func (tst *tsTable) releaseSidxSyncIntroductions(sidxSyncIntroductions 
map[string]*sidx.SyncIntroduction) {
-       for _, ssi := range sidxSyncIntroductions {
-               sidx.ReleaseSyncIntroduction(ssi)
-       }
-}
-
-// sendSyncIntroductions sends sync introductions to their respective channels.
-func (tst *tsTable) sendSyncIntroductions(si *syncIntroduction, 
sidxSyncIntroductions map[string]*sidx.SyncIntroduction, syncCh chan 
*syncIntroduction) error {
-       select {
-       case syncCh <- si:
-       case <-tst.loopCloser.CloseNotify():
-               return errClosed
-       }
-
-       for name, ssi := range sidxSyncIntroductions {
-               sidx, ok := tst.getSidx(name)
-               if !ok {
-                       return fmt.Errorf("sidx %s not found", name)
-               }
-               select {
-               case sidx.SyncCh() <- ssi:
-               case <-tst.loopCloser.CloseNotify():
-                       return errClosed
-               }
-       }
-       return nil
-}
-
-// waitForSyncIntroductions waits for all sync introductions to be applied.
-func (tst *tsTable) waitForSyncIntroductions(si *syncIntroduction, 
sidxSyncIntroductions map[string]*sidx.SyncIntroduction) error {
-       select {
-       case <-si.applied:
-       case <-tst.loopCloser.CloseNotify():
-               return errClosed
-       }
-
-       for _, ssi := range sidxSyncIntroductions {
-               select {
-               case <-ssi.Applied:
-               case <-tst.loopCloser.CloseNotify():
-                       return errClosed
-               }
-       }
-       return nil
-}
-
-// syncStreamingPartsToNodes synchronizes streamingparts to multiple nodes.
-func (tst *tsTable) syncStreamingPartsToNodes(ctx context.Context, nodes 
[]string,
-       partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part, 
releaseFuncs *[]func(),
-) error {
        if tst.loopCloser != nil && tst.loopCloser.Closed() {
                return errClosed
        }
+       sidxMap := tst.getAllSidx()
        for _, node := range nodes {
                if tst.loopCloser != nil && tst.loopCloser.Closed() {
                        return errClosed
                }
                // Prepare all streaming parts data
                streamingParts := make([]queue.StreamingPartData, 0)
                // Add sidx streaming parts
-               for name, sidxParts := range sidxPartsToSync {
-                       if len(sidxParts) == 0 {
-                               continue
+               for name, sidx := range sidxMap {
+                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(partIDsToSync, tst.group, uint32(tst.shardID), name)
+                       if len(sidxStreamingParts) != len(partIDsToSync) {
+                               logger.Panicf("sidx streaming parts count 
mismatch: %d != %d", len(sidxStreamingParts), len(partIDsToSync))
+                               return nil
                        }
-                       sidx, ok := tst.getSidx(name)
-                       if !ok {
-                               return fmt.Errorf("sidx %s not found", name)
-                       }
-                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(sidxParts, tst.group, uint32(tst.shardID), name)
                        streamingParts = append(streamingParts, 
sidxStreamingParts...)
-                       *releaseFuncs = append(*releaseFuncs, 
sidxReleaseFuncs...)
+                       releaseFuncs = append(releaseFuncs, sidxReleaseFuncs...)
+               }
+               if len(streamingParts) != len(partIDsToSync)*len(sidxMap) {
+                       logger.Panicf("streaming parts count mismatch: %d != 
%d", len(streamingParts), len(partIDsToSync)*len(sidxMap))
+                       return nil

Review Comment:
   The compiler does not recognize `logger.Panicf` as a terminating statement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to