Copilot commented on code in PR #806: URL: https://github.com/apache/skywalking-banyandb/pull/806#discussion_r2430811739
########## test/stress/trace-streaming/memory_monitor.go: ########## @@ -0,0 +1,310 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracestreaming + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "runtime" + "sync" + "time" + + "github.com/apache/skywalking-banyandb/pkg/cgroups" +) + +// MemorySnapshot represents a point-in-time memory measurement. +type MemorySnapshot struct { + Timestamp time.Time + HeapAlloc uint64 + HeapSys uint64 + HeapInuse uint64 + HeapIdle uint64 + RSS uint64 + NumGC uint32 + GCPauseNs uint64 + NumGoroutine int +} + +// MemoryMonitor tracks memory usage over time. +type MemoryMonitor struct { + pprofURL string + profileDir string + snapshots []MemorySnapshot + pollInterval time.Duration + mu sync.RWMutex +} + +// NewMemoryMonitor creates a new memory monitor. +func NewMemoryMonitor(pollInterval time.Duration, pprofURL string, profileDir string) *MemoryMonitor { + return &MemoryMonitor{ + snapshots: make([]MemorySnapshot, 0), + pollInterval: pollInterval, + pprofURL: pprofURL, + profileDir: profileDir, + } +} + +// Start begins monitoring memory usage. 
+func (m *MemoryMonitor) Start(ctx context.Context) { + ticker := time.NewTicker(m.pollInterval) + defer ticker.Stop() + + // Create profile directory if it doesn't exist + if m.profileDir != "" { + _ = os.MkdirAll(m.profileDir, 0o755) + } + + // Capture initial snapshot + m.captureSnapshot() + + // Start periodic heap profiling (every 30 seconds) + profileTicker := time.NewTicker(30 * time.Second) + defer profileTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.captureSnapshot() + case <-profileTicker.C: + if m.pprofURL != "" && m.profileDir != "" { + _ = m.captureHeapProfile() + } + } + } +} + +// captureSnapshot captures a memory snapshot. +func (m *MemoryMonitor) captureSnapshot() { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + + // Get memory limit from cgroups if available + // We use HeapSys as a proxy for RSS in containerized environments + rss := memStats.HeapSys + if limit, err := cgroups.MemoryLimit(); err == nil && limit > 0 { + // In containers, use sys memory as RSS approximation + rss = memStats.Sys + } + + snapshot := MemorySnapshot{ + Timestamp: time.Now(), + HeapAlloc: memStats.HeapAlloc, + HeapSys: memStats.HeapSys, + HeapInuse: memStats.HeapInuse, + HeapIdle: memStats.HeapIdle, + RSS: rss, + NumGC: memStats.NumGC, + GCPauseNs: memStats.PauseNs[(memStats.NumGC+255)%256], + NumGoroutine: runtime.NumGoroutine(), + } + + m.mu.Lock() + m.snapshots = append(m.snapshots, snapshot) + m.mu.Unlock() +} + +// captureHeapProfile captures a heap profile via pprof HTTP endpoint. 
+func (m *MemoryMonitor) captureHeapProfile() error { + url := fmt.Sprintf("%s/debug/pprof/heap", m.pprofURL) + resp, err := http.Get(url) // #nosec G107 + if err != nil { + return fmt.Errorf("failed to get heap profile: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("heap profile returned status %d", resp.StatusCode) + } + + // Save profile to file + filename := filepath.Join(m.profileDir, fmt.Sprintf("heap_%d.pprof", time.Now().Unix())) + file, err := os.Create(filename) + if err != nil { + return fmt.Errorf("failed to create profile file: %w", err) + } + defer file.Close() + + if _, err := io.Copy(file, resp.Body); err != nil { + return fmt.Errorf("failed to write profile: %w", err) + } + + return nil +} + +// CaptureCPUProfile captures a CPU profile for the specified duration. +func (m *MemoryMonitor) CaptureCPUProfile(duration time.Duration) error { + if m.pprofURL == "" || m.profileDir == "" { + return fmt.Errorf("pprof URL or profile directory not configured") + } + + url := fmt.Sprintf("%s/debug/pprof/profile?seconds=%d", m.pprofURL, int(duration.Seconds())) + resp, err := http.Get(url) // #nosec G107 Review Comment: Similar to the heap profile, this HTTP GET request constructs a URL from user input. Consider URL validation or using url.Parse to safely construct the URL. ########## banyand/trace/streaming_pipeline.go: ########## @@ -0,0 +1,987 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package trace + +import ( + "container/heap" + "context" + stdErrors "errors" + "fmt" + "sort" + "sync" + + pkgerrors "github.com/pkg/errors" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/sidx" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/query" +) + +type traceBatch struct { + err error + keys map[string]int64 + traceIDs []string + seq int +} + +type scanBatch struct { + err error + cursors []*blockCursor + traceBatch +} + +func newTraceBatch(seq int, capacity int) traceBatch { + tb := traceBatch{ + seq: seq, + traceIDs: make([]string, 0, capacity), + } + if capacity > 0 { + tb.keys = make(map[string]int64, capacity) + } + return tb +} + +func staticTraceBatchSource(ctx context.Context, traceIDs []string, maxTraceSize int, keys map[string]int64) <-chan traceBatch { + out := make(chan traceBatch) + + go func() { + defer close(out) + + if len(traceIDs) == 0 { + return + } + + limit := len(traceIDs) + if maxTraceSize > 0 && maxTraceSize < limit { + limit = maxTraceSize + } + + orderedIDs := append([]string(nil), traceIDs[:limit]...) + + // Determine batch size. When maxTraceSize is zero, emit everything in one batch. 
+ batchSize := maxTraceSize + if batchSize <= 0 || batchSize > len(orderedIDs) { + batchSize = len(orderedIDs) + } + + seq := 0 + for start := 0; start < len(orderedIDs); start += batchSize { + end := start + batchSize + if end > len(orderedIDs) { + end = len(orderedIDs) + } + + select { + case <-ctx.Done(): + return + case out <- traceBatch{ + seq: seq, + traceIDs: append([]string(nil), orderedIDs[start:end]...), + keys: keys, + }: + seq++ + } + } + }() + + return out +} + +const defaultTraceBatchSize = 64 + +type sidxStreamShard struct { + results <-chan *sidx.QueryResponse + response *sidx.QueryResponse + id int + idx int + done bool +} + +func (sh *sidxStreamShard) prepare(ctx context.Context) error { + for { + if sh.response != nil && sh.idx >= 0 && sh.idx < sh.response.Len() { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case resp, ok := <-sh.results: + if !ok { + sh.done = true + sh.response = nil + return nil + } + if resp == nil { + continue + } + if resp.Error != nil { + return resp.Error + } + if resp.Len() == 0 { + continue + } + sh.response = resp + sh.idx = 0 + return nil + } + } +} + +func (sh *sidxStreamShard) currentKey() int64 { + return sh.response.Keys[sh.idx] +} + +func (sh *sidxStreamShard) currentData() []byte { + return sh.response.Data[sh.idx] +} + +func (sh *sidxStreamShard) advance(ctx context.Context) error { + if sh.response == nil { + return sh.prepare(ctx) + } + + sh.idx++ + + return sh.prepare(ctx) +} + +type sidxStreamHeap struct { + shards []*sidxStreamShard + asc bool +} + +func (h sidxStreamHeap) Len() int { + return len(h.shards) +} + +func (h sidxStreamHeap) Less(i, j int) bool { + left := h.shards[i].currentKey() + right := h.shards[j].currentKey() + if h.asc { + return left < right + } + return left > right +} + +func (h sidxStreamHeap) Swap(i, j int) { + h.shards[i], h.shards[j] = h.shards[j], h.shards[i] +} + +func (h *sidxStreamHeap) Push(x interface{}) { + h.shards = append(h.shards, 
x.(*sidxStreamShard)) +} + +func (h *sidxStreamHeap) Pop() interface{} { + n := len(h.shards) + x := h.shards[n-1] + h.shards = h.shards[:n-1] + return x +} + +type sidxStreamError struct { + err error + index int +} + +func decodeTraceID(data []byte) (string, error) { + if len(data) == 0 { + return "", fmt.Errorf("empty trace id payload") + } + if idFormat(data[0]) != idFormatV1 { + return "", fmt.Errorf("invalid trace id format: %x", data[0]) + } + return string(data[1:]), nil +} + +func forwardSIDXError(ctx context.Context, idx int, errCh <-chan error, out chan<- sidxStreamError) { + if errCh == nil { + return + } + for err := range errCh { + if err == nil { + continue + } + select { + case out <- sidxStreamError{index: idx, err: err}: + case <-ctx.Done(): + select { + case out <- sidxStreamError{index: idx, err: err}: + default: + } + } + return + } +} + +func (t *trace) streamSIDXTraceBatches( + ctx context.Context, + sidxInstances []sidx.SIDX, + req sidx.QueryRequest, + maxTraceSize int, +) <-chan traceBatch { + out := make(chan traceBatch) + + if len(sidxInstances) == 0 { + close(out) + return out + } + + tracer := query.GetTracer(ctx) + tracingCtx := ctx + var span *query.Span + if tracer != nil { + var spanCtx context.Context + span, spanCtx = tracer.StartSpan(ctx, "sidx-stream") + tracingCtx = spanCtx + tagSIDXStreamSpan(span, req, maxTraceSize, len(sidxInstances)) + } + + streamCtx, cancel := context.WithCancel(tracingCtx) + runner := newSIDXStreamRunner(ctx, streamCtx, cancel, req, maxTraceSize, span) + + go func() { + defer close(out) + defer cancel() + + if err := runner.prepare(sidxInstances); err != nil { + runner.cancel() + runner.emitError(out, err) + runner.finish() + return + } + + runner.run(out) + runner.finish() + }() + + return out +} + +type sidxStreamRunner struct { + streamCtx context.Context + ctx context.Context + spanErr error + cancelFunc context.CancelFunc + span *query.Span + errEvents chan sidxStreamError + heap *sidxStreamHeap + 
seenTraceIDs map[string]struct{} + req sidx.QueryRequest + batch traceBatch + errWg sync.WaitGroup + maxTraceSize int + nextSeq int + total int + batchesEmitted int + duplicates int + batchSize int + limitReached bool +} + +func tagSIDXStreamSpan(span *query.Span, req sidx.QueryRequest, maxTraceSize int, instanceCount int) { + if span == nil { + return + } + if req.Filter != nil { + span.Tag("filter_present", "true") + } else { + span.Tag("filter_present", "false") + } + if req.Order != nil { + if req.Order.Index != nil && req.Order.Index.GetMetadata() != nil { + span.Tag("order_index", req.Order.Index.GetMetadata().GetName()) + } + span.Tag("order_sort", req.Order.Sort.String()) + span.Tagf("order_type", "%d", req.Order.Type) + } else { + span.Tag("order_sort", "none") + } + span.Tagf("series_id_candidates", "%d", len(req.SeriesIDs)) + span.Tagf("max_element_size", "%d", req.MaxElementSize) + span.Tagf("max_trace_size", "%d", maxTraceSize) + span.Tagf("sidx_instance_count", "%d", instanceCount) +} + +func newSIDXStreamRunner( + ctx context.Context, + streamCtx context.Context, + cancel context.CancelFunc, + req sidx.QueryRequest, + maxTraceSize int, + span *query.Span, +) *sidxStreamRunner { + asc := true + if req.Order != nil && req.Order.Sort == modelv1.Sort_SORT_DESC { + asc = false + } + + batchSize := req.MaxElementSize + if batchSize <= 0 { + if maxTraceSize > 0 { + batchSize = maxTraceSize + } else { + batchSize = defaultTraceBatchSize + } + } + + return &sidxStreamRunner{ + ctx: ctx, + streamCtx: streamCtx, + cancelFunc: cancel, + req: req, + maxTraceSize: maxTraceSize, + batchSize: batchSize, + span: span, + heap: &sidxStreamHeap{asc: asc}, + seenTraceIDs: make(map[string]struct{}), + batch: newTraceBatch(0, batchSize), + nextSeq: 1, + } +} + +func (r *sidxStreamRunner) prepare(instances []sidx.SIDX) error { + type shardSource struct { + shard *sidxStreamShard + errCh <-chan error + idx int + } + + sources := make([]shardSource, 0, len(instances)) + for 
idx, instance := range instances { + resultsCh, errCh := instance.StreamingQuery(r.streamCtx, r.req) + shard := &sidxStreamShard{ + id: idx, + results: resultsCh, + } + if err := shard.prepare(r.streamCtx); err != nil { + return fmt.Errorf("sidx[%d] prepare failed: %w", idx, err) + } + if shard.done { + continue + } + sources = append(sources, shardSource{ + shard: shard, + errCh: errCh, + idx: idx, + }) + } + + for _, src := range sources { + heap.Push(r.heap, src.shard) + } + + if len(sources) == 0 { + return nil + } + + r.errEvents = make(chan sidxStreamError, len(sources)) + + for _, src := range sources { + if src.errCh == nil { + continue + } + r.errWg.Add(1) + go func(index int, ch <-chan error) { + defer r.errWg.Done() + forwardSIDXError(r.streamCtx, index, ch, r.errEvents) + }(src.idx, src.errCh) + } + + go func() { + r.errWg.Wait() + close(r.errEvents) + }() + + return nil +} + +func (r *sidxStreamRunner) run(out chan<- traceBatch) { + if r.heap.Len() == 0 { + r.cancel() + r.drainErrorEvents(out) + return + } + + for r.heap.Len() > 0 { + if r.maxTraceSize > 0 && r.total >= r.maxTraceSize { + r.limitReached = true + break + } + + if err := r.streamCtx.Err(); err != nil { + if r.spanErr == nil { + r.spanErr = err + } + return + } + + if !r.pollErrEvents(out) { + return + } + + shard := heap.Pop(r.heap).(*sidxStreamShard) + if err := r.ensureShardReady(shard); err != nil { + r.emitError(out, err) + return + } + if shard.done { + continue + } + + added, err := r.consumeShard(shard) + if err != nil { + r.emitError(out, err) + return + } + + if added && len(r.batch.traceIDs) >= r.batchSize { + if !r.emitBatch(out) { + return + } + } + + if added && r.maxTraceSize > 0 && r.total >= r.maxTraceSize { + r.limitReached = true + break + } + + if err := r.advanceShard(shard); err != nil { + r.emitError(out, err) + return + } + } + + if len(r.batch.traceIDs) > 0 { + if !r.emitBatch(out) { + return + } + } + + r.cancel() + r.drainErrorEvents(out) +} + +func (r 
*sidxStreamRunner) pollErrEvents(out chan<- traceBatch) bool { + if r.errEvents == nil { + return true + } + + select { + case ev, ok := <-r.errEvents: + if !ok { + r.errEvents = nil + return true + } + if ev.err == nil { + return true + } + eventErr := fmt.Errorf("sidx[%d] streaming error: %w", ev.index, ev.err) + r.emitError(out, eventErr) + return false + default: + return true + } +} + +func (r *sidxStreamRunner) ensureShardReady(shard *sidxStreamShard) error { + if shard.response == nil { + if err := shard.prepare(r.streamCtx); err != nil { + return fmt.Errorf("sidx[%d] prepare failed: %w", shard.id, err) + } + if shard.done { + return nil + } + } + return nil +} + +func (r *sidxStreamRunner) consumeShard(shard *sidxStreamShard) (bool, error) { + traceID, err := decodeTraceID(shard.currentData()) + if err != nil { + return false, fmt.Errorf("sidx[%d] invalid trace id payload: %w", shard.id, err) + } + + if _, exists := r.seenTraceIDs[traceID]; exists { + r.duplicates++ + return false, nil + } Review Comment: [nitpick] The duplicate detection logic could be extracted into a helper method like 'isDuplicate(traceID string) bool' to improve readability and testability. ########## banyand/internal/sidx/query.go: ########## @@ -0,0 +1,748 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "container/heap" + "context" + "math" + "sort" + "sync" + + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/query" +) + +// Query implements SIDX interface. +func (s *sidx) Query(ctx context.Context, req QueryRequest) (*QueryResponse, error) { + if err := req.Validate(); err != nil { + return nil, err + } + + // The blocking query path returns the full result set. Treat MaxElementSize as + // a streaming-only hint so it does not truncate the blocking response. + req.MaxElementSize = 0 Review Comment: Modifying the input parameter req.MaxElementSize could cause confusion for callers who expect their request to remain unchanged. Consider using a local copy or a different approach to handle this difference. ########## test/stress/trace-streaming/memory_monitor.go: ########## @@ -0,0 +1,310 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracestreaming + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "runtime" + "sync" + "time" + + "github.com/apache/skywalking-banyandb/pkg/cgroups" +) + +// MemorySnapshot represents a point-in-time memory measurement. +type MemorySnapshot struct { + Timestamp time.Time + HeapAlloc uint64 + HeapSys uint64 + HeapInuse uint64 + HeapIdle uint64 + RSS uint64 + NumGC uint32 + GCPauseNs uint64 + NumGoroutine int +} + +// MemoryMonitor tracks memory usage over time. +type MemoryMonitor struct { + pprofURL string + profileDir string + snapshots []MemorySnapshot + pollInterval time.Duration + mu sync.RWMutex +} + +// NewMemoryMonitor creates a new memory monitor. +func NewMemoryMonitor(pollInterval time.Duration, pprofURL string, profileDir string) *MemoryMonitor { + return &MemoryMonitor{ + snapshots: make([]MemorySnapshot, 0), + pollInterval: pollInterval, + pprofURL: pprofURL, + profileDir: profileDir, + } +} + +// Start begins monitoring memory usage. +func (m *MemoryMonitor) Start(ctx context.Context) { + ticker := time.NewTicker(m.pollInterval) + defer ticker.Stop() + + // Create profile directory if it doesn't exist + if m.profileDir != "" { + _ = os.MkdirAll(m.profileDir, 0o755) + } + + // Capture initial snapshot + m.captureSnapshot() + + // Start periodic heap profiling (every 30 seconds) + profileTicker := time.NewTicker(30 * time.Second) + defer profileTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.captureSnapshot() + case <-profileTicker.C: + if m.pprofURL != "" && m.profileDir != "" { + _ = m.captureHeapProfile() + } + } + } +} + +// captureSnapshot captures a memory snapshot. 
+func (m *MemoryMonitor) captureSnapshot() { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + + // Get memory limit from cgroups if available + // We use HeapSys as a proxy for RSS in containerized environments + rss := memStats.HeapSys + if limit, err := cgroups.MemoryLimit(); err == nil && limit > 0 { + // In containers, use sys memory as RSS approximation + rss = memStats.Sys + } + + snapshot := MemorySnapshot{ + Timestamp: time.Now(), + HeapAlloc: memStats.HeapAlloc, + HeapSys: memStats.HeapSys, + HeapInuse: memStats.HeapInuse, + HeapIdle: memStats.HeapIdle, + RSS: rss, + NumGC: memStats.NumGC, + GCPauseNs: memStats.PauseNs[(memStats.NumGC+255)%256], + NumGoroutine: runtime.NumGoroutine(), + } + + m.mu.Lock() + m.snapshots = append(m.snapshots, snapshot) + m.mu.Unlock() +} + +// captureHeapProfile captures a heap profile via pprof HTTP endpoint. +func (m *MemoryMonitor) captureHeapProfile() error { + url := fmt.Sprintf("%s/debug/pprof/heap", m.pprofURL) + resp, err := http.Get(url) // #nosec G107 Review Comment: HTTP GET request uses a URL constructed from user input. While the #nosec comment acknowledges this, consider validating the pprofURL format or using a URL builder to prevent potential injection. ########## test/stress/trace-streaming/plot_memory_comparison.py: ########## @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Plot memory monitoring data comparing write and query phases. + +Usage: + python3 plot_memory_comparison.py memory_write_small.csv memory_query_small.csv + python3 plot_memory_comparison.py memory_write_small.csv memory_query_small.csv --output-dir ./charts +""" + +import sys +import csv +import os +from datetime import datetime +import argparse + +try: + import matplotlib.pyplot as plt + import matplotlib.dates as mdates +except ImportError: + print("Error: matplotlib not installed") + print("Install with: pip3 install matplotlib") + sys.exit(1) + + +def parse_csv(filename): + """Parse memory CSV file.""" + timestamps = [] + heap_alloc = [] + heap_sys = [] + rss = [] + num_gc = [] + num_goroutine = [] + + with open(filename, 'r') as f: + reader = csv.DictReader(f) + for row in reader: + # Parse timestamp + ts = datetime.fromisoformat(row['Timestamp'].replace('Z', '+00:00')) + timestamps.append(ts) + + # Parse memory values (already in MB) + heap_alloc.append(float(row['HeapAlloc(MB)'])) + heap_sys.append(float(row['HeapSys(MB)'])) + rss.append(float(row['RSS(MB)'])) + num_gc.append(int(row['NumGC'])) + num_goroutine.append(int(row['NumGoroutine'])) + + return { + 'timestamps': timestamps, + 'heap_alloc': heap_alloc, + 'heap_sys': heap_sys, + 'rss': rss, + 'num_gc': num_gc, + 'num_goroutine': num_goroutine + } + + +def plot_single_phase(data, title, output_file=None): + """Create memory visualization for a single phase.""" + fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True) + + # Memory usage over time + ax1 = axes[0] + 
ax1.plot(data['timestamps'], data['heap_alloc'], label='Heap Alloc', linewidth=2, color='#2E86AB') + ax1.plot(data['timestamps'], data['heap_sys'], label='Heap Sys', linewidth=2, alpha=0.7, color='#A23B72') + ax1.plot(data['timestamps'], data['rss'], label='RSS', linewidth=2, alpha=0.7, color='#F18F01') + ax1.set_ylabel('Memory (MB)', fontsize=12) + ax1.set_title(f'{title} - Memory Usage Over Time', fontsize=14, fontweight='bold') + ax1.legend(loc='upper left', fontsize=10) + ax1.grid(True, alpha=0.3) + + # Add peak/average annotations + peak_heap = max(data['heap_alloc']) + avg_heap = sum(data['heap_alloc']) / len(data['heap_alloc']) + ax1.axhline(y=peak_heap, color='red', linestyle='--', alpha=0.5, label=f'Peak: {peak_heap:.1f} MB') + ax1.axhline(y=avg_heap, color='green', linestyle='--', alpha=0.5, label=f'Avg: {avg_heap:.1f} MB') + ax1.legend(loc='upper left', fontsize=10) + + # Garbage collection over time + ax2 = axes[1] + ax2.plot(data['timestamps'], data['num_gc'], label='GC Cycles', color='orange', linewidth=2) + ax2.set_ylabel('GC Cycles', fontsize=12) + ax2.set_title('Garbage Collection Activity', fontsize=12, fontweight='bold') + ax2.legend(loc='upper left', fontsize=10) + ax2.grid(True, alpha=0.3) + + # Goroutine count over time + ax3 = axes[2] + ax3.plot(data['timestamps'], data['num_goroutine'], label='Goroutines', color='purple', linewidth=2) + ax3.set_ylabel('Goroutine Count', fontsize=12) + ax3.set_xlabel('Time', fontsize=12) + ax3.set_title('Goroutine Count', fontsize=12, fontweight='bold') + ax3.legend(loc='upper left', fontsize=10) + ax3.grid(True, alpha=0.3) + + # Format x-axis + ax3.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S')) + plt.setp(ax3.xaxis.get_majorticklabels(), rotation=45, ha='right') + + plt.tight_layout() + + if output_file: + plt.savefig(output_file, dpi=150, bbox_inches='tight') + print(f"Chart saved to: {output_file}") + + plt.close() + + # Return stats + return { + 'peak_heap': peak_heap, + 'avg_heap': avg_heap, 
+ 'min_heap': min(data['heap_alloc']), + 'peak_rss': max(data['rss']), + 'total_gc': max(data['num_gc']), + 'max_goroutines': max(data['num_goroutine']) + } + + +def plot_comparison(write_data, query_data, output_file=None): + """Create comparison plot for write vs query phases.""" + fig, axes = plt.subplots(2, 1, figsize=(16, 10), sharex=False) + + # Combine timestamps for proper alignment + # Normalize to relative time (seconds from start) + def to_relative_seconds(timestamps): + start = timestamps[0] + return [(t - start).total_seconds() for t in timestamps] + + write_times = to_relative_seconds(write_data['timestamps']) + query_times = to_relative_seconds(query_data['timestamps']) + + # Memory comparison + ax1 = axes[0] + ax1.plot(write_times, write_data['heap_alloc'], label='Write - Heap Alloc', + linewidth=2, color='#2E86AB') + ax1.plot(query_times, query_data['heap_alloc'], label='Query - Heap Alloc', + linewidth=2, color='#F18F01') + ax1.set_ylabel('Heap Allocated (MB)', fontsize=12) + ax1.set_title('Memory Usage Comparison: Write vs Query Phase', fontsize=14, fontweight='bold') + ax1.legend(loc='upper left', fontsize=11) + ax1.grid(True, alpha=0.3) + + # Add annotations + write_peak = max(write_data['heap_alloc']) + query_peak = max(query_data['heap_alloc']) + ax1.axhline(y=write_peak, color='#2E86AB', linestyle='--', alpha=0.4) + ax1.axhline(y=query_peak, color='#F18F01', linestyle='--', alpha=0.4) + ax1.text(max(write_times) * 0.95, write_peak, f'{write_peak:.1f} MB', + va='bottom', ha='right', color='#2E86AB', fontweight='bold') + ax1.text(max(query_times) * 0.95, query_peak, f'{query_peak:.1f} MB', + va='bottom', ha='right', color='#F18F01', fontweight='bold') + + # Goroutine comparison + ax2 = axes[1] + ax2.plot(write_times, write_data['num_goroutine'], label='Write - Goroutines', + linewidth=2, color='#2E86AB') + ax2.plot(query_times, query_data['num_goroutine'], label='Query - Goroutines', + linewidth=2, color='#F18F01') + ax2.set_ylabel('Goroutine 
Count', fontsize=12) + ax2.set_xlabel('Time (seconds)', fontsize=12) + ax2.set_title('Goroutine Count Comparison', fontsize=12, fontweight='bold') + ax2.legend(loc='upper left', fontsize=11) + ax2.grid(True, alpha=0.3) + + plt.tight_layout() + + if output_file: + plt.savefig(output_file, dpi=150, bbox_inches='tight') + print(f"Comparison chart saved to: {output_file}") + + plt.close() + + +def print_analysis(phase_name, data, stats): + """Print analysis for a phase.""" + print(f"\n=== {phase_name} Phase Analysis ===") + print(f"Peak Heap Alloc: {stats['peak_heap']:.2f} MB") + print(f"Avg Heap Alloc: {stats['avg_heap']:.2f} MB") + print(f"Min Heap Alloc: {stats['min_heap']:.2f} MB") + print(f"Peak RSS: {stats['peak_rss']:.2f} MB") + print(f"Total GC Cycles: {stats['total_gc']}") + print(f"Max Goroutines: {stats['max_goroutines']}") + + # Memory leak detection + if len(data['heap_alloc']) > 4: + first_quarter_avg = sum(data['heap_alloc'][:len(data['heap_alloc'])//4]) / (len(data['heap_alloc'])//4) + last_quarter_avg = sum(data['heap_alloc'][-len(data['heap_alloc'])//4:]) / (len(data['heap_alloc'])//4) Review Comment: [nitpick] The calculation of quarter averages has duplicated logic. Consider extracting this into a helper function like 'calculate_quarter_average(data, is_first_quarter)' to reduce code duplication. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
