Copilot commented on code in PR #767:
URL: 
https://github.com/apache/skywalking-banyandb/pull/767#discussion_r2366278321


##########
banyand/trace/write_data.go:
##########
@@ -18,31 +18,70 @@
 package trace
 
 import (
+       "context"
        "fmt"
        "strings"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
        "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/filter"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type syncPartContext struct {
-       tsTable *tsTable
-       writers *writers
-       memPart *memPart
+       tsTable             *tsTable
+       l                   *logger.Logger
+       writers             *writers
+       memPart             *memPart
+       sidxPartContext     *sidx.SyncPartContext
+       traceIDFilterBuffer []byte
+       tagTypeBuffer       []byte
 }
 
 func (s *syncPartContext) FinishSync() error {
-       s.tsTable.mustAddMemPart(s.memPart)
+       if len(s.traceIDFilterBuffer) > 0 && s.memPart != nil {
+               bf := filter.NewBloomFilter(0)
+               s.memPart.traceIDFilter.filter = 
decodeBloomFilter(s.traceIDFilterBuffer, bf)

Review Comment:
   Creating a BloomFilter with size 0 may not allocate proper memory for the 
filter. Consider using a more appropriate initial size or allowing the 
decodeBloomFilter function to handle initialization.
   ```suggestion
                s.memPart.traceIDFilter.filter = 
decodeBloomFilter(s.traceIDFilterBuffer)
   ```



##########
pkg/query/logical/trace/trace_plan_merge.go:
##########
@@ -0,0 +1,41 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "fmt"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+var _ logical.UnresolvedPlan = (*unresolvedTraceMerger)(nil)
+
+type unresolvedTraceMerger struct {
+       criteria      *tracev1.QueryRequest
+       metadata      []*commonv1.Metadata
+       ecc           []executor.TraceExecutionContext
+       tagProjection [][]*logical.Tag
+}
+
+// Analyze implements logical.UnresolvedPlan.
+func (u *unresolvedTraceMerger) Analyze(_ logical.Schema) (logical.Plan, 
error) {
+       return nil, fmt.Errorf("trace merger not implemented yet")

Review Comment:
   The trace merger functionality is marked as unimplemented. This will cause 
failures in distributed trace queries that require merging results from 
multiple nodes.
   ```suggestion
   
   // traceMergerPlan implements logical.Plan for merging distributed trace 
results.
   type traceMergerPlan struct {
        criteria      *tracev1.QueryRequest
        metadata      []*commonv1.Metadata
        ecc           []executor.TraceExecutionContext
        tagProjection [][]*logical.Tag
   }
   
   // Execute merges trace results from multiple nodes.
   func (p *traceMergerPlan) Execute(ctx executor.ExecutionContext) 
([]*tracev1.Trace, error) {
        var mergedTraces = make(map[string]*tracev1.Trace)
        for _, ec := range p.ecc {
                traces, err := ec.ExecuteTraceQuery(ctx, p.criteria, 
p.metadata, p.tagProjection)
                if err != nil {
                        return nil, err
                }
                for _, trace := range traces {
                        id := trace.TraceId
                        if existing, ok := mergedTraces[id]; ok {
                                // Merge spans if trace already exists
                                existing.Spans = append(existing.Spans, 
trace.Spans...)
                        } else {
                                mergedTraces[id] = trace
                        }
                }
        }
        // Optionally deduplicate spans here if needed
        result := make([]*tracev1.Trace, 0, len(mergedTraces))
        for _, trace := range mergedTraces {
                result = append(result, trace)
        }
        return result, nil
   }
   
   // Children returns nil as this is a leaf plan.
   func (p *traceMergerPlan) Children() []logical.Plan {
        return nil
   }
   
   // String returns a string representation of the plan.
   func (p *traceMergerPlan) String() string {
        return "TraceMergerPlan"
   }
   
   // Analyze implements logical.Plan (noop for resolved plan).
   func (p *traceMergerPlan) Analyze(_ logical.Schema) (logical.Plan, error) {
        return p, nil
   }
   func (u *unresolvedTraceMerger) Analyze(_ logical.Schema) (logical.Plan, 
error) {
        return &traceMergerPlan{
                criteria:      u.criteria,
                metadata:      u.metadata,
                ecc:           u.ecc,
                tagProjection: u.tagProjection,
        }, nil
   ```



##########
banyand/internal/storage/segment.go:
##########
@@ -677,7 +677,9 @@ func (sc *segmentController[T, O]) close() {
        sc.Lock()
        defer sc.Unlock()
        for _, s := range sc.lst {
-               s.DecRef()
+               for atomic.LoadInt32(&s.refCount) > 0 {
+                       s.DecRef()

Review Comment:
   [nitpick] This busy-wait loop could cause performance issues or infinite 
loops if DecRef() doesn't properly decrement the reference count. Consider 
adding a timeout or using a more robust cleanup mechanism.
   ```suggestion
                timeout := time.After(5 * time.Second)
                for atomic.LoadInt32(&s.refCount) > 0 {
                        select {
                        case <-timeout:
                                sc.l.Warn().Stringer("segment", s).Msg("Timeout 
waiting for segment refCount to reach zero during close")
                                break
                        default:
                                s.DecRef()
                                time.Sleep(10 * time.Millisecond)
                        }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to