Copilot commented on code in PR #806:
URL: https://github.com/apache/skywalking-banyandb/pull/806#discussion_r2430811739


##########
test/stress/trace-streaming/memory_monitor.go:
##########
@@ -0,0 +1,310 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tracestreaming
+
+import (
+       "context"
+       "encoding/csv"
+       "fmt"
+       "io"
+       "net/http"
+       "os"
+       "path/filepath"
+       "runtime"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
+)
+
+// MemorySnapshot represents a point-in-time memory measurement.
+type MemorySnapshot struct {
+       Timestamp    time.Time
+       HeapAlloc    uint64
+       HeapSys      uint64
+       HeapInuse    uint64
+       HeapIdle     uint64
+       RSS          uint64
+       NumGC        uint32
+       GCPauseNs    uint64
+       NumGoroutine int
+}
+
+// MemoryMonitor tracks memory usage over time.
+type MemoryMonitor struct {
+       pprofURL     string
+       profileDir   string
+       snapshots    []MemorySnapshot
+       pollInterval time.Duration
+       mu           sync.RWMutex
+}
+
+// NewMemoryMonitor creates a new memory monitor.
+func NewMemoryMonitor(pollInterval time.Duration, pprofURL string, profileDir string) *MemoryMonitor {
+       return &MemoryMonitor{
+               snapshots:    make([]MemorySnapshot, 0),
+               pollInterval: pollInterval,
+               pprofURL:     pprofURL,
+               profileDir:   profileDir,
+       }
+}
+
+// Start begins monitoring memory usage.
+func (m *MemoryMonitor) Start(ctx context.Context) {
+       ticker := time.NewTicker(m.pollInterval)
+       defer ticker.Stop()
+
+       // Create profile directory if it doesn't exist
+       if m.profileDir != "" {
+               _ = os.MkdirAll(m.profileDir, 0o755)
+       }
+
+       // Capture initial snapshot
+       m.captureSnapshot()
+
+       // Start periodic heap profiling (every 30 seconds)
+       profileTicker := time.NewTicker(30 * time.Second)
+       defer profileTicker.Stop()
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-ticker.C:
+                       m.captureSnapshot()
+               case <-profileTicker.C:
+                       if m.pprofURL != "" && m.profileDir != "" {
+                               _ = m.captureHeapProfile()
+                       }
+               }
+       }
+}
+
+// captureSnapshot captures a memory snapshot.
+func (m *MemoryMonitor) captureSnapshot() {
+       var memStats runtime.MemStats
+       runtime.ReadMemStats(&memStats)
+
+       // Approximate RSS: default to HeapSys; when a cgroup memory limit is
+       // detected (i.e. a containerized environment), total Sys memory is
+       // the closer approximation.
+       rss := memStats.HeapSys
+       if limit, err := cgroups.MemoryLimit(); err == nil && limit > 0 {
+               rss = memStats.Sys
+       }
+
+       snapshot := MemorySnapshot{
+               Timestamp:    time.Now(),
+               HeapAlloc:    memStats.HeapAlloc,
+               HeapSys:      memStats.HeapSys,
+               HeapInuse:    memStats.HeapInuse,
+               HeapIdle:     memStats.HeapIdle,
+               RSS:          rss,
+               NumGC:        memStats.NumGC,
+               GCPauseNs:    memStats.PauseNs[(memStats.NumGC+255)%256],
+               NumGoroutine: runtime.NumGoroutine(),
+       }
+
+       m.mu.Lock()
+       m.snapshots = append(m.snapshots, snapshot)
+       m.mu.Unlock()
+}
+
+// captureHeapProfile captures a heap profile via pprof HTTP endpoint.
+func (m *MemoryMonitor) captureHeapProfile() error {
+       url := fmt.Sprintf("%s/debug/pprof/heap", m.pprofURL)
+       resp, err := http.Get(url) // #nosec G107
+       if err != nil {
+               return fmt.Errorf("failed to get heap profile: %w", err)
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               return fmt.Errorf("heap profile returned status %d", 
resp.StatusCode)
+       }
+
+       // Save profile to file
+       filename := filepath.Join(m.profileDir, fmt.Sprintf("heap_%d.pprof", time.Now().Unix()))
+       file, err := os.Create(filename)
+       if err != nil {
+               return fmt.Errorf("failed to create profile file: %w", err)
+       }
+       defer file.Close()
+
+       if _, err := io.Copy(file, resp.Body); err != nil {
+               return fmt.Errorf("failed to write profile: %w", err)
+       }
+
+       return nil
+}
+
+// CaptureCPUProfile captures a CPU profile for the specified duration.
+func (m *MemoryMonitor) CaptureCPUProfile(duration time.Duration) error {
+       if m.pprofURL == "" || m.profileDir == "" {
+               return fmt.Errorf("pprof URL or profile directory not 
configured")
+       }
+
+       url := fmt.Sprintf("%s/debug/pprof/profile?seconds=%d", m.pprofURL, int(duration.Seconds()))
+       resp, err := http.Get(url) // #nosec G107

Review Comment:
   Similar to the heap profile, this HTTP GET request constructs its URL from
   user-supplied configuration via string formatting. Consider validating the
   configured base URL and building the endpoint URL with url.Parse instead.
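
   A minimal sketch of that approach (buildPprofURL is a hypothetical helper,
   not part of this PR; url.URL.JoinPath needs Go 1.19+):

       // buildPprofURL validates the configured base URL once and assembles
       // endpoint URLs via net/url instead of fmt.Sprintf.
       // Assumes "net/url" is imported.
       func buildPprofURL(base, endpoint string, params url.Values) (string, error) {
               u, err := url.Parse(base)
               if err != nil {
                       return "", fmt.Errorf("invalid pprof URL %q: %w", base, err)
               }
               if u.Scheme != "http" && u.Scheme != "https" {
                       return "", fmt.Errorf("unsupported pprof URL scheme %q", u.Scheme)
               }
               u = u.JoinPath(endpoint)
               u.RawQuery = params.Encode()
               return u.String(), nil
       }

   CaptureCPUProfile could then build its target as, for example (assuming
   "strconv" is imported):

       target, err := buildPprofURL(m.pprofURL, "debug/pprof/profile",
               url.Values{"seconds": []string{strconv.Itoa(int(duration.Seconds()))}})
       if err != nil {
               return err
       }
       resp, err := http.Get(target)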



##########
banyand/trace/streaming_pipeline.go:
##########
@@ -0,0 +1,987 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "container/heap"
+       "context"
+       stdErrors "errors"
+       "fmt"
+       "sort"
+       "sync"
+
+       pkgerrors "github.com/pkg/errors"
+
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
+       "github.com/apache/skywalking-banyandb/pkg/query"
+)
+
+type traceBatch struct {
+       err      error
+       keys     map[string]int64
+       traceIDs []string
+       seq      int
+}
+
+type scanBatch struct {
+       err     error
+       cursors []*blockCursor
+       traceBatch
+}
+
+func newTraceBatch(seq int, capacity int) traceBatch {
+       tb := traceBatch{
+               seq:      seq,
+               traceIDs: make([]string, 0, capacity),
+       }
+       if capacity > 0 {
+               tb.keys = make(map[string]int64, capacity)
+       }
+       return tb
+}
+
+func staticTraceBatchSource(ctx context.Context, traceIDs []string, maxTraceSize int, keys map[string]int64) <-chan traceBatch {
+       out := make(chan traceBatch)
+
+       go func() {
+               defer close(out)
+
+               if len(traceIDs) == 0 {
+                       return
+               }
+
+               limit := len(traceIDs)
+               if maxTraceSize > 0 && maxTraceSize < limit {
+                       limit = maxTraceSize
+               }
+
+               orderedIDs := append([]string(nil), traceIDs[:limit]...)
+
+               // Determine batch size. When maxTraceSize is zero, emit everything in one batch.
+               batchSize := maxTraceSize
+               if batchSize <= 0 || batchSize > len(orderedIDs) {
+                       batchSize = len(orderedIDs)
+               }
+
+               seq := 0
+               for start := 0; start < len(orderedIDs); start += batchSize {
+                       end := start + batchSize
+                       if end > len(orderedIDs) {
+                               end = len(orderedIDs)
+                       }
+
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case out <- traceBatch{
+                               seq:      seq,
+                               traceIDs: append([]string(nil), orderedIDs[start:end]...),
+                               keys:     keys,
+                       }:
+                               seq++
+                       }
+               }
+       }()
+
+       return out
+}
+
+const defaultTraceBatchSize = 64
+
+type sidxStreamShard struct {
+       results  <-chan *sidx.QueryResponse
+       response *sidx.QueryResponse
+       id       int
+       idx      int
+       done     bool
+}
+
+func (sh *sidxStreamShard) prepare(ctx context.Context) error {
+       for {
+               if sh.response != nil && sh.idx >= 0 && sh.idx < sh.response.Len() {
+                       return nil
+               }
+
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case resp, ok := <-sh.results:
+                       if !ok {
+                               sh.done = true
+                               sh.response = nil
+                               return nil
+                       }
+                       if resp == nil {
+                               continue
+                       }
+                       if resp.Error != nil {
+                               return resp.Error
+                       }
+                       if resp.Len() == 0 {
+                               continue
+                       }
+                       sh.response = resp
+                       sh.idx = 0
+                       return nil
+               }
+       }
+}
+
+func (sh *sidxStreamShard) currentKey() int64 {
+       return sh.response.Keys[sh.idx]
+}
+
+func (sh *sidxStreamShard) currentData() []byte {
+       return sh.response.Data[sh.idx]
+}
+
+func (sh *sidxStreamShard) advance(ctx context.Context) error {
+       if sh.response == nil {
+               return sh.prepare(ctx)
+       }
+
+       sh.idx++
+
+       return sh.prepare(ctx)
+}
+
+type sidxStreamHeap struct {
+       shards []*sidxStreamShard
+       asc    bool
+}
+
+func (h sidxStreamHeap) Len() int {
+       return len(h.shards)
+}
+
+func (h sidxStreamHeap) Less(i, j int) bool {
+       left := h.shards[i].currentKey()
+       right := h.shards[j].currentKey()
+       if h.asc {
+               return left < right
+       }
+       return left > right
+}
+
+func (h sidxStreamHeap) Swap(i, j int) {
+       h.shards[i], h.shards[j] = h.shards[j], h.shards[i]
+}
+
+func (h *sidxStreamHeap) Push(x interface{}) {
+       h.shards = append(h.shards, x.(*sidxStreamShard))
+}
+
+func (h *sidxStreamHeap) Pop() interface{} {
+       n := len(h.shards)
+       x := h.shards[n-1]
+       h.shards = h.shards[:n-1]
+       return x
+}
+
+type sidxStreamError struct {
+       err   error
+       index int
+}
+
+func decodeTraceID(data []byte) (string, error) {
+       if len(data) == 0 {
+               return "", fmt.Errorf("empty trace id payload")
+       }
+       if idFormat(data[0]) != idFormatV1 {
+               return "", fmt.Errorf("invalid trace id format: %x", data[0])
+       }
+       return string(data[1:]), nil
+}
+
+func forwardSIDXError(ctx context.Context, idx int, errCh <-chan error, out chan<- sidxStreamError) {
+       if errCh == nil {
+               return
+       }
+       for err := range errCh {
+               if err == nil {
+                       continue
+               }
+               select {
+               case out <- sidxStreamError{index: idx, err: err}:
+               case <-ctx.Done():
+                       select {
+                       case out <- sidxStreamError{index: idx, err: err}:
+                       default:
+                       }
+               }
+               return
+       }
+}
+
+func (t *trace) streamSIDXTraceBatches(
+       ctx context.Context,
+       sidxInstances []sidx.SIDX,
+       req sidx.QueryRequest,
+       maxTraceSize int,
+) <-chan traceBatch {
+       out := make(chan traceBatch)
+
+       if len(sidxInstances) == 0 {
+               close(out)
+               return out
+       }
+
+       tracer := query.GetTracer(ctx)
+       tracingCtx := ctx
+       var span *query.Span
+       if tracer != nil {
+               var spanCtx context.Context
+               span, spanCtx = tracer.StartSpan(ctx, "sidx-stream")
+               tracingCtx = spanCtx
+               tagSIDXStreamSpan(span, req, maxTraceSize, len(sidxInstances))
+       }
+
+       streamCtx, cancel := context.WithCancel(tracingCtx)
+       runner := newSIDXStreamRunner(ctx, streamCtx, cancel, req, maxTraceSize, span)
+
+       go func() {
+               defer close(out)
+               defer cancel()
+
+               if err := runner.prepare(sidxInstances); err != nil {
+                       runner.cancel()
+                       runner.emitError(out, err)
+                       runner.finish()
+                       return
+               }
+
+               runner.run(out)
+               runner.finish()
+       }()
+
+       return out
+}
+
+type sidxStreamRunner struct {
+       streamCtx      context.Context
+       ctx            context.Context
+       spanErr        error
+       cancelFunc     context.CancelFunc
+       span           *query.Span
+       errEvents      chan sidxStreamError
+       heap           *sidxStreamHeap
+       seenTraceIDs   map[string]struct{}
+       req            sidx.QueryRequest
+       batch          traceBatch
+       errWg          sync.WaitGroup
+       maxTraceSize   int
+       nextSeq        int
+       total          int
+       batchesEmitted int
+       duplicates     int
+       batchSize      int
+       limitReached   bool
+}
+
+func tagSIDXStreamSpan(span *query.Span, req sidx.QueryRequest, maxTraceSize int, instanceCount int) {
+       if span == nil {
+               return
+       }
+       if req.Filter != nil {
+               span.Tag("filter_present", "true")
+       } else {
+               span.Tag("filter_present", "false")
+       }
+       if req.Order != nil {
+               if req.Order.Index != nil && req.Order.Index.GetMetadata() != nil {
+                       span.Tag("order_index", req.Order.Index.GetMetadata().GetName())
+               }
+               span.Tag("order_sort", req.Order.Sort.String())
+               span.Tagf("order_type", "%d", req.Order.Type)
+       } else {
+               span.Tag("order_sort", "none")
+       }
+       span.Tagf("series_id_candidates", "%d", len(req.SeriesIDs))
+       span.Tagf("max_element_size", "%d", req.MaxElementSize)
+       span.Tagf("max_trace_size", "%d", maxTraceSize)
+       span.Tagf("sidx_instance_count", "%d", instanceCount)
+}
+
+func newSIDXStreamRunner(
+       ctx context.Context,
+       streamCtx context.Context,
+       cancel context.CancelFunc,
+       req sidx.QueryRequest,
+       maxTraceSize int,
+       span *query.Span,
+) *sidxStreamRunner {
+       asc := true
+       if req.Order != nil && req.Order.Sort == modelv1.Sort_SORT_DESC {
+               asc = false
+       }
+
+       batchSize := req.MaxElementSize
+       if batchSize <= 0 {
+               if maxTraceSize > 0 {
+                       batchSize = maxTraceSize
+               } else {
+                       batchSize = defaultTraceBatchSize
+               }
+       }
+
+       return &sidxStreamRunner{
+               ctx:          ctx,
+               streamCtx:    streamCtx,
+               cancelFunc:   cancel,
+               req:          req,
+               maxTraceSize: maxTraceSize,
+               batchSize:    batchSize,
+               span:         span,
+               heap:         &sidxStreamHeap{asc: asc},
+               seenTraceIDs: make(map[string]struct{}),
+               batch:        newTraceBatch(0, batchSize),
+               nextSeq:      1,
+       }
+}
+
+func (r *sidxStreamRunner) prepare(instances []sidx.SIDX) error {
+       type shardSource struct {
+               shard *sidxStreamShard
+               errCh <-chan error
+               idx   int
+       }
+
+       sources := make([]shardSource, 0, len(instances))
+       for idx, instance := range instances {
+               resultsCh, errCh := instance.StreamingQuery(r.streamCtx, r.req)
+               shard := &sidxStreamShard{
+                       id:      idx,
+                       results: resultsCh,
+               }
+               if err := shard.prepare(r.streamCtx); err != nil {
+                       return fmt.Errorf("sidx[%d] prepare failed: %w", idx, 
err)
+               }
+               if shard.done {
+                       continue
+               }
+               sources = append(sources, shardSource{
+                       shard: shard,
+                       errCh: errCh,
+                       idx:   idx,
+               })
+       }
+
+       for _, src := range sources {
+               heap.Push(r.heap, src.shard)
+       }
+
+       if len(sources) == 0 {
+               return nil
+       }
+
+       r.errEvents = make(chan sidxStreamError, len(sources))
+
+       for _, src := range sources {
+               if src.errCh == nil {
+                       continue
+               }
+               r.errWg.Add(1)
+               go func(index int, ch <-chan error) {
+                       defer r.errWg.Done()
+                       forwardSIDXError(r.streamCtx, index, ch, r.errEvents)
+               }(src.idx, src.errCh)
+       }
+
+       go func() {
+               r.errWg.Wait()
+               close(r.errEvents)
+       }()
+
+       return nil
+}
+
+func (r *sidxStreamRunner) run(out chan<- traceBatch) {
+       if r.heap.Len() == 0 {
+               r.cancel()
+               r.drainErrorEvents(out)
+               return
+       }
+
+       for r.heap.Len() > 0 {
+               if r.maxTraceSize > 0 && r.total >= r.maxTraceSize {
+                       r.limitReached = true
+                       break
+               }
+
+               if err := r.streamCtx.Err(); err != nil {
+                       if r.spanErr == nil {
+                               r.spanErr = err
+                       }
+                       return
+               }
+
+               if !r.pollErrEvents(out) {
+                       return
+               }
+
+               shard := heap.Pop(r.heap).(*sidxStreamShard)
+               if err := r.ensureShardReady(shard); err != nil {
+                       r.emitError(out, err)
+                       return
+               }
+               if shard.done {
+                       continue
+               }
+
+               added, err := r.consumeShard(shard)
+               if err != nil {
+                       r.emitError(out, err)
+                       return
+               }
+
+               if added && len(r.batch.traceIDs) >= r.batchSize {
+                       if !r.emitBatch(out) {
+                               return
+                       }
+               }
+
+               if added && r.maxTraceSize > 0 && r.total >= r.maxTraceSize {
+                       r.limitReached = true
+                       break
+               }
+
+               if err := r.advanceShard(shard); err != nil {
+                       r.emitError(out, err)
+                       return
+               }
+       }
+
+       if len(r.batch.traceIDs) > 0 {
+               if !r.emitBatch(out) {
+                       return
+               }
+       }
+
+       r.cancel()
+       r.drainErrorEvents(out)
+}
+
+func (r *sidxStreamRunner) pollErrEvents(out chan<- traceBatch) bool {
+       if r.errEvents == nil {
+               return true
+       }
+
+       select {
+       case ev, ok := <-r.errEvents:
+               if !ok {
+                       r.errEvents = nil
+                       return true
+               }
+               if ev.err == nil {
+                       return true
+               }
+               eventErr := fmt.Errorf("sidx[%d] streaming error: %w", ev.index, ev.err)
+               r.emitError(out, eventErr)
+               return false
+       default:
+               return true
+       }
+}
+
+func (r *sidxStreamRunner) ensureShardReady(shard *sidxStreamShard) error {
+       if shard.response == nil {
+               if err := shard.prepare(r.streamCtx); err != nil {
+                       return fmt.Errorf("sidx[%d] prepare failed: %w", 
shard.id, err)
+               }
+               if shard.done {
+                       return nil
+               }
+       }
+       return nil
+}
+
+func (r *sidxStreamRunner) consumeShard(shard *sidxStreamShard) (bool, error) {
+       traceID, err := decodeTraceID(shard.currentData())
+       if err != nil {
+               return false, fmt.Errorf("sidx[%d] invalid trace id payload: %w", shard.id, err)
+       }
+
+       if _, exists := r.seenTraceIDs[traceID]; exists {
+               r.duplicates++
+               return false, nil
+       }

Review Comment:
   [nitpick] The duplicate detection logic could be extracted into a helper 
method like 'isDuplicate(traceID string) bool' to improve readability and 
testability.
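
   A possible shape, keeping the duplicate counter inside the helper:

       // isDuplicate reports whether traceID has already been emitted and
       // bumps the duplicate counter when it has.
       func (r *sidxStreamRunner) isDuplicate(traceID string) bool {
               if _, exists := r.seenTraceIDs[traceID]; exists {
                       r.duplicates++
                       return true
               }
               return false
       }

   consumeShard would then shrink to:

       if r.isDuplicate(traceID) {
               return false, nil
       }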



##########
banyand/internal/sidx/query.go:
##########
@@ -0,0 +1,748 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "container/heap"
+       "context"
+       "math"
+       "sort"
+       "sync"
+
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
+       "github.com/apache/skywalking-banyandb/pkg/query"
+)
+
+// Query implements SIDX interface.
+func (s *sidx) Query(ctx context.Context, req QueryRequest) (*QueryResponse, error) {
+       if err := req.Validate(); err != nil {
+               return nil, err
+       }
+
+       // The blocking query path returns the full result set. Treat MaxElementSize
+       // as a streaming-only hint so it does not truncate the blocking response.
+       req.MaxElementSize = 0

Review Comment:
   Mutating the req parameter in place is easy to miss when reading the rest
   of the query path (req is a value copy here, so the caller is unaffected,
   but readers of this function may still expect it to carry the original
   request). Consider assigning the override to a local copy to make the
   intent explicit.
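
   A sketch of the local-copy variant (blockingReq is a hypothetical name):

       // Shadow the request so the override is explicit and req keeps
       // carrying the caller's original values for later reads.
       blockingReq := req
       blockingReq.MaxElementSize = 0
       // ... execute the blocking query path with blockingReq ...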



##########
test/stress/trace-streaming/memory_monitor.go:
##########
@@ -0,0 +1,310 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tracestreaming
+
+import (
+       "context"
+       "encoding/csv"
+       "fmt"
+       "io"
+       "net/http"
+       "os"
+       "path/filepath"
+       "runtime"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
+)
+
+// MemorySnapshot represents a point-in-time memory measurement.
+type MemorySnapshot struct {
+       Timestamp    time.Time
+       HeapAlloc    uint64
+       HeapSys      uint64
+       HeapInuse    uint64
+       HeapIdle     uint64
+       RSS          uint64
+       NumGC        uint32
+       GCPauseNs    uint64
+       NumGoroutine int
+}
+
+// MemoryMonitor tracks memory usage over time.
+type MemoryMonitor struct {
+       pprofURL     string
+       profileDir   string
+       snapshots    []MemorySnapshot
+       pollInterval time.Duration
+       mu           sync.RWMutex
+}
+
+// NewMemoryMonitor creates a new memory monitor.
+func NewMemoryMonitor(pollInterval time.Duration, pprofURL string, profileDir string) *MemoryMonitor {
+       return &MemoryMonitor{
+               snapshots:    make([]MemorySnapshot, 0),
+               pollInterval: pollInterval,
+               pprofURL:     pprofURL,
+               profileDir:   profileDir,
+       }
+}
+
+// Start begins monitoring memory usage.
+func (m *MemoryMonitor) Start(ctx context.Context) {
+       ticker := time.NewTicker(m.pollInterval)
+       defer ticker.Stop()
+
+       // Create profile directory if it doesn't exist
+       if m.profileDir != "" {
+               _ = os.MkdirAll(m.profileDir, 0o755)
+       }
+
+       // Capture initial snapshot
+       m.captureSnapshot()
+
+       // Start periodic heap profiling (every 30 seconds)
+       profileTicker := time.NewTicker(30 * time.Second)
+       defer profileTicker.Stop()
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-ticker.C:
+                       m.captureSnapshot()
+               case <-profileTicker.C:
+                       if m.pprofURL != "" && m.profileDir != "" {
+                               _ = m.captureHeapProfile()
+                       }
+               }
+       }
+}
+
+// captureSnapshot captures a memory snapshot.
+func (m *MemoryMonitor) captureSnapshot() {
+       var memStats runtime.MemStats
+       runtime.ReadMemStats(&memStats)
+
+       // Approximate RSS: default to HeapSys; when a cgroup memory limit is
+       // detected (i.e. a containerized environment), total Sys memory is
+       // the closer approximation.
+       rss := memStats.HeapSys
+       if limit, err := cgroups.MemoryLimit(); err == nil && limit > 0 {
+               rss = memStats.Sys
+       }
+
+       snapshot := MemorySnapshot{
+               Timestamp:    time.Now(),
+               HeapAlloc:    memStats.HeapAlloc,
+               HeapSys:      memStats.HeapSys,
+               HeapInuse:    memStats.HeapInuse,
+               HeapIdle:     memStats.HeapIdle,
+               RSS:          rss,
+               NumGC:        memStats.NumGC,
+               GCPauseNs:    memStats.PauseNs[(memStats.NumGC+255)%256],
+               NumGoroutine: runtime.NumGoroutine(),
+       }
+
+       m.mu.Lock()
+       m.snapshots = append(m.snapshots, snapshot)
+       m.mu.Unlock()
+}
+
+// captureHeapProfile captures a heap profile via pprof HTTP endpoint.
+func (m *MemoryMonitor) captureHeapProfile() error {
+       url := fmt.Sprintf("%s/debug/pprof/heap", m.pprofURL)
+       resp, err := http.Get(url) // #nosec G107

Review Comment:
   This HTTP GET request uses a URL constructed from user input. While the
   #nosec comment acknowledges this, consider validating the pprofURL format
   or using a URL builder to prevent requests to unintended targets.
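
   If a validating helper such as the hypothetical buildPprofURL sketched in
   the CPU-profile comment above is introduced, this call site could become:

       target, err := buildPprofURL(m.pprofURL, "debug/pprof/heap", nil)
       if err != nil {
               return fmt.Errorf("failed to build heap profile URL: %w", err)
       }
       resp, err := http.Get(target) // base URL validated during construction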



##########
test/stress/trace-streaming/plot_memory_comparison.py:
##########
@@ -0,0 +1,303 @@
+#!/usr/bin/env python3
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Plot memory monitoring data comparing write and query phases.
+
+Usage:
+    python3 plot_memory_comparison.py memory_write_small.csv memory_query_small.csv
+    python3 plot_memory_comparison.py memory_write_small.csv memory_query_small.csv --output-dir ./charts
+"""
+
+import sys
+import csv
+import os
+from datetime import datetime
+import argparse
+
+try:
+    import matplotlib.pyplot as plt
+    import matplotlib.dates as mdates
+except ImportError:
+    print("Error: matplotlib not installed")
+    print("Install with: pip3 install matplotlib")
+    sys.exit(1)
+
+
+def parse_csv(filename):
+    """Parse memory CSV file."""
+    timestamps = []
+    heap_alloc = []
+    heap_sys = []
+    rss = []
+    num_gc = []
+    num_goroutine = []
+    
+    with open(filename, 'r') as f:
+        reader = csv.DictReader(f)
+        for row in reader:
+            # Parse timestamp
+            ts = datetime.fromisoformat(row['Timestamp'].replace('Z', '+00:00'))
+            timestamps.append(ts)
+            
+            # Parse memory values (already in MB)
+            heap_alloc.append(float(row['HeapAlloc(MB)']))
+            heap_sys.append(float(row['HeapSys(MB)']))
+            rss.append(float(row['RSS(MB)']))
+            num_gc.append(int(row['NumGC']))
+            num_goroutine.append(int(row['NumGoroutine']))
+    
+    return {
+        'timestamps': timestamps,
+        'heap_alloc': heap_alloc,
+        'heap_sys': heap_sys,
+        'rss': rss,
+        'num_gc': num_gc,
+        'num_goroutine': num_goroutine
+    }
+
+
+def plot_single_phase(data, title, output_file=None):
+    """Create memory visualization for a single phase."""
+    fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)
+    
+    # Memory usage over time
+    ax1 = axes[0]
+    ax1.plot(data['timestamps'], data['heap_alloc'], label='Heap Alloc', linewidth=2, color='#2E86AB')
+    ax1.plot(data['timestamps'], data['heap_sys'], label='Heap Sys', linewidth=2, alpha=0.7, color='#A23B72')
+    ax1.plot(data['timestamps'], data['rss'], label='RSS', linewidth=2, alpha=0.7, color='#F18F01')
+    ax1.set_ylabel('Memory (MB)', fontsize=12)
+    ax1.set_title(f'{title} - Memory Usage Over Time', fontsize=14, fontweight='bold')
+    ax1.legend(loc='upper left', fontsize=10)
+    ax1.grid(True, alpha=0.3)
+    
+    # Add peak/average annotations
+    peak_heap = max(data['heap_alloc'])
+    avg_heap = sum(data['heap_alloc']) / len(data['heap_alloc'])
+    ax1.axhline(y=peak_heap, color='red', linestyle='--', alpha=0.5, label=f'Peak: {peak_heap:.1f} MB')
+    ax1.axhline(y=avg_heap, color='green', linestyle='--', alpha=0.5, label=f'Avg: {avg_heap:.1f} MB')
+    ax1.legend(loc='upper left', fontsize=10)
+    
+    # Garbage collection over time
+    ax2 = axes[1]
+    ax2.plot(data['timestamps'], data['num_gc'], label='GC Cycles', color='orange', linewidth=2)
+    ax2.set_ylabel('GC Cycles', fontsize=12)
+    ax2.set_title('Garbage Collection Activity', fontsize=12, fontweight='bold')
+    ax2.legend(loc='upper left', fontsize=10)
+    ax2.grid(True, alpha=0.3)
+    
+    # Goroutine count over time
+    ax3 = axes[2]
+    ax3.plot(data['timestamps'], data['num_goroutine'], label='Goroutines', color='purple', linewidth=2)
+    ax3.set_ylabel('Goroutine Count', fontsize=12)
+    ax3.set_xlabel('Time', fontsize=12)
+    ax3.set_title('Goroutine Count', fontsize=12, fontweight='bold')
+    ax3.legend(loc='upper left', fontsize=10)
+    ax3.grid(True, alpha=0.3)
+    
+    # Format x-axis
+    ax3.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
+    plt.setp(ax3.xaxis.get_majorticklabels(), rotation=45, ha='right')
+    
+    plt.tight_layout()
+    
+    if output_file:
+        plt.savefig(output_file, dpi=150, bbox_inches='tight')
+        print(f"Chart saved to: {output_file}")
+    
+    plt.close()
+    
+    # Return stats
+    return {
+        'peak_heap': peak_heap,
+        'avg_heap': avg_heap,
+        'min_heap': min(data['heap_alloc']),
+        'peak_rss': max(data['rss']),
+        'total_gc': max(data['num_gc']),
+        'max_goroutines': max(data['num_goroutine'])
+    }
+
+
+def plot_comparison(write_data, query_data, output_file=None):
+    """Create comparison plot for write vs query phases."""
+    fig, axes = plt.subplots(2, 1, figsize=(16, 10), sharex=False)
+    
+    # Combine timestamps for proper alignment
+    # Normalize to relative time (seconds from start)
+    def to_relative_seconds(timestamps):
+        start = timestamps[0]
+        return [(t - start).total_seconds() for t in timestamps]
+    
+    write_times = to_relative_seconds(write_data['timestamps'])
+    query_times = to_relative_seconds(query_data['timestamps'])
+    
+    # Memory comparison
+    ax1 = axes[0]
+    ax1.plot(write_times, write_data['heap_alloc'], label='Write - Heap Alloc',
+             linewidth=2, color='#2E86AB')
+    ax1.plot(query_times, query_data['heap_alloc'], label='Query - Heap Alloc',
+             linewidth=2, color='#F18F01')
+    ax1.set_ylabel('Heap Allocated (MB)', fontsize=12)
+    ax1.set_title('Memory Usage Comparison: Write vs Query Phase', fontsize=14, fontweight='bold')
+    ax1.legend(loc='upper left', fontsize=11)
+    ax1.grid(True, alpha=0.3)
+    
+    # Add annotations
+    write_peak = max(write_data['heap_alloc'])
+    query_peak = max(query_data['heap_alloc'])
+    ax1.axhline(y=write_peak, color='#2E86AB', linestyle='--', alpha=0.4)
+    ax1.axhline(y=query_peak, color='#F18F01', linestyle='--', alpha=0.4)
+    ax1.text(max(write_times) * 0.95, write_peak, f'{write_peak:.1f} MB', 
+             va='bottom', ha='right', color='#2E86AB', fontweight='bold')
+    ax1.text(max(query_times) * 0.95, query_peak, f'{query_peak:.1f} MB', 
+             va='bottom', ha='right', color='#F18F01', fontweight='bold')
+    
+    # Goroutine comparison
+    ax2 = axes[1]
+    ax2.plot(write_times, write_data['num_goroutine'], label='Write - Goroutines',
+             linewidth=2, color='#2E86AB')
+    ax2.plot(query_times, query_data['num_goroutine'], label='Query - Goroutines',
+             linewidth=2, color='#F18F01')
+    ax2.set_ylabel('Goroutine Count', fontsize=12)
+    ax2.set_xlabel('Time (seconds)', fontsize=12)
+    ax2.set_title('Goroutine Count Comparison', fontsize=12, fontweight='bold')
+    ax2.legend(loc='upper left', fontsize=11)
+    ax2.grid(True, alpha=0.3)
+    
+    plt.tight_layout()
+    
+    if output_file:
+        plt.savefig(output_file, dpi=150, bbox_inches='tight')
+        print(f"Comparison chart saved to: {output_file}")
+    
+    plt.close()
+
+
+def print_analysis(phase_name, data, stats):
+    """Print analysis for a phase."""
+    print(f"\n=== {phase_name} Phase Analysis ===")
+    print(f"Peak Heap Alloc: {stats['peak_heap']:.2f} MB")
+    print(f"Avg Heap Alloc:  {stats['avg_heap']:.2f} MB")
+    print(f"Min Heap Alloc:  {stats['min_heap']:.2f} MB")
+    print(f"Peak RSS:        {stats['peak_rss']:.2f} MB")
+    print(f"Total GC Cycles: {stats['total_gc']}")
+    print(f"Max Goroutines:  {stats['max_goroutines']}")
+    
+    # Memory leak detection
+    if len(data['heap_alloc']) > 4:
+        first_quarter_avg = sum(data['heap_alloc'][:len(data['heap_alloc'])//4]) / (len(data['heap_alloc'])//4)
+        last_quarter_avg = sum(data['heap_alloc'][-len(data['heap_alloc'])//4:]) / (len(data['heap_alloc'])//4)

Review Comment:
   [nitpick] The calculation of quarter averages has duplicated logic. Consider 
extracting this into a helper function like 'calculate_quarter_average(data, 
is_first_quarter)' to reduce code duplication.
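
   One possible extraction (helper name hypothetical; safe here because the
   surrounding guard ensures len(data['heap_alloc']) > 4):

       def quarter_average(values, first):
           """Average the first or last quarter of a series."""
           quarter = len(values) // 4
           segment = values[:quarter] if first else values[-quarter:]
           return sum(segment) / quarter

       first_quarter_avg = quarter_average(data['heap_alloc'], first=True)
       last_quarter_avg = quarter_average(data['heap_alloc'], first=False)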



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
