hanahmily commented on code in PR #897: URL: https://github.com/apache/skywalking-banyandb/pull/897#discussion_r2621591627
########## fodc/internal/integration/basic_metrics_buffering_test.go: ########## @@ -0,0 +1,334 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_test + +import ( + "context" + "fmt" + "net" + "net/http" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" + + "github.com/apache/skywalking-banyandb/fodc/internal/exporter" + "github.com/apache/skywalking-banyandb/fodc/internal/flightrecorder" + "github.com/apache/skywalking-banyandb/fodc/internal/server" + "github.com/apache/skywalking-banyandb/fodc/internal/watchdog" +) + +var _ = Describe("Test Case 1: Basic Metrics Buffering", func() { + var ( + metricsEndpoint string + fr *flightrecorder.FlightRecorder + wd *watchdog.Watchdog + metricsServer *server.Server + promReg *prometheus.Registry + datasourceCollector *exporter.DatasourceCollector + ) + + BeforeEach(func() { + // Construct metrics endpoint URL + // Extract host from HTTP address and use port 2121 for observability metrics + host, _, splitErr := net.SplitHostPort(banyanDBHTTPAddr) + if splitErr != nil { + // Fallback: if SplitHostPort fails, try simple string split + parts := strings.Split(banyanDBHTTPAddr, ":") + if len(parts) > 0 { + host = parts[0] + } else { + host = defaultLocalhost + } + } + if host == "" { + host = defaultLocalhost + } + metricsEndpoint = fmt.Sprintf("http://%s:2121/metrics", host) + + // Create Flight Recorder with reasonable capacity (10MB) + capacitySize := 10 * 1024 * 1024 // 10MB + fr = flightrecorder.NewFlightRecorder(capacitySize) + + // Create Prometheus registry and collector + promReg = prometheus.NewRegistry() + datasourceCollector = exporter.NewDatasourceCollector(fr) + + // Create and start Prometheus metrics server for FODC + var serverCreateErr error + metricsServer, serverCreateErr = server.NewServer(server.Config{ + ListenAddr: defaultLocalhost + ":0", // Use port 0 for automatic assignment + ReadHeaderTimeout: 3 * time.Second, + ShutdownTimeout: 5 * time.Second, + }) + Expect(serverCreateErr).NotTo(HaveOccurred()) + + serverErrCh, serverStartErr := metricsServer.Start(promReg, datasourceCollector) + Expect(serverStartErr).NotTo(HaveOccurred()) + Expect(serverErrCh).NotTo(BeNil()) + + // 
Create Watchdog with short polling interval for testing + pollInterval := 2 * time.Second + wd = watchdog.NewWatchdogWithConfig(fr, metricsEndpoint, pollInterval) + + ctx := context.Background() + preRunErr := wd.PreRun(ctx) + Expect(preRunErr).NotTo(HaveOccurred()) + + // Verify metrics endpoint is accessible before starting watchdog + client := &http.Client{Timeout: 2 * time.Second} + resp, healthErr := client.Get(metricsEndpoint) + Expect(healthErr).NotTo(HaveOccurred(), "Metrics endpoint should be accessible") + if resp != nil { + resp.Body.Close() + } + + // Start watchdog polling + stopCh := wd.Serve() + Expect(stopCh).NotTo(BeNil()) + + // Give watchdog a moment to start and perform initial poll + time.Sleep(500 * time.Millisecond) + }) + + AfterEach(func() { + // Stop watchdog + if wd != nil { + wd.GracefulStop() + } + + // Stop metrics server + if metricsServer != nil { + stopErr := metricsServer.Stop() + Expect(stopErr).NotTo(HaveOccurred()) + } + }) + + It("should buffer metrics correctly after watchdog polls", func() { + // Step 2: Generate metrics by performing operations + // Make HTTP requests to generate HTTP-related metrics + client := &http.Client{ + Timeout: 5 * time.Second, + } + + // Perform multiple operations to generate various metrics + for i := 0; i < 5; i++ { + // Health check endpoint + req, reqErr := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/api/v1/health", banyanDBHTTPAddr), nil) + Expect(reqErr).NotTo(HaveOccurred()) + + resp, respErr := client.Do(req) Review Comment: `Expect(respErr).NotTo(HaveOccurred())` ########## fodc/internal/flightrecorder/datasource.go: ########## @@ -0,0 +1,271 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package flightrecorder implements a flight recorder for metrics data. +package flightrecorder + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/apache/skywalking-banyandb/fodc/internal/metrics" +) + +const ( + defaultCapacity = 1000 + intSize = 8 // Size of int on 64-bit systems + sliceHeaderSize = 24 // Size of slice header (pointer + length + capacity) + mapBaseOverhead = 48 // Base map structure overhead + mapEntryOverhead = 16 // Per-entry overhead in map + descMapEntryOverhead = 24 // Per-entry overhead for description map + stringHeaderSize = 16 // String header size +) + +// MetricRingBuffer is a type alias for RingBuffer[float64]. +type MetricRingBuffer = RingBuffer[float64] + +// TimestampRingBuffer is a type alias for RingBuffer[int64]. +type TimestampRingBuffer = RingBuffer[int64] + +// NewMetricRingBuffer creates a new MetricRingBuffer. +func NewMetricRingBuffer() *MetricRingBuffer { + return NewRingBuffer[float64]() +} + +// NewTimestampRingBuffer creates a new TimestampRingBuffer. +func NewTimestampRingBuffer() *TimestampRingBuffer { + return NewRingBuffer[int64]() +} + +// UpdateMetricRingBuffer adds a metric value to the metric ring buffer. +func UpdateMetricRingBuffer(mrb *MetricRingBuffer, v float64, capacity int) { + mrb.Add(v, capacity) +} + +// UpdateTimestampRingBuffer adds a timestamp value to the timestamp ring buffer. 
+func UpdateTimestampRingBuffer(trb *TimestampRingBuffer, v int64, capacity int) { + if trb != nil { + trb.Add(v, capacity) + } +} + +// Datasource stores metrics data with ring buffers. +type Datasource struct { + metrics map[string]*MetricRingBuffer // Map from metric name+labels to RingBuffer storing metric values + descriptions map[string]string // Map from metric name to HELP content descriptions + timestamps *TimestampRingBuffer // RingBuffer storing timestamps for each polling cycle + mu sync.RWMutex + TotalWritten uint64 // Total number of values written (wraps around) + Capacity int // Number of writable metrics length +} + +// NewDatasource creates a new Datasource. +func NewDatasource() *Datasource { + return &Datasource{ + metrics: make(map[string]*MetricRingBuffer), + timestamps: NewTimestampRingBuffer(), + descriptions: make(map[string]string), + Capacity: 0, + TotalWritten: 0, // Accessed via atomic operations + } +} + +// Update records a metric in the datasource. +func (ds *Datasource) Update(m *metrics.RawMetric) error { Review Comment: The implementation does not match the design. Currently, the data source accepts a partial set of metrics when the capacity is reached, and we cannot control which part it accepts. However, we need it to accept at least one data point for each metric. If the capacity cannot accommodate all metrics, it should not accept any of them. ########## fodc/internal/integration/basic_metrics_buffering_test.go: ########## @@ -0,0 +1,334 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_test + +import ( + "context" + "fmt" + "net" + "net/http" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" + + "github.com/apache/skywalking-banyandb/fodc/internal/exporter" + "github.com/apache/skywalking-banyandb/fodc/internal/flightrecorder" + "github.com/apache/skywalking-banyandb/fodc/internal/server" + "github.com/apache/skywalking-banyandb/fodc/internal/watchdog" +) + +var _ = Describe("Test Case 1: Basic Metrics Buffering", func() { + var ( + metricsEndpoint string + fr *flightrecorder.FlightRecorder + wd *watchdog.Watchdog + metricsServer *server.Server + promReg *prometheus.Registry + datasourceCollector *exporter.DatasourceCollector + ) + + BeforeEach(func() { + // Construct metrics endpoint URL + // Extract host from HTTP address and use port 2121 for observability metrics + host, _, splitErr := net.SplitHostPort(banyanDBHTTPAddr) + if splitErr != nil { + // Fallback: if SplitHostPort fails, try simple string split + parts := strings.Split(banyanDBHTTPAddr, ":") + if len(parts) > 0 { + host = parts[0] + } else { + host = defaultLocalhost + } + } + if host == "" { + host = defaultLocalhost + } + metricsEndpoint = fmt.Sprintf("http://%s:2121/metrics", host) + + // Create Flight Recorder with reasonable capacity (10MB) + capacitySize := 10 * 1024 * 1024 // 10MB + fr = flightrecorder.NewFlightRecorder(capacitySize) + + // Create Prometheus registry and collector + promReg = prometheus.NewRegistry() + datasourceCollector = 
exporter.NewDatasourceCollector(fr) + + // Create and start Prometheus metrics server for FODC + var serverCreateErr error + metricsServer, serverCreateErr = server.NewServer(server.Config{ + ListenAddr: defaultLocalhost + ":0", // Use port 0 for automatic assignment + ReadHeaderTimeout: 3 * time.Second, + ShutdownTimeout: 5 * time.Second, + }) + Expect(serverCreateErr).NotTo(HaveOccurred()) + + serverErrCh, serverStartErr := metricsServer.Start(promReg, datasourceCollector) + Expect(serverStartErr).NotTo(HaveOccurred()) + Expect(serverErrCh).NotTo(BeNil()) + + // Create Watchdog with short polling interval for testing + pollInterval := 2 * time.Second + wd = watchdog.NewWatchdogWithConfig(fr, metricsEndpoint, pollInterval) + + ctx := context.Background() + preRunErr := wd.PreRun(ctx) + Expect(preRunErr).NotTo(HaveOccurred()) + + // Verify metrics endpoint is accessible before starting watchdog + client := &http.Client{Timeout: 2 * time.Second} + resp, healthErr := client.Get(metricsEndpoint) + Expect(healthErr).NotTo(HaveOccurred(), "Metrics endpoint should be accessible") + if resp != nil { + resp.Body.Close() + } + + // Start watchdog polling + stopCh := wd.Serve() + Expect(stopCh).NotTo(BeNil()) + + // Give watchdog a moment to start and perform initial poll + time.Sleep(500 * time.Millisecond) + }) + + AfterEach(func() { + // Stop watchdog + if wd != nil { + wd.GracefulStop() + } + + // Stop metrics server + if metricsServer != nil { + stopErr := metricsServer.Stop() + Expect(stopErr).NotTo(HaveOccurred()) + } + }) + + It("should buffer metrics correctly after watchdog polls", func() { + // Step 2: Generate metrics by performing operations + // Make HTTP requests to generate HTTP-related metrics + client := &http.Client{ + Timeout: 5 * time.Second, + } + + // Perform multiple operations to generate various metrics + for i := 0; i < 5; i++ { + // Health check endpoint + req, reqErr := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/api/v1/health", 
banyanDBHTTPAddr), nil) + Expect(reqErr).NotTo(HaveOccurred()) + + resp, respErr := client.Do(req) + if respErr == nil && resp != nil { + resp.Body.Close() + } + + // Metrics endpoint (will generate metrics about metrics collection) + metricsReq, metricsReqErr := http.NewRequest(http.MethodGet, metricsEndpoint, nil) + Expect(metricsReqErr).NotTo(HaveOccurred()) + + metricsResp, metricsRespErr := client.Do(metricsReq) + if metricsRespErr == nil && metricsResp != nil { + metricsResp.Body.Close() + } + + time.Sleep(200 * time.Millisecond) + } + + // Step 3: Wait for Watchdog to poll metrics + // Poll interval is 2s, so wait at least 2.5s to ensure at least one poll cycle completes + // Use Eventually to wait for metrics to be buffered + Eventually(func() bool { + datasources := fr.GetDatasources() + if len(datasources) == 0 { + return false + } + ds := datasources[0] + metricsMap := ds.GetMetrics() + return len(metricsMap) > 0 + }, 10*time.Second, 500*time.Millisecond).Should(BeTrue(), "Metrics should be buffered after watchdog polls") + + // Step 4: Verify metrics are buffered correctly through internal checks + datasources := fr.GetDatasources() + Expect(datasources).NotTo(BeEmpty(), "FlightRecorder should have at least one datasource") + + ds := datasources[0] + Expect(ds).NotTo(BeNil(), "First datasource should not be nil") + + // Check that metrics were collected + metricsMap := ds.GetMetrics() + Expect(metricsMap).NotTo(BeEmpty(), "Datasource should have buffered metrics") + + // Verify that timestamps were recorded (one per polling cycle) + timestamps := ds.GetTimestamps() + Expect(timestamps).NotTo(BeNil()) + timestampValues := timestamps.GetAllValues() + Expect(len(timestampValues)).To(BeNumerically(">", 0), "Timestamps should be recorded") + + // Verify that at least some metrics have values + totalMetricsWithValues := 0 + for metricKey, metricBuffer := range metricsMap { + Expect(metricKey).NotTo(BeEmpty(), "Metric key should not be empty") + 
Expect(metricBuffer).NotTo(BeNil(), "Metric buffer should not be nil") + + values := metricBuffer.GetAllValues() + if len(values) > 0 { + totalMetricsWithValues++ + // Verify that the metric has at least one non-zero value + hasNonZeroValue := false + for _, val := range values { + if val != 0 { + hasNonZeroValue = true + break + } + } + if hasNonZeroValue { + GinkgoWriter.Printf("Metric %s has buffered values: %v\n", metricKey, values) + } + } + } + + Expect(totalMetricsWithValues).To(BeNumerically(">", 0), + "At least some metrics should have values buffered") + + // Verify descriptions were stored (at least some metrics should have descriptions) + descriptions := ds.GetDescriptions() Review Comment: Ensure that `len(descriptions)` is greater than 0, and that each item is not empty. ########## fodc/internal/integration/basic_metrics_buffering_test.go: ########## @@ -0,0 +1,334 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_test + +import ( + "context" + "fmt" + "net" + "net/http" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" + + "github.com/apache/skywalking-banyandb/fodc/internal/exporter" + "github.com/apache/skywalking-banyandb/fodc/internal/flightrecorder" + "github.com/apache/skywalking-banyandb/fodc/internal/server" + "github.com/apache/skywalking-banyandb/fodc/internal/watchdog" +) + +var _ = Describe("Test Case 1: Basic Metrics Buffering", func() { + var ( + metricsEndpoint string + fr *flightrecorder.FlightRecorder + wd *watchdog.Watchdog + metricsServer *server.Server + promReg *prometheus.Registry + datasourceCollector *exporter.DatasourceCollector + ) + + BeforeEach(func() { + // Construct metrics endpoint URL + // Extract host from HTTP address and use port 2121 for observability metrics + host, _, splitErr := net.SplitHostPort(banyanDBHTTPAddr) + if splitErr != nil { + // Fallback: if SplitHostPort fails, try simple string split + parts := strings.Split(banyanDBHTTPAddr, ":") + if len(parts) > 0 { + host = parts[0] + } else { + host = defaultLocalhost + } + } + if host == "" { + host = defaultLocalhost + } + metricsEndpoint = fmt.Sprintf("http://%s:2121/metrics", host) + + // Create Flight Recorder with reasonable capacity (10MB) + capacitySize := 10 * 1024 * 1024 // 10MB + fr = flightrecorder.NewFlightRecorder(capacitySize) + + // Create Prometheus registry and collector + promReg = prometheus.NewRegistry() + datasourceCollector = exporter.NewDatasourceCollector(fr) + + // Create and start Prometheus metrics server for FODC + var serverCreateErr error + metricsServer, serverCreateErr = server.NewServer(server.Config{ + ListenAddr: defaultLocalhost + ":0", // Use port 0 for automatic assignment + ReadHeaderTimeout: 3 * time.Second, + ShutdownTimeout: 5 * time.Second, + }) + Expect(serverCreateErr).NotTo(HaveOccurred()) + + serverErrCh, serverStartErr := metricsServer.Start(promReg, datasourceCollector) + Expect(serverStartErr).NotTo(HaveOccurred()) + Expect(serverErrCh).NotTo(BeNil()) + + // 
Create Watchdog with short polling interval for testing + pollInterval := 2 * time.Second + wd = watchdog.NewWatchdogWithConfig(fr, metricsEndpoint, pollInterval) + + ctx := context.Background() + preRunErr := wd.PreRun(ctx) + Expect(preRunErr).NotTo(HaveOccurred()) + + // Verify metrics endpoint is accessible before starting watchdog + client := &http.Client{Timeout: 2 * time.Second} + resp, healthErr := client.Get(metricsEndpoint) + Expect(healthErr).NotTo(HaveOccurred(), "Metrics endpoint should be accessible") + if resp != nil { + resp.Body.Close() + } + + // Start watchdog polling + stopCh := wd.Serve() + Expect(stopCh).NotTo(BeNil()) + + // Give watchdog a moment to start and perform initial poll + time.Sleep(500 * time.Millisecond) + }) + + AfterEach(func() { + // Stop watchdog + if wd != nil { + wd.GracefulStop() + } + + // Stop metrics server + if metricsServer != nil { + stopErr := metricsServer.Stop() + Expect(stopErr).NotTo(HaveOccurred()) + } + }) + + It("should buffer metrics correctly after watchdog polls", func() { + // Step 2: Generate metrics by performing operations + // Make HTTP requests to generate HTTP-related metrics + client := &http.Client{ + Timeout: 5 * time.Second, + } + + // Perform multiple operations to generate various metrics + for i := 0; i < 5; i++ { + // Health check endpoint + req, reqErr := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/api/v1/health", banyanDBHTTPAddr), nil) + Expect(reqErr).NotTo(HaveOccurred()) + + resp, respErr := client.Do(req) + if respErr == nil && resp != nil { + resp.Body.Close() + } + + // Metrics endpoint (will generate metrics about metrics collection) + metricsReq, metricsReqErr := http.NewRequest(http.MethodGet, metricsEndpoint, nil) + Expect(metricsReqErr).NotTo(HaveOccurred()) + + metricsResp, metricsRespErr := client.Do(metricsReq) + if metricsRespErr == nil && metricsResp != nil { + metricsResp.Body.Close() + } + + time.Sleep(200 * time.Millisecond) + } + + // Step 3: Wait for 
Watchdog to poll metrics + // Poll interval is 2s, so wait at least 2.5s to ensure at least one poll cycle completes + // Use Eventually to wait for metrics to be buffered + Eventually(func() bool { + datasources := fr.GetDatasources() + if len(datasources) == 0 { + return false + } + ds := datasources[0] + metricsMap := ds.GetMetrics() + return len(metricsMap) > 0 + }, 10*time.Second, 500*time.Millisecond).Should(BeTrue(), "Metrics should be buffered after watchdog polls") + + // Step 4: Verify metrics are buffered correctly through internal checks + datasources := fr.GetDatasources() + Expect(datasources).NotTo(BeEmpty(), "FlightRecorder should have at least one datasource") + + ds := datasources[0] + Expect(ds).NotTo(BeNil(), "First datasource should not be nil") + + // Check that metrics were collected + metricsMap := ds.GetMetrics() + Expect(metricsMap).NotTo(BeEmpty(), "Datasource should have buffered metrics") + + // Verify that timestamps were recorded (one per polling cycle) + timestamps := ds.GetTimestamps() + Expect(timestamps).NotTo(BeNil()) + timestampValues := timestamps.GetAllValues() + Expect(len(timestampValues)).To(BeNumerically(">", 0), "Timestamps should be recorded") + + // Verify that at least some metrics have values + totalMetricsWithValues := 0 + for metricKey, metricBuffer := range metricsMap { + Expect(metricKey).NotTo(BeEmpty(), "Metric key should not be empty") + Expect(metricBuffer).NotTo(BeNil(), "Metric buffer should not be nil") + + values := metricBuffer.GetAllValues() + if len(values) > 0 { + totalMetricsWithValues++ + // Verify that the metric has at least one non-zero value + hasNonZeroValue := false + for _, val := range values { + if val != 0 { + hasNonZeroValue = true + break + } + } + if hasNonZeroValue { + GinkgoWriter.Printf("Metric %s has buffered values: %v\n", metricKey, values) + } + } + } + + Expect(totalMetricsWithValues).To(BeNumerically(">", 0), + "At least some metrics should have values buffered") + + // Verify 
descriptions were stored (at least some metrics should have descriptions) + descriptions := ds.GetDescriptions() + // Note: Not all metrics may have descriptions, but we should have at least some + if len(descriptions) == 0 { + GinkgoWriter.Printf("Warning: No descriptions found, but metrics were buffered: %d metrics\n", len(metricsMap)) + } + + // Verify TotalWritten counter is incremented + Expect(ds.GetTotalWritten()).To(BeNumerically(">", 0), "TotalWritten should be greater than 0") + }) + + It("should handle multiple polling cycles correctly", func() { + // Generate some initial activity + client := &http.Client{ + Timeout: 5 * time.Second, + } + + for i := 0; i < 3; i++ { + req, reqErr := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/api/v1/health", banyanDBHTTPAddr), nil) + Expect(reqErr).NotTo(HaveOccurred()) + + resp, respErr := client.Do(req) + if respErr == nil && resp != nil { + resp.Body.Close() + } + time.Sleep(100 * time.Millisecond) + } + + // Wait for multiple polling cycles + // Poll interval is 2s, so wait for at least 2 cycles (4s + buffer) + Eventually(func() bool { + datasources := fr.GetDatasources() + if len(datasources) == 0 { + return false + } + ds := datasources[0] + timestamps := ds.GetTimestamps() + if timestamps == nil { + return false + } + timestampValues := timestamps.GetAllValues() + return len(timestampValues) >= 2 + }, 10*time.Second, 500*time.Millisecond).Should(BeTrue(), "Should have multiple timestamps from multiple polling cycles") + + datasources := fr.GetDatasources() + Expect(datasources).NotTo(BeEmpty()) + + ds := datasources[0] + + // Check that multiple timestamps were recorded (one per polling cycle) + timestamps := ds.GetTimestamps() + Expect(timestamps).NotTo(BeNil()) + timestampValues := timestamps.GetAllValues() + Expect(len(timestampValues)).To(BeNumerically(">=", 2), + "Should have multiple timestamps from multiple polling cycles") + + // Verify timestamps are in chronological order (newest last) + if 
len(timestampValues) >= 2 { + for i := 1; i < len(timestampValues); i++ { + Expect(timestampValues[i]).To(BeNumerically(">=", timestampValues[i-1]), + "Timestamps should be in chronological order") + } + } + + // Verify metrics were updated across multiple cycles + metricsMap := ds.GetMetrics() + for metricKey, metricBuffer := range metricsMap { + values := metricBuffer.GetAllValues() + // At least some metrics should have multiple values from multiple polls + if len(values) > 0 { + Expect(len(values)).To(BeNumerically(">=", 1), Review Comment: Ensure len(values) == len(timestampValues) ########## fodc/internal/integration/basic_metrics_buffering_test.go: ########## @@ -0,0 +1,334 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_test + +import ( + "context" + "fmt" + "net" + "net/http" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" + + "github.com/apache/skywalking-banyandb/fodc/internal/exporter" + "github.com/apache/skywalking-banyandb/fodc/internal/flightrecorder" + "github.com/apache/skywalking-banyandb/fodc/internal/server" + "github.com/apache/skywalking-banyandb/fodc/internal/watchdog" +) + +var _ = Describe("Test Case 1: Basic Metrics Buffering", func() { + var ( + metricsEndpoint string + fr *flightrecorder.FlightRecorder + wd *watchdog.Watchdog + metricsServer *server.Server + promReg *prometheus.Registry + datasourceCollector *exporter.DatasourceCollector + ) + + BeforeEach(func() { + // Construct metrics endpoint URL + // Extract host from HTTP address and use port 2121 for observability metrics + host, _, splitErr := net.SplitHostPort(banyanDBHTTPAddr) + if splitErr != nil { + // Fallback: if SplitHostPort fails, try simple string split + parts := strings.Split(banyanDBHTTPAddr, ":") + if len(parts) > 0 { + host = parts[0] + } else { + host = defaultLocalhost + } + } + if host == "" { + host = defaultLocalhost + } + metricsEndpoint = fmt.Sprintf("http://%s:2121/metrics", host) + + // Create Flight Recorder with reasonable capacity (10MB) + capacitySize := 10 * 1024 * 1024 // 10MB + fr = flightrecorder.NewFlightRecorder(capacitySize) + + // Create Prometheus registry and collector + promReg = prometheus.NewRegistry() + datasourceCollector = exporter.NewDatasourceCollector(fr) + + // Create and start Prometheus metrics server for FODC + var serverCreateErr error + metricsServer, serverCreateErr = server.NewServer(server.Config{ + ListenAddr: defaultLocalhost + ":0", // Use port 0 for automatic assignment + ReadHeaderTimeout: 3 * time.Second, + ShutdownTimeout: 5 * time.Second, + }) + Expect(serverCreateErr).NotTo(HaveOccurred()) + + serverErrCh, serverStartErr := metricsServer.Start(promReg, datasourceCollector) + Expect(serverStartErr).NotTo(HaveOccurred()) + Expect(serverErrCh).NotTo(BeNil()) + + // 
Create Watchdog with short polling interval for testing + pollInterval := 2 * time.Second + wd = watchdog.NewWatchdogWithConfig(fr, metricsEndpoint, pollInterval) + + ctx := context.Background() + preRunErr := wd.PreRun(ctx) + Expect(preRunErr).NotTo(HaveOccurred()) + + // Verify metrics endpoint is accessible before starting watchdog + client := &http.Client{Timeout: 2 * time.Second} + resp, healthErr := client.Get(metricsEndpoint) + Expect(healthErr).NotTo(HaveOccurred(), "Metrics endpoint should be accessible") + if resp != nil { + resp.Body.Close() + } + + // Start watchdog polling + stopCh := wd.Serve() + Expect(stopCh).NotTo(BeNil()) + + // Give watchdog a moment to start and perform initial poll + time.Sleep(500 * time.Millisecond) + }) + + AfterEach(func() { + // Stop watchdog + if wd != nil { + wd.GracefulStop() + } + + // Stop metrics server + if metricsServer != nil { + stopErr := metricsServer.Stop() + Expect(stopErr).NotTo(HaveOccurred()) + } + }) + + It("should buffer metrics correctly after watchdog polls", func() { + // Step 2: Generate metrics by performing operations + // Make HTTP requests to generate HTTP-related metrics + client := &http.Client{ + Timeout: 5 * time.Second, + } + + // Perform multiple operations to generate various metrics + for i := 0; i < 5; i++ { + // Health check endpoint + req, reqErr := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/api/v1/health", banyanDBHTTPAddr), nil) + Expect(reqErr).NotTo(HaveOccurred()) + + resp, respErr := client.Do(req) + if respErr == nil && resp != nil { + resp.Body.Close() + } + + // Metrics endpoint (will generate metrics about metrics collection) + metricsReq, metricsReqErr := http.NewRequest(http.MethodGet, metricsEndpoint, nil) + Expect(metricsReqErr).NotTo(HaveOccurred()) + + metricsResp, metricsRespErr := client.Do(metricsReq) + if metricsRespErr == nil && metricsResp != nil { Review Comment: `Expect(metricsRespErr).NotTo(HaveOccurred())` ########## 
fodc/internal/integration/buffer_overflow_test.go: ########## @@ -0,0 +1,318 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_test + +import ( + "context" + "fmt" + "net" + "net/http" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" + + "github.com/apache/skywalking-banyandb/fodc/internal/exporter" + "github.com/apache/skywalking-banyandb/fodc/internal/flightrecorder" + "github.com/apache/skywalking-banyandb/fodc/internal/server" + "github.com/apache/skywalking-banyandb/fodc/internal/watchdog" +) + +var _ = Describe("Test Case 2: Buffer Overflow Handling", func() { + var ( + metricsEndpoint string + fr *flightrecorder.FlightRecorder + wd *watchdog.Watchdog + metricsServer *server.Server + promReg *prometheus.Registry + datasourceCollector *exporter.DatasourceCollector + ) + + BeforeEach(func() { + // Construct metrics endpoint URL + host, _, splitErr := net.SplitHostPort(banyanDBHTTPAddr) + if splitErr != nil { + parts := strings.Split(banyanDBHTTPAddr, ":") + if len(parts) > 0 { + host = parts[0] + } else { + host = defaultLocalhost + } + } + if host == "" { + host = defaultLocalhost + } + metricsEndpoint = fmt.Sprintf("http://%s:2121/metrics", host) + + // Create Flight Recorder with SMALL capacity to force buffer overflow + // Use 100KB to ensure we can overflow it with multiple polling cycles + capacitySize := 100 * 1024 // 100KB - small enough to overflow Review Comment: The flight recorder should include a Size() function that returns the actual size in bytes, ensuring it is less than or equal to capacitySize. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
