This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 071c0ccd Fix syncSnapshot for trace (#788)
071c0ccd is described below

commit 071c0ccd4a4430b42f55eff4a0c03f66517da25a
Author: Huang Youliang <[email protected]>
AuthorDate: Tue Sep 30 06:53:16 2025 +0800

    Fix syncSnapshot for trace (#788)
    
    * Fix syncSnapshot for trace
    
    * Remove the logging-env flag
---
 banyand/internal/sidx/sidx.go                      |  6 ++
 banyand/trace/flusher.go                           |  5 ++
 banyand/trace/syncer.go                            | 70 +++++++++++++++-------
 banyand/trace/write_data.go                        |  2 +
 pkg/test/setup/setup.go                            |  4 ++
 .../distributed/query/query_suite_test.go          |  5 ++
 6 files changed, 70 insertions(+), 22 deletions(-)

diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index b2aa29df..5e2efd1f 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -453,6 +453,12 @@ func (s *sidx) Flush() error {
                }
                partPath := partPath(s.root, pw.ID())
                pw.mp.mustFlush(s.fileSystem, partPath)
+               if l := s.l.Debug(); l.Enabled() {
+                       s.l.Debug().
+                               Uint64("part_id", pw.ID()).
+                               Str("part_path", partPath).
+                               Msg("flushing sidx part")
+               }
                newPW := newPartWrapper(nil, mustOpenPart(partPath, s.fileSystem))
                flushIntro.flushed[newPW.ID()] = newPW
        }
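
A note on the "if l := s.l.Debug(); l.Enabled()" guard used in the hunk above:
with zerolog-style loggers, checking Enabled() first means the field building
(Uint64, Str, ...) is skipped entirely when debug logging is off. A minimal,
self-contained sketch of the idiom using zerolog directly (banyand's logger
wraps zerolog, so the logger setup and field values here are illustrative
only):

    package main

    import (
        "os"

        "github.com/rs/zerolog"
    )

    func main() {
        // At Info level, Debug() returns a disabled event, so the guard
        // below skips all field formatting and allocation.
        log := zerolog.New(os.Stderr).Level(zerolog.InfoLevel)
        if e := log.Debug(); e.Enabled() {
            e.Uint64("part_id", 42).
                Str("part_path", "/tmp/part-42").
                Msg("flushing sidx part")
        }
    }

Reusing the checked event e, as in the sketch, also avoids constructing the
event a second time; the committed code calls s.l.Debug() again inside the
guard, which is safe but builds one extra event when debug is enabled.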
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 33eb4d1e..76096b4b 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -269,6 +269,11 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction)
        }
        allSidx := tst.getAllSidx()
        for name, sidxInstance := range allSidx {
+               if l := tst.l.Debug(); l.Enabled() {
+                       tst.l.Debug().
+                               Str("sidx_name", name).
+                               Msg("flushing sidx")
+               }
                if err := sidxInstance.Flush(); err != nil {
                        tst.l.Warn().Err(err).Str("sidx", name).Msg("sidx flush failed")
                        return
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index 8e98a3e0..b0155363 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -189,24 +189,18 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
        if err != nil {
                return err
        }
-       if len(partsToSync) == 0 && len(sidxPartsToSync) == 0 {
-               return nil
-       }
-       hasSidxParts := false
-       for _, sidxParts := range sidxPartsToSync {
-               if len(sidxParts) == 0 {
-                       continue
+       if l := tst.l.Debug(); l.Enabled() {
+               for name, sidxParts := range sidxPartsToSync {
+                       tst.l.Debug().
+                               Str("sidx_name", name).
+                               Int("sidx_parts_count", len(sidxParts)).
+                               Msg("sidxPartsToSync in syncSnapshot")
                }
-               hasSidxParts = true
-               break
-       }
-       if len(partsToSync) == 0 && !hasSidxParts {
-               return nil
        }
 
        // Validate sync preconditions
-       if err := tst.validateSyncPreconditions(partsToSync, sidxPartsToSync); err != nil {
-               return err
+       if !tst.needToSync(partsToSync, sidxPartsToSync) {
+               return nil
        }
 
        // Execute sync operation
@@ -234,13 +228,46 @@ func (tst *tsTable) collectPartsToSync(curSnapshot *snapshot) ([]*part, map[stri
                        return nil, nil, errClosed
                }
                sidxPartsToSync[name] = sidx.PartsToSync()
+               if l := tst.l.Debug(); l.Enabled() {
+                       tst.l.Debug().
+                               Str("sidx_name", name).
+                               Int("sidx_parts_count", len(sidxPartsToSync[name])).
+                               Msg("get sidx parts to sync")
+               }
+       }
+
+       if l := tst.l.Debug(); l.Enabled() {
+               tst.l.Debug().
+                       Int("core_parts_count", len(partsToSync)).
+                       Uint64("snapshot_epoch", curSnapshot.epoch).
+                       Msg("collected core parts for sync")
+               if len(partsToSync) > 0 {
+                       var corePartIDs []uint64
+                       for _, part := range partsToSync {
+                               corePartIDs = append(corePartIDs, part.partMetadata.ID)
+                       }
+                       tst.l.Debug().
+                               Interface("core_part_ids", corePartIDs).
+                               Msg("core parts to sync details")
+               }
+               for sidxName, sidxParts := range sidxPartsToSync {
+                       var sidxPartIDs []uint64
+                       for _, part := range sidxParts {
+                               sidxPartIDs = append(sidxPartIDs, part.ID())
+                       }
+                       tst.l.Debug().
+                               Str("sidx_name", sidxName).
+                               Int("sidx_parts_count", len(sidxParts)).
+                               Interface("sidx_part_ids", sidxPartIDs).
+                               Msg("collected sidx parts for sync")
+               }
        }
 
        return partsToSync, sidxPartsToSync, nil
 }
 
-// validateSyncPreconditions validates that there are parts to sync and nodes available.
-func (tst *tsTable) validateSyncPreconditions(partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part) error {
+// needToSync reports whether there are parts to sync and nodes available.
+func (tst *tsTable) needToSync(partsToSync []*part, sidxPartsToSync map[string][]*sidx.Part) bool {
        hasCoreParts := len(partsToSync) > 0
        hasSidxParts := false
        for _, parts := range sidxPartsToSync {
@@ -250,15 +277,11 @@ func (tst *tsTable) validateSyncPreconditions(partsToSync []*part, sidxPartsToSy
                }
        }
        if !hasCoreParts && !hasSidxParts {
-               return nil
+               return false
        }
 
        nodes := tst.getNodes()
-       if len(nodes) == 0 {
-               return fmt.Errorf("no nodes to sync parts")
-       }
-
-       return nil
+       return len(nodes) > 0
 }
 
 // executeSyncOperation performs the actual synchronization of parts to nodes.
@@ -384,6 +407,9 @@ func (tst *tsTable) syncStreamingPartsToNodes(ctx context.Context, nodes []strin
                streamingParts := make([]queue.StreamingPartData, 0)
                // Add sidx streaming parts
                for name, sidxParts := range sidxPartsToSync {
+                       if len(sidxParts) == 0 {
+                               continue
+                       }
                        sidx, ok := tst.getSidx(name)
                        if !ok {
                                return fmt.Errorf("sidx %s not found", name)
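
For context on why needToSync iterates the map instead of testing
len(sidxPartsToSync): a registered sidx with nothing pending still contributes
a key whose slice is empty, so the map length alone overstates the pending
work. A minimal illustration with hypothetical data (stdlib only):

    package main

    import "fmt"

    func main() {
        // One registered sidx, but no parts awaiting sync.
        sidxPartsToSync := map[string][]uint64{"trace-sidx": {}}

        fmt.Println(len(sidxPartsToSync) == 0) // false: the key still counts

        hasSidxParts := false
        for _, parts := range sidxPartsToSync {
            if len(parts) > 0 {
                hasSidxParts = true
                break
            }
        }
        fmt.Println(hasSidxParts) // false: every slice is empty
    }

The same reasoning motivates the new continue on empty sidxParts in
syncStreamingPartsToNodes: ranging over the map still yields the empty entries.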
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index ed3c74dc..f1d8da52 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -83,6 +83,8 @@ func (s *syncPartContext) Close() error {
                s.sidxPartContext = nil
        }
        s.tsTable = nil
+       s.traceIDFilterBuffer = nil
+       s.tagTypeBuffer = nil
        return nil
 }
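
The two added nil assignments follow the pattern of the surrounding lines:
dropping the struct's references on Close lets the garbage collector reclaim
the buffers even while the context object itself remains reachable, for
example if a caller still holds it after the sync finishes. A minimal sketch
of the idiom; the field types here are assumptions, not the actual ones:

    package main

    import "bytes"

    type syncCtx struct {
        filterBuf *bytes.Buffer // large scratch buffer
        typeBuf   *bytes.Buffer
    }

    // Close drops the buffer references so they stop pinning memory as soon
    // as the sync is done, regardless of how long the context itself lives.
    func (c *syncCtx) Close() error {
        c.filterBuf = nil
        c.typeBuf = nil
        return nil
    }

    func main() {
        c := &syncCtx{filterBuf: &bytes.Buffer{}, typeBuf: &bytes.Buffer{}}
        _ = c.Close()
    }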
 
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 68972a40..804a8cf6 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -256,6 +256,8 @@ func startDataNode(etcdEndpoint, dataDir string, flags ...string) (string, strin
                "--etcd-endpoints", etcdEndpoint,
                "--node-host-provider", "flag",
                "--node-host", nodeHost,
+               "--logging-modules", "trace,sidx",
+               "--logging-levels", "debug,debug",
        )
 
        closeFn := CMD(flags...)
@@ -344,6 +346,8 @@ func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (string, string,
                "--stream-sync-interval=1s",
                "--measure-sync-interval=1s",
                "--trace-sync-interval=1s",
+               "--logging-modules", "trace,sidx",
+               "--logging-levels", "debug,debug",
        )
        closeFn := CMD(flags...)
        gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), testflags.EventuallyTimeout).Should(gomega.Succeed())
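
The two logging flags added in this file presumably pair positionally: module
"trace" gets level "debug", and module "sidx" gets level "debug". A tiny
sketch of that pairing (illustrative only, not banyand's actual flag parsing):

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        modules := strings.Split("trace,sidx", ",")
        levels := strings.Split("debug,debug", ",")
        for i, m := range modules {
            fmt.Printf("module %s -> level %s\n", m, levels[i])
        }
    }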
diff --git a/test/integration/distributed/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go
index 1f67007d..297445ba 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -49,6 +49,7 @@ import (
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
 )
 
 func TestQuery(t *testing.T) {
@@ -130,6 +131,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                Connection: connection,
                BaseTime:   now,
        }
+       casestrace.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   now,
+       }
        Expect(err).NotTo(HaveOccurred())
 })
 
