hanahmily commented on code in PR #918:
URL: 
https://github.com/apache/skywalking-banyandb/pull/918#discussion_r2663287344


##########
fodc/agent/internal/proxy/client.go:
##########
@@ -0,0 +1,751 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides a client for communicating with the FODC Proxy.
+package proxy
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       flightrecorder 
"github.com/apache/skywalking-banyandb/fodc/agent/internal/flightrecorder"
+       "github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// MetricsRequestFilter defines filters for metrics requests.
+type MetricsRequestFilter struct {
+       StartTime *time.Time
+       EndTime   *time.Time
+}
+
+// Client manages connection and communication with the FODC Proxy.
+type Client struct {
+       connManager        *connManager
+       heartbeatTicker    *time.Ticker
+       flightRecorder     *flightrecorder.FlightRecorder
+       logger             *logger.Logger
+       stopCh             chan struct{}
+       labels             map[string]string
+       client             fodcv1.FODCServiceClient
+       registrationStream fodcv1.FODCService_RegisterAgentClient
+       metricsStream      fodcv1.FODCService_StreamMetricsClient
+
+       proxyAddr string
+       nodeIP    string
+       nodeRole  string
+       agentID   string
+
+       nodePort          int
+       heartbeatInterval time.Duration
+       reconnectInterval time.Duration
+       disconnected      bool
+       streamsMu         sync.RWMutex   // Protects streams only
+       heartbeatWg       sync.WaitGroup // Tracks heartbeat goroutine
+}
+
+// NewClient creates a new Client instance.
+func NewClient(
+       proxyAddr string,
+       nodeIP string,
+       nodePort int,
+       nodeRole string,
+       labels map[string]string,
+       heartbeatInterval time.Duration,
+       reconnectInterval time.Duration,
+       flightRecorder *flightrecorder.FlightRecorder,
+       logger *logger.Logger,
+) *Client {
+       connMgr := newConnManager(proxyAddr, reconnectInterval, logger)
+       client := &Client{
+               connManager:       connMgr,
+               proxyAddr:         proxyAddr,
+               nodeIP:            nodeIP,
+               nodePort:          nodePort,
+               nodeRole:          nodeRole,
+               labels:            labels,
+               heartbeatInterval: heartbeatInterval,
+               reconnectInterval: reconnectInterval,
+               flightRecorder:    flightRecorder,
+               logger:            logger,
+               stopCh:            make(chan struct{}),
+       }
+
+       connMgr.setHeartbeatChecker(client.SendHeartbeat)
+       return client
+}
+
+// StartConnManager is useful for tests or scenarios where you want to 
manually control connection lifecycle.
+func (c *Client) StartConnManager(ctx context.Context) {
+       c.connManager.start(ctx)
+}
+
+// Connect establishes a gRPC connection to Proxy.
+func (c *Client) Connect(ctx context.Context) error {
+       resultCh := c.connManager.RequestConnect(ctx)
+       result := <-resultCh
+       if result.Error != nil {
+               return result.Error
+       }
+
+       c.streamsMu.Lock()
+       c.client = fodcv1.NewFODCServiceClient(result.Conn)
+       // Reset disconnected state and recreate stopCh for reconnection
+       if c.disconnected {
+               c.disconnected = false
+               c.stopCh = make(chan struct{})

Review Comment:
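   Goroutines may not stop when reconnecting, causing leaks. The stream handlers
   select on `c.stopCh` anew on every loop iteration, so once `Connect` swaps in
   a fresh channel here, a handler left over from the previous session no longer
   observes the closed channel and can keep running.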



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,319 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+       connManagerMaxRetries       = 3
+)
+
+// connEventType represents the type of connection event.
+type connEventType int
+
+// Possible connection event types.
+const (
+       connEventConnect connEventType = iota
+       connEventDisconnect
+)
+
+// connEvent represents a connection event sent to the manager.
+type connEvent struct {
+       ResultCh chan<- connResult
+       Context  context.Context
+       Type     connEventType

Review Comment:
   ```suggestion
        resultCh chan<- connResult
        context  context.Context
        typ      connEventType
   ```
   (`type` is a reserved keyword in Go, so the unexported field is named `typ`.)



##########
fodc/agent/internal/proxy/client.go:
##########
@@ -0,0 +1,751 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides a client for communicating with the FODC Proxy.
+package proxy
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       flightrecorder 
"github.com/apache/skywalking-banyandb/fodc/agent/internal/flightrecorder"
+       "github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// MetricsRequestFilter defines filters for metrics requests.
+type MetricsRequestFilter struct {
+       StartTime *time.Time
+       EndTime   *time.Time
+}
+
+// Client manages connection and communication with the FODC Proxy.
+type Client struct {
+       connManager        *connManager
+       heartbeatTicker    *time.Ticker
+       flightRecorder     *flightrecorder.FlightRecorder
+       logger             *logger.Logger
+       stopCh             chan struct{}
+       labels             map[string]string
+       client             fodcv1.FODCServiceClient
+       registrationStream fodcv1.FODCService_RegisterAgentClient
+       metricsStream      fodcv1.FODCService_StreamMetricsClient
+
+       proxyAddr string
+       nodeIP    string
+       nodeRole  string
+       agentID   string
+
+       nodePort          int
+       heartbeatInterval time.Duration
+       reconnectInterval time.Duration
+       disconnected      bool
+       streamsMu         sync.RWMutex   // Protects streams only
+       heartbeatWg       sync.WaitGroup // Tracks heartbeat goroutine
+}
+
+// NewClient creates a new Client instance.
+func NewClient(
+       proxyAddr string,
+       nodeIP string,
+       nodePort int,
+       nodeRole string,
+       labels map[string]string,
+       heartbeatInterval time.Duration,
+       reconnectInterval time.Duration,
+       flightRecorder *flightrecorder.FlightRecorder,
+       logger *logger.Logger,
+) *Client {
+       connMgr := newConnManager(proxyAddr, reconnectInterval, logger)
+       client := &Client{
+               connManager:       connMgr,
+               proxyAddr:         proxyAddr,
+               nodeIP:            nodeIP,
+               nodePort:          nodePort,
+               nodeRole:          nodeRole,
+               labels:            labels,
+               heartbeatInterval: heartbeatInterval,
+               reconnectInterval: reconnectInterval,
+               flightRecorder:    flightRecorder,
+               logger:            logger,
+               stopCh:            make(chan struct{}),
+       }
+
+       connMgr.setHeartbeatChecker(client.SendHeartbeat)
+       return client
+}
+
+// StartConnManager is useful for tests or scenarios where you want to 
manually control connection lifecycle.
+func (c *Client) StartConnManager(ctx context.Context) {
+       c.connManager.start(ctx)
+}
+
+// Connect establishes a gRPC connection to Proxy.
+func (c *Client) Connect(ctx context.Context) error {
+       resultCh := c.connManager.RequestConnect(ctx)
+       result := <-resultCh
+       if result.Error != nil {
+               return result.Error
+       }
+
+       c.streamsMu.Lock()
+       c.client = fodcv1.NewFODCServiceClient(result.Conn)
+       // Reset disconnected state and recreate stopCh for reconnection
+       if c.disconnected {
+               c.disconnected = false
+               c.stopCh = make(chan struct{})
+       }
+       c.streamsMu.Unlock()
+
+       return nil
+}
+
+// StartRegistrationStream establishes bi-directional registration stream with 
Proxy.
+func (c *Client) StartRegistrationStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       c.streamsMu.Unlock()
+
+       stream, streamErr := client.RegisterAgent(ctx)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create registration stream: %w", 
streamErr)
+       }
+
+       req := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send registration request: %w", 
sendErr)
+       }
+
+       resp, recvErr := stream.Recv()
+       if recvErr != nil {
+               return fmt.Errorf("failed to receive registration response: 
%w", recvErr)
+       }
+
+       if !resp.Success {
+               return fmt.Errorf("registration failed: %s", resp.Message)
+       }
+
+       if resp.AgentId == "" {
+               return fmt.Errorf("registration response missing agent ID")
+       }
+
+       c.streamsMu.Lock()
+       c.registrationStream = stream
+       c.agentID = resp.AgentId
+       if resp.HeartbeatIntervalSeconds > 0 {
+               c.heartbeatInterval = 
time.Duration(resp.HeartbeatIntervalSeconds) * time.Second
+       }
+       c.streamsMu.Unlock()
+
+       c.logger.Info().
+               Str("proxy_addr", c.proxyAddr).
+               Str("agent_id", resp.AgentId).
+               Dur("heartbeat_interval", c.heartbeatInterval).
+               Msg("Agent registered with Proxy")
+
+       c.startHeartbeat(ctx)
+
+       go c.handleRegistrationStream(ctx, stream)
+
+       return nil
+}
+
+// StartMetricsStream establishes bi-directional metrics stream with Proxy.
+func (c *Client) StartMetricsStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       agentID := c.agentID
+       c.streamsMu.Unlock()
+
+       if agentID == "" {
+               return fmt.Errorf("agent ID not available, register agent 
first")
+       }
+
+       md := metadata.New(map[string]string{"agent_id": agentID})
+       ctxWithMetadata := metadata.NewOutgoingContext(ctx, md)
+
+       stream, streamErr := client.StreamMetrics(ctxWithMetadata)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create metrics stream: %w", 
streamErr)
+       }
+
+       c.streamsMu.Lock()
+       c.metricsStream = stream
+       c.streamsMu.Unlock()
+
+       go c.handleMetricsStream(ctx, stream)
+
+       c.logger.Info().
+               Str("agent_id", agentID).
+               Msg("Metrics stream established with Proxy")
+
+       return nil
+}
+
+// RetrieveAndSendMetrics retrieves metrics from Flight Recorder when 
requested by Proxy.
+func (c *Client) RetrieveAndSendMetrics(_ context.Context, filter 
*MetricsRequestFilter) error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.metricsStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("metrics stream not established")
+       }
+       metricsStream := c.metricsStream
+       c.streamsMu.RUnlock()
+
+       datasources := c.flightRecorder.GetDatasources()
+       if len(datasources) == 0 {
+               req := &fodcv1.StreamMetricsRequest{
+                       Metrics:   []*fodcv1.Metric{},
+                       Timestamp: timestamppb.Now(),
+               }
+               if sendErr := metricsStream.Send(req); sendErr != nil {
+                       return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+               }
+               return nil
+       }
+
+       ds := datasources[0]
+       allMetrics := ds.GetMetrics()
+       timestamps := ds.GetTimestamps()
+       descriptions := ds.GetDescriptions()
+
+       if filter != nil && (filter.StartTime != nil || filter.EndTime != nil) {
+               if timestamps == nil {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               timestampValues := timestamps.GetAllValues()
+               if len(timestampValues) == 0 {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               return c.sendFilteredMetrics(metricsStream, allMetrics, 
timestampValues, descriptions, filter)
+       }
+
+       return c.sendLatestMetrics(metricsStream, allMetrics, descriptions)
+}
+
+// sendLatestMetrics sends the latest metrics (most recent values).
+func (c *Client) sendLatestMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       descriptions map[string]string,
+) error {
+       protoMetrics := make([]*fodcv1.Metric, 0)
+
+       for metricKey, metricBuffer := range allMetrics {
+               metricValue := metricBuffer.GetCurrentValue()
+               allValues := metricBuffer.GetAllValues()
+
+               if len(allValues) == 0 && metricValue == 0 {
+                       continue
+               }
+
+               parsedKey, parseErr := c.parseMetricKey(metricKey)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", 
metricKey).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               labelsMap := make(map[string]string)
+               for _, label := range parsedKey.Labels {
+                       labelsMap[label.Name] = label.Value
+               }
+
+               protoMetric := &fodcv1.Metric{
+                       Name:        parsedKey.Name,
+                       Labels:      labelsMap,
+                       Value:       metricValue,
+                       Description: descriptions[parsedKey.Name],
+               }
+
+               protoMetrics = append(protoMetrics, protoMetric)
+       }
+
+       req := &fodcv1.StreamMetricsRequest{
+               Metrics:   protoMetrics,
+               Timestamp: timestamppb.Now(),
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+
+       return nil
+}
+
+// sendFilteredMetrics sends metrics filtered by time window.
+func (c *Client) sendFilteredMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       timestampValues []int64,
+       descriptions map[string]string,
+       filter *MetricsRequestFilter,
+) error {
+       protoMetrics := make([]*fodcv1.Metric, 0)
+
+       for metricKey, metricBuffer := range allMetrics {
+               metricValues := metricBuffer.GetAllValues()
+               if len(metricValues) == 0 {
+                       continue
+               }
+
+               parsedKey, parseErr := c.parseMetricKey(metricKey)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", 
metricKey).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               description := descriptions[parsedKey.Name]
+
+               minLen := len(metricValues)
+               if len(timestampValues) < minLen {
+                       minLen = len(timestampValues)
+               }
+
+               labelsMap := make(map[string]string)
+               for _, label := range parsedKey.Labels {
+                       labelsMap[label.Name] = label.Value
+               }
+
+               for idx := 0; idx < minLen; idx++ {
+                       timestampUnix := timestampValues[idx]
+                       timestamp := time.Unix(timestampUnix, 0)
+
+                       if filter.StartTime != nil && 
timestamp.Before(*filter.StartTime) {
+                               continue
+                       }
+                       if filter.EndTime != nil && 
timestamp.After(*filter.EndTime) {
+                               continue
+                       }
+
+                       protoMetric := &fodcv1.Metric{
+                               Name:        parsedKey.Name,
+                               Labels:      labelsMap,
+                               Value:       metricValues[idx],
+                               Description: description,
+                               Timestamp:   timestamppb.New(timestamp),
+                       }
+
+                       protoMetrics = append(protoMetrics, protoMetric)
+               }
+       }
+
+       req := &fodcv1.StreamMetricsRequest{
+               Metrics:   protoMetrics,
+               Timestamp: timestamppb.Now(),
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+
+       return nil
+}
+
+// SendHeartbeat sends heartbeat to Proxy.
+func (c *Client) SendHeartbeat(_ context.Context) error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.registrationStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("registration stream not established")
+       }
+       registrationStream := c.registrationStream
+       c.streamsMu.RUnlock()
+
+       req := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       if sendErr := registrationStream.Send(req); sendErr != nil {
+               // Check if error is due to stream being closed/disconnected
+               if errors.Is(sendErr, io.EOF) || errors.Is(sendErr, 
context.Canceled) {
+                       return fmt.Errorf("registration stream closed")
+               }
+               if st, ok := status.FromError(sendErr); ok {
+                       if st.Code() == codes.Canceled {
+                               return fmt.Errorf("registration stream closed")
+                       }
+               }
+               return fmt.Errorf("failed to send heartbeat: %w", sendErr)
+       }
+
+       return nil
+}
+
+// Disconnect closes connection to Proxy.
+func (c *Client) Disconnect() error {
+       c.streamsMu.Lock()
+       if c.disconnected {
+               c.streamsMu.Unlock()
+               return nil
+       }
+
+       c.disconnected = true
+       close(c.stopCh)
+
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()
+               c.heartbeatTicker = nil
+       }
+       c.streamsMu.Unlock()
+
+       // Wait for heartbeat goroutine to exit before closing streams
+       c.heartbeatWg.Wait()
+
+       c.streamsMu.Lock()
+       if c.registrationStream != nil {
+               if closeErr := c.registrationStream.CloseSend(); closeErr != 
nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
registration stream")
+               }
+               c.registrationStream = nil
+       }
+
+       if c.metricsStream != nil {
+               if closeErr := c.metricsStream.CloseSend(); closeErr != nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
metrics stream")
+               }
+               c.metricsStream = nil
+       }
+       c.streamsMu.Unlock()
+
+       c.connManager.stop()
+
+       c.streamsMu.Lock()
+       c.client = nil
+       c.streamsMu.Unlock()
+
+       c.logger.Info().Msg("Disconnected from FODC Proxy")
+
+       return nil
+}
+
+// Start starts the proxy client with automatic reconnection.
+func (c *Client) Start(ctx context.Context) error {
+       c.connManager.start(ctx)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               default:
+               }
+
+               if connectErr := c.Connect(ctx); connectErr != nil {
+                       c.logger.Error().Err(connectErr).Msg("Failed to connect 
to Proxy, retrying...")
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               if regErr := c.StartRegistrationStream(ctx); regErr != nil {
+                       c.logger.Error().Err(regErr).Msg("Failed to start 
registration stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               if metricsErr := c.StartMetricsStream(ctx); metricsErr != nil {
+                       c.logger.Error().Err(metricsErr).Msg("Failed to start 
metrics stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               c.logger.Info().Msg("Proxy client started successfully")
+
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               }
+       }
+}
+
+// handleRegistrationStream handles the registration stream.
+func (c *Client) handleRegistrationStream(ctx context.Context, stream 
fodcv1.FODCService_RegisterAgentClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               _, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Registration stream closed by 
Proxy, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       // Check if it's a context cancellation or deadline 
exceeded (expected errors during cleanup)
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               c.logger.Debug().Err(recvErr).Msg("Registration 
stream closed")
+                               return
+                       }
+                       if st, ok := status.FromError(recvErr); ok {
+                               // Check if it's a gRPC status error with 
expected codes
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
c.logger.Debug().Err(recvErr).Msg("Registration stream closed")
+                                       return
+                               }
+                       }
+                       c.logger.Error().Err(recvErr).Msg("Error receiving from 
registration stream, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+       }
+}
+
+// handleMetricsStream handles the metrics stream.
+func (c *Client) handleMetricsStream(ctx context.Context, stream 
fodcv1.FODCService_StreamMetricsClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               resp, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Metrics stream closed by Proxy, 
reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       // Check if it's a context cancellation or deadline 
exceeded (expected errors during cleanup)
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               c.logger.Debug().Err(recvErr).Msg("Metrics 
stream closed")
+                               return
+                       }
+                       if st, ok := status.FromError(recvErr); ok {
+                               // Check if it's a gRPC status error with 
expected codes
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
c.logger.Debug().Err(recvErr).Msg("Metrics stream closed")
+                                       return
+                               }
+                       }
+                       c.logger.Error().Err(recvErr).Msg("Error receiving from 
metrics stream, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+
+               filter := &MetricsRequestFilter{}
+               if resp.StartTime != nil {
+                       startTime := resp.StartTime.AsTime()
+                       filter.StartTime = &startTime
+               }
+               if resp.EndTime != nil {
+                       endTime := resp.EndTime.AsTime()
+                       filter.EndTime = &endTime
+               }
+
+               if retrieveErr := c.RetrieveAndSendMetrics(ctx, filter); 
retrieveErr != nil {
+                       c.logger.Error().Err(retrieveErr).Msg("Failed to 
retrieve and send metrics")
+               }
+       }
+}
+
+// reconnect handles automatic reconnection when streams break.
+func (c *Client) reconnect(ctx context.Context) {
+       c.streamsMu.Lock()
+       if c.disconnected {
+               c.streamsMu.Unlock()
+               c.logger.Warn().Msg("Already disconnected intentionally, 
skipping reconnection...")
+               return
+       }
+       originalClient := c.client
+
+       c.logger.Info().Msg("Starting reconnection process...")
+
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()
+               c.heartbeatTicker = nil
+       }
+       if c.registrationStream != nil {
+               _ = c.registrationStream.CloseSend()
+               c.registrationStream = nil
+       }
+       if c.metricsStream != nil {
+               _ = c.metricsStream.CloseSend()
+               c.metricsStream = nil
+       }
+       c.streamsMu.Unlock()
+
+       reconnectCh := c.connManager.RequestConnect(ctx)
+       reconnectResult := <-reconnectCh
+
+       if reconnectResult.Error != nil {
+               c.logger.Error().Err(reconnectResult.Error).Msg("Failed to 
reconnect to Proxy")
+               return
+       }
+
+       if originalClient == nil && reconnectResult.Conn != nil {

Review Comment:
   `originalClient` is captured from `c.client` before the reconnect attempt, so
   the condition `originalClient == nil` is usually false during reconnection and
   the client isn't updated even when a new connection is available.



##########
fodc/agent/internal/proxy/client.go:
##########
@@ -0,0 +1,751 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides a client for communicating with the FODC Proxy.
+package proxy
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       flightrecorder 
"github.com/apache/skywalking-banyandb/fodc/agent/internal/flightrecorder"
+       "github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// MetricsRequestFilter restricts a metrics request to a time window.
+// A nil bound leaves that side of the window open. When the filter itself is
+// nil or both bounds are nil, RetrieveAndSendMetrics sends only the current
+// value of every metric instead of a time series.
+type MetricsRequestFilter struct {
+       // StartTime is the inclusive lower bound; earlier samples are skipped.
+       StartTime *time.Time
+       // EndTime is the inclusive upper bound; later samples are skipped.
+       EndTime   *time.Time
+}
+
+// Client manages connection and communication with the FODC Proxy.
+// It owns the registration and metrics streams, the heartbeat ticker and the
+// stop channel used to shut down its background goroutines.
+type Client struct {
+       // connManager hands out the gRPC connection on request.
+       connManager        *connManager
+       // heartbeatTicker is stopped and cleared on reconnect and Disconnect.
+       heartbeatTicker    *time.Ticker
+       // flightRecorder is the source of the metrics sent to the Proxy.
+       flightRecorder     *flightrecorder.FlightRecorder
+       logger             *logger.Logger
+       // stopCh is closed by Disconnect and recreated by Connect afterwards.
+       stopCh             chan struct{}
+       // labels, nodeIP, nodePort and nodeRole are sent in every
+       // registration and heartbeat request.
+       labels             map[string]string
+       client             fodcv1.FODCServiceClient
+       registrationStream fodcv1.FODCService_RegisterAgentClient
+       metricsStream      fodcv1.FODCService_StreamMetricsClient
+
+       proxyAddr string
+       nodeIP    string
+       nodeRole  string
+       // agentID is taken from the Proxy's registration response.
+       agentID   string
+
+       nodePort          int
+       // heartbeatInterval may be overridden by the registration response.
+       heartbeatInterval time.Duration
+       reconnectInterval time.Duration
+       // disconnected is set by Disconnect and reset by Connect.
+       disconnected      bool
+       streamsMu         sync.RWMutex   // Guards client, streams, ticker, agentID, heartbeatInterval, disconnected, stopCh
+       heartbeatWg       sync.WaitGroup // Tracks heartbeat goroutine
+}
+
+// NewClient builds a Client for the Proxy at proxyAddr.
+// It creates the connection manager from proxyAddr, reconnectInterval and
+// logger, allocates the stop channel, and registers the client's
+// SendHeartbeat method as the manager's heartbeat checker.
+func NewClient(
+       proxyAddr string,
+       nodeIP string,
+       nodePort int,
+       nodeRole string,
+       labels map[string]string,
+       heartbeatInterval time.Duration,
+       reconnectInterval time.Duration,
+       flightRecorder *flightrecorder.FlightRecorder,
+       logger *logger.Logger,
+) *Client {
+       c := &Client{
+               connManager:       newConnManager(proxyAddr, reconnectInterval, logger),
+               stopCh:            make(chan struct{}),
+               logger:            logger,
+               flightRecorder:    flightRecorder,
+               proxyAddr:         proxyAddr,
+               nodeIP:            nodeIP,
+               nodePort:          nodePort,
+               nodeRole:          nodeRole,
+               labels:            labels,
+               heartbeatInterval: heartbeatInterval,
+               reconnectInterval: reconnectInterval,
+       }
+
+       // The manager calls back into the client to probe the connection.
+       c.connManager.setHeartbeatChecker(c.SendHeartbeat)
+       return c
+}
+
+// StartConnManager starts the underlying connection manager with ctx.
+// It is useful for tests or scenarios where you want to manually control
+// the connection lifecycle.
+func (c *Client) StartConnManager(ctx context.Context) {
+       c.connManager.start(ctx)
+}
+
+// Connect requests a gRPC connection to the Proxy from the connection
+// manager and binds a new service client to it.
+//
+// It returns the manager's error unchanged when the request fails. If the
+// client was previously disconnected, the disconnected flag is cleared and a
+// fresh stop channel is allocated so the client can be started again.
+func (c *Client) Connect(ctx context.Context) error {
+       res := <-c.connManager.RequestConnect(ctx)
+       if res.Error != nil {
+               return res.Error
+       }
+
+       c.streamsMu.Lock()
+       defer c.streamsMu.Unlock()
+
+       c.client = fodcv1.NewFODCServiceClient(res.Conn)
+       if !c.disconnected {
+               return nil
+       }
+
+       // Disconnect closed the previous stop channel; replace it.
+       c.disconnected = false
+       c.stopCh = make(chan struct{})
+       return nil
+}
+
+// StartRegistrationStream establishes bi-directional registration stream with 
Proxy.
+func (c *Client) StartRegistrationStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       c.streamsMu.Unlock()
+
+       stream, streamErr := client.RegisterAgent(ctx)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create registration stream: %w", 
streamErr)
+       }
+
+       req := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send registration request: %w", 
sendErr)
+       }
+
+       resp, recvErr := stream.Recv()
+       if recvErr != nil {
+               return fmt.Errorf("failed to receive registration response: 
%w", recvErr)
+       }
+
+       if !resp.Success {
+               return fmt.Errorf("registration failed: %s", resp.Message)
+       }
+
+       if resp.AgentId == "" {
+               return fmt.Errorf("registration response missing agent ID")
+       }
+
+       c.streamsMu.Lock()
+       c.registrationStream = stream
+       c.agentID = resp.AgentId
+       if resp.HeartbeatIntervalSeconds > 0 {
+               c.heartbeatInterval = 
time.Duration(resp.HeartbeatIntervalSeconds) * time.Second
+       }
+       c.streamsMu.Unlock()
+
+       c.logger.Info().
+               Str("proxy_addr", c.proxyAddr).
+               Str("agent_id", resp.AgentId).
+               Dur("heartbeat_interval", c.heartbeatInterval).
+               Msg("Agent registered with Proxy")
+
+       c.startHeartbeat(ctx)
+
+       go c.handleRegistrationStream(ctx, stream)
+
+       return nil
+}
+
+// StartMetricsStream establishes bi-directional metrics stream with Proxy.
+func (c *Client) StartMetricsStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       agentID := c.agentID
+       c.streamsMu.Unlock()
+
+       if agentID == "" {
+               return fmt.Errorf("agent ID not available, register agent 
first")
+       }
+
+       md := metadata.New(map[string]string{"agent_id": agentID})
+       ctxWithMetadata := metadata.NewOutgoingContext(ctx, md)
+
+       stream, streamErr := client.StreamMetrics(ctxWithMetadata)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create metrics stream: %w", 
streamErr)
+       }
+
+       c.streamsMu.Lock()
+       c.metricsStream = stream
+       c.streamsMu.Unlock()
+
+       go c.handleMetricsStream(ctx, stream)
+
+       c.logger.Info().
+               Str("agent_id", agentID).
+               Msg("Metrics stream established with Proxy")
+
+       return nil
+}
+
+// RetrieveAndSendMetrics retrieves metrics from Flight Recorder when 
requested by Proxy.
+func (c *Client) RetrieveAndSendMetrics(_ context.Context, filter 
*MetricsRequestFilter) error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.metricsStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("metrics stream not established")
+       }
+       metricsStream := c.metricsStream
+       c.streamsMu.RUnlock()
+
+       datasources := c.flightRecorder.GetDatasources()
+       if len(datasources) == 0 {
+               req := &fodcv1.StreamMetricsRequest{
+                       Metrics:   []*fodcv1.Metric{},
+                       Timestamp: timestamppb.Now(),
+               }
+               if sendErr := metricsStream.Send(req); sendErr != nil {
+                       return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+               }
+               return nil
+       }
+
+       ds := datasources[0]
+       allMetrics := ds.GetMetrics()
+       timestamps := ds.GetTimestamps()
+       descriptions := ds.GetDescriptions()
+
+       if filter != nil && (filter.StartTime != nil || filter.EndTime != nil) {
+               if timestamps == nil {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               timestampValues := timestamps.GetAllValues()
+               if len(timestampValues) == 0 {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               return c.sendFilteredMetrics(metricsStream, allMetrics, 
timestampValues, descriptions, filter)
+       }
+
+       return c.sendLatestMetrics(metricsStream, allMetrics, descriptions)
+}
+
+// sendLatestMetrics sends one sample per metric: the buffer's current value.
+//
+// Buffers that hold no values and whose current value is zero are skipped,
+// as are keys that cannot be parsed (these are logged as warnings). All
+// samples go out in a single request stamped with the current time.
+func (c *Client) sendLatestMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       descriptions map[string]string,
+) error {
+       out := make([]*fodcv1.Metric, 0, len(allMetrics))
+
+       for key, ring := range allMetrics {
+               current := ring.GetCurrentValue()
+               history := ring.GetAllValues()
+               if current == 0 && len(history) == 0 {
+                       continue
+               }
+
+               parsed, parseErr := c.parseMetricKey(key)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", key).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               labelSet := make(map[string]string)
+               for _, l := range parsed.Labels {
+                       labelSet[l.Name] = l.Value
+               }
+
+               out = append(out, &fodcv1.Metric{
+                       Name:        parsed.Name,
+                       Labels:      labelSet,
+                       Value:       current,
+                       Description: descriptions[parsed.Name],
+               })
+       }
+
+       sendErr := stream.Send(&fodcv1.StreamMetricsRequest{
+               Metrics:   out,
+               Timestamp: timestamppb.Now(),
+       })
+       if sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+
+       return nil
+}
+
+// sendFilteredMetrics sends every sample whose timestamp lies inside the
+// window described by filter.
+//
+// Values and timestamps are paired by index, limited to the shorter of the
+// two slices. Timestamps are interpreted as Unix seconds. Empty buffers and
+// keys that cannot be parsed are skipped. All samples of one metric share a
+// single labels map. Everything goes out in one request.
+func (c *Client) sendFilteredMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       timestampValues []int64,
+       descriptions map[string]string,
+       filter *MetricsRequestFilter,
+) error {
+       out := make([]*fodcv1.Metric, 0)
+
+       for key, ring := range allMetrics {
+               values := ring.GetAllValues()
+               if len(values) == 0 {
+                       continue
+               }
+
+               parsed, parseErr := c.parseMetricKey(key)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", key).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               desc := descriptions[parsed.Name]
+
+               // Never index past the shorter of the two series.
+               limit := len(values)
+               if len(timestampValues) < limit {
+                       limit = len(timestampValues)
+               }
+
+               labelSet := make(map[string]string)
+               for _, l := range parsed.Labels {
+                       labelSet[l.Name] = l.Value
+               }
+
+               for i, unixSec := range timestampValues[:limit] {
+                       at := time.Unix(unixSec, 0)
+
+                       tooEarly := filter.StartTime != nil && at.Before(*filter.StartTime)
+                       if tooEarly {
+                               continue
+                       }
+                       tooLate := filter.EndTime != nil && at.After(*filter.EndTime)
+                       if tooLate {
+                               continue
+                       }
+
+                       out = append(out, &fodcv1.Metric{
+                               Name:        parsed.Name,
+                               Labels:      labelSet,
+                               Value:       values[i],
+                               Description: desc,
+                               Timestamp:   timestamppb.New(at),
+                       })
+               }
+       }
+
+       sendErr := stream.Send(&fodcv1.StreamMetricsRequest{
+               Metrics:   out,
+               Timestamp: timestamppb.Now(),
+       })
+       if sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+
+       return nil
+}
+
+// SendHeartbeat sends a heartbeat to the Proxy over the registration stream.
+// The heartbeat is a RegisterAgentRequest carrying the node role, labels and
+// primary address.
+//
+// It returns an error when the client is disconnected or has no registration
+// stream. A send that fails with io.EOF, context.Canceled or a gRPC Canceled
+// status is reported as "registration stream closed"; any other send failure
+// is wrapped.
+func (c *Client) SendHeartbeat(_ context.Context) error {
+       c.streamsMu.RLock()
+       stream, closed := c.registrationStream, c.disconnected
+       c.streamsMu.RUnlock()
+       if closed || stream == nil {
+               return fmt.Errorf("registration stream not established")
+       }
+
+       beat := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       sendErr := stream.Send(beat)
+       if sendErr == nil {
+               return nil
+       }
+
+       // Classify the failure: was the stream closed or canceled?
+       streamGone := errors.Is(sendErr, io.EOF) || errors.Is(sendErr, context.Canceled)
+       if !streamGone {
+               if st, ok := status.FromError(sendErr); ok && st.Code() == codes.Canceled {
+                       streamGone = true
+               }
+       }
+       if streamGone {
+               return fmt.Errorf("registration stream closed")
+       }
+
+       return fmt.Errorf("failed to send heartbeat: %w", sendErr)
+}
+
+// Disconnect shuts the client down: it marks the client as disconnected,
+// closes the stop channel, stops the heartbeat ticker, waits for the
+// heartbeat goroutine, closes both streams, stops the connection manager and
+// clears the service client.
+//
+// Calling Disconnect on an already disconnected client is a no-op. The
+// returned error is always nil; stream close failures are only logged.
+func (c *Client) Disconnect() error {
+       c.streamsMu.Lock()
+       if c.disconnected {
+               c.streamsMu.Unlock()
+               return nil
+       }
+
+       // Closing stopCh signals the goroutines that select on it to exit.
+       c.disconnected = true
+       close(c.stopCh)
+
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()
+               c.heartbeatTicker = nil
+       }
+       // The lock is released before waiting.
+       // NOTE(review): presumably so the heartbeat goroutine can still take
+       // streamsMu while it winds down - confirm against startHeartbeat.
+       c.streamsMu.Unlock()
+
+       // Wait for heartbeat goroutine to exit before closing streams
+       c.heartbeatWg.Wait()
+
+       c.streamsMu.Lock()
+       if c.registrationStream != nil {
+               if closeErr := c.registrationStream.CloseSend(); closeErr != 
nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
registration stream")
+               }
+               c.registrationStream = nil
+       }
+
+       if c.metricsStream != nil {
+               if closeErr := c.metricsStream.CloseSend(); closeErr != nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
metrics stream")
+               }
+               c.metricsStream = nil
+       }
+       c.streamsMu.Unlock()
+
+       // The connection manager is stopped without holding streamsMu.
+       c.connManager.stop()
+
+       c.streamsMu.Lock()
+       c.client = nil
+       c.streamsMu.Unlock()
+
+       c.logger.Info().Msg("Disconnected from FODC Proxy")
+
+       return nil
+}
+
+// Start starts the proxy client with automatic reconnection.
+func (c *Client) Start(ctx context.Context) error {
+       c.connManager.start(ctx)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               default:
+               }
+
+               if connectErr := c.Connect(ctx); connectErr != nil {
+                       c.logger.Error().Err(connectErr).Msg("Failed to connect 
to Proxy, retrying...")
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               if regErr := c.StartRegistrationStream(ctx); regErr != nil {
+                       c.logger.Error().Err(regErr).Msg("Failed to start 
registration stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               if metricsErr := c.StartMetricsStream(ctx); metricsErr != nil {
+                       c.logger.Error().Err(metricsErr).Msg("Failed to start 
metrics stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               c.logger.Info().Msg("Proxy client started successfully")
+
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               }
+       }
+}
+
+// handleRegistrationStream handles the registration stream.
+func (c *Client) handleRegistrationStream(ctx context.Context, stream 
fodcv1.FODCService_RegisterAgentClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               _, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Registration stream closed by 
Proxy, reconnecting...")
+                       go c.reconnect(ctx)

Review Comment:
   Both handleRegistrationStream and handleMetricsStream can call reconnect() 
concurrently, leading to multiple reconnection goroutines and race conditions 
in reconnect(). Consider using a blocking channel so that only a single 
reconnect goroutine runs at a time.
   



##########
fodc/proxy/internal/metrics/aggregator.go:
##########
@@ -0,0 +1,304 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package metrics provides functionality for aggregating and enriching 
metrics from all agents.
+package metrics
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       // defaultCollectionTimeout is how long to wait for an agent's metrics
+       // when the filter does not set both StartTime and EndTime.
+       defaultCollectionTimeout = 10 * time.Second
+       // maxCollectionTimeout caps the timeout derived from the requested
+       // time window (window length plus five seconds), preventing
+       // excessively long waits for wide time windows.
+       maxCollectionTimeout = 5 * time.Minute
+)
+
+// AggregatedMetric represents an aggregated metric with node metadata.
+type AggregatedMetric struct {
+       // Labels holds the metric's own labels plus "agent_id", "node_role",
+       // "ip" and "port", overlaid with the agent's registered labels.
+       Labels      map[string]string
+       // Timestamp is the metric's timestamp if set, else the request's
+       // timestamp, else the time the metric was processed.
+       Timestamp   time.Time
+       Name        string
+       // AgentID and NodeRole identify the agent that reported the metric.
+       AgentID     string
+       NodeRole    string
+       Description string
+       Value       float64
+}
+
+// Filter defines filters for metrics collection.
+type Filter struct {
+       // StartTime and EndTime are forwarded to each agent's metrics request.
+       // When both are set they also determine the collection timeout.
+       StartTime *time.Time
+       EndTime   *time.Time
+       // NOTE(review): Role, Address and AgentIDs are presumably used to
+       // select agents in getFilteredAgents - confirm, it is not shown here.
+       Role      string
+       Address   string
+       AgentIDs  []string
+}
+
+// Aggregator aggregates and enriches metrics from all agents.
+type Aggregator struct {
+       registry     *registry.AgentRegistry
+       logger       *logger.Logger
+       // grpcService sends metrics requests to agents.
+       grpcService  RequestSender
+       // collecting maps an agent ID to the channel on which that agent's
+       // metrics are delivered during an ongoing collection.
+       collecting   map[string]chan []*AggregatedMetric
+       // mu is held while SetGRPCService replaces grpcService.
+       // NOTE(review): CollectMetricsFromAgents reads grpcService without
+       // taking mu - confirm whether that is intended.
+       mu           sync.RWMutex
+       // collectingMu guards the collecting map.
+       collectingMu sync.RWMutex
+}
+
+// RequestSender is an interface for sending metrics requests to agents.
+type RequestSender interface {
+       // RequestMetrics asks the agent identified by agentID for its metrics.
+       // startTime and endTime are the optional time window; the Aggregator
+       // passes the filter's bounds through unchanged, so either may be nil.
+       RequestMetrics(ctx context.Context, agentID string, startTime, endTime 
*time.Time) error
+}
+
+// NewAggregator builds an Aggregator that uses the given agent registry,
+// request sender and logger. The map of in-flight collections starts empty.
+func NewAggregator(registry *registry.AgentRegistry, grpcService RequestSender, logger *logger.Logger) *Aggregator {
+       agg := &Aggregator{
+               collecting:  make(map[string]chan []*AggregatedMetric),
+               logger:      logger,
+               grpcService: grpcService,
+               registry:    registry,
+       }
+       return agg
+}
+
+// SetGRPCService replaces the service used to send metrics requests to
+// agents. The assignment is made while holding the aggregator's mutex.
+func (ma *Aggregator) SetGRPCService(grpcService RequestSender) {
+       ma.mu.Lock()
+       ma.grpcService = grpcService
+       ma.mu.Unlock()
+}
+
+// ProcessMetricsFromAgent enriches the metrics received from an agent and
+// hands them to the collection that is waiting for that agent, if any.
+//
+// Each metric's labels are copied, the "agent_id", "node_role", "ip" and
+// "port" labels are added, and the agent's registered labels are applied on
+// top. The timestamp is the metric's own, else the request's, else the
+// current time.
+//
+// The batch is delivered without blocking. If no collection is registered
+// for the agent, or its channel cannot accept the batch, the metrics are
+// dropped with a warning. It returns ctx.Err() when the select picks the
+// canceled context, and nil otherwise.
+func (ma *Aggregator) ProcessMetricsFromAgent(ctx context.Context, agentID string, agentInfo *registry.AgentInfo, req *fodcv1.StreamMetricsRequest) error {
+       batch := make([]*AggregatedMetric, 0, len(req.Metrics))
+
+       for _, m := range req.Metrics {
+               merged := make(map[string]string)
+               for k, v := range m.Labels {
+                       merged[k] = v
+               }
+
+               merged["agent_id"] = agentID
+               merged["node_role"] = agentInfo.NodeRole
+               merged["ip"] = agentInfo.PrimaryAddress.IP
+               merged["port"] = fmt.Sprintf("%d", agentInfo.PrimaryAddress.Port)
+
+               // Agent-level labels are applied last and override the rest.
+               for k, v := range agentInfo.Labels {
+                       merged[k] = v
+               }
+
+               var at time.Time
+               switch {
+               case m.Timestamp != nil:
+                       at = m.Timestamp.AsTime()
+               case req.Timestamp != nil:
+                       at = req.Timestamp.AsTime()
+               default:
+                       at = time.Now()
+               }
+
+               batch = append(batch, &AggregatedMetric{
+                       Name:        m.Name,
+                       Labels:      merged,
+                       Value:       m.Value,
+                       Timestamp:   at,
+                       AgentID:     agentID,
+                       NodeRole:    agentInfo.NodeRole,
+                       Description: m.Description,
+               })
+       }
+
+       ma.collectingMu.RLock()
+       defer ma.collectingMu.RUnlock()
+
+       sink, waiting := ma.collecting[agentID]
+       if !waiting {
+               ma.logger.Warn().Str("agent_id", agentID).Msg("Metrics collection channel not found, dropping metrics")
+               return nil
+       }
+
+       select {
+       case sink <- batch:
+       case <-ctx.Done():
+               return ctx.Err()
+       default:
+               ma.logger.Warn().Str("agent_id", agentID).Msg("Metrics collection channel full, dropping metrics")
+       }
+
+       return nil
+}
+
+// CollectMetricsFromAgents requests metrics from all agents (or filtered 
agents) when external client queries.
+func (ma *Aggregator) CollectMetricsFromAgents(ctx context.Context, filter 
*Filter) ([]*AggregatedMetric, error) {
+       agents := ma.getFilteredAgents(filter)
+       if len(agents) == 0 {
+               return []*AggregatedMetric{}, nil
+       }
+
+       collectChs := make(map[string]chan []*AggregatedMetric)
+       agentIDs := make([]string, 0, len(agents))
+       ma.collectingMu.Lock()
+       for _, agentInfo := range agents {
+               collectCh := make(chan []*AggregatedMetric, 1)
+               collectChs[agentInfo.AgentID] = collectCh
+               ma.collecting[agentInfo.AgentID] = collectCh
+               agentIDs = append(agentIDs, agentInfo.AgentID)
+       }
+       ma.collectingMu.Unlock()
+
+       defer func() {
+               ma.collectingMu.Lock()
+               for _, agentID := range agentIDs {
+                       delete(ma.collecting, agentID)
+               }
+               ma.collectingMu.Unlock()
+       }()
+
+       for _, agentInfo := range agents {
+               requestErr := ma.grpcService.RequestMetrics(ctx, 
agentInfo.AgentID, filter.StartTime, filter.EndTime)
+               if requestErr != nil {
+                       ma.logger.Error().
+                               Err(requestErr).
+                               Str("agent_id", agentInfo.AgentID).
+                               Msg("Failed to request metrics from agent")
+                       ma.collectingMu.Lock()
+                       if collectCh, exists := 
ma.collecting[agentInfo.AgentID]; exists {
+                               close(collectCh)
+                               delete(ma.collecting, agentInfo.AgentID)
+                       }
+                       ma.collectingMu.Unlock()
+                       delete(collectChs, agentInfo.AgentID)
+               }
+       }
+
+       timeout := defaultCollectionTimeout
+       if filter.StartTime != nil && filter.EndTime != nil {
+               windowDuration := filter.EndTime.Sub(*filter.StartTime) + 
5*time.Second
+               if windowDuration < maxCollectionTimeout {
+                       timeout = windowDuration
+               } else {
+                       timeout = maxCollectionTimeout
+               }
+       }
+
+       allMetrics := make([]*AggregatedMetric, 0)
+       var metricsMu sync.Mutex
+       var wg sync.WaitGroup
+
+       for agentID, collectCh := range collectChs {
+               wg.Add(1)
+               go func(id string, ch chan []*AggregatedMetric) {
+                       defer wg.Done()
+                       agentCtx, agentCancel := context.WithTimeout(ctx, 
timeout)
+                       defer agentCancel()
+
+                       select {
+                       case <-agentCtx.Done():
+                               ma.logger.Warn().
+                                       Str("agent_id", id).
+                                       Msg("Timeout waiting for metrics from 
agent")
+                       case metrics := <-ch:

Review Comment:
   On timeout, the channel isn’t closed. If ProcessMetricsFromAgent later sends 
to this channel, it can block or panic. The channel may also remain in the map 
after the defer cleanup, leading to leaks.



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,319 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Retry limits for the connection manager.
+// NOTE(review): the code that uses these constants is not visible here;
+// the descriptions below follow the names only - confirm against the
+// retry loop.
+const (
+       // connManagerMaxRetryInterval is presumably the upper bound on the
+       // delay between connection attempts.
+       connManagerMaxRetryInterval = 30 * time.Second
+       // connManagerMaxRetries is presumably the maximum number of
+       // connection attempts per request.
+       connManagerMaxRetries       = 3
+)
+
+// connEventType represents the type of connection event.
+type connEventType int
+
+// Possible connection event types.
+const (
+       // connEventConnect is the zero value of connEventType.
+       // NOTE(review): presumably requests that a connection be established.
+       connEventConnect connEventType = iota
+       // NOTE(review): presumably requests that the connection be closed.
+       connEventDisconnect
+)
+
+// connEvent represents a connection event sent to the manager.
+type connEvent struct {
+       // ResultCh is a send-only channel on which the outcome of the event
+       // is delivered as a connResult.
+       ResultCh chan<- connResult
+       // Context travels with the event.
+       // NOTE(review): presumably the requester's context - confirm.
+       Context  context.Context
+       // Type selects the operation the event asks for.
+       Type     connEventType
+}
+
+// connResult represents the result of a connection operation.
+type connResult struct {
+       Conn  *grpc.ClientConn
+       Error error

Review Comment:
   ```suggestion
        conn  *grpc.ClientConn
        err error
   ```



##########
fodc/proxy/internal/grpc/service.go:
##########
@@ -0,0 +1,350 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/peer"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/metrics"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// agentConnection represents a connection to an agent.
+type agentConnection struct {
+       metricsStream fodcv1.FODCService_StreamMetricsServer
+       lastActivity  time.Time
+       agentID       string
+       mu            sync.RWMutex
+}
+
+// updateActivity updates the last activity time.
+func (ac *agentConnection) updateActivity() {
+       ac.mu.Lock()
+       defer ac.mu.Unlock()
+       ac.lastActivity = time.Now()
+}
+
+// getLastActivity returns the last activity time.
+func (ac *agentConnection) getLastActivity() time.Time {
+       ac.mu.RLock()
+       defer ac.mu.RUnlock()
+       return ac.lastActivity
+}
+
+// FODCService implements the FODC gRPC service.
+type FODCService struct {
+       fodcv1.UnimplementedFODCServiceServer
+       registry          *registry.AgentRegistry
+       metricsAggregator *metrics.Aggregator
+       logger            *logger.Logger
+       connections       map[string]*agentConnection
+       connectionsMu     sync.RWMutex
+       heartbeatInterval time.Duration
+}
+
+// NewFODCService creates a new FODCService instance.
+func NewFODCService(registry *registry.AgentRegistry, metricsAggregator 
*metrics.Aggregator, logger *logger.Logger, heartbeatInterval time.Duration) 
*FODCService {
+       return &FODCService{
+               registry:          registry,
+               metricsAggregator: metricsAggregator,
+               logger:            logger,
+               connections:       make(map[string]*agentConnection),
+               heartbeatInterval: heartbeatInterval,
+       }
+}
+
+// RegisterAgent handles bi-directional agent registration stream.
+func (s *FODCService) RegisterAgent(stream 
fodcv1.FODCService_RegisterAgentServer) error {
+       ctx, cancel := context.WithCancel(stream.Context())
+       defer cancel()
+
+       var agentID string
+       var agentConn *agentConnection
+       initialRegistration := true
+       defer func() {
+               s.cleanupConnection(agentID)
+       }()
+
+       for {
+               req, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       s.logger.Debug().Str("agent_id", 
agentID).Msg("Registration stream closed by agent")
+                       break
+               }
+               if recvErr != nil {
+                       s.logger.Error().Err(recvErr).Str("agent_id", 
agentID).Msg("Error receiving registration request")
+                       return recvErr
+               }
+
+               if initialRegistration {
+                       identity := registry.AgentIdentity{
+                               IP:     req.PrimaryAddress.Ip,
+                               Port:   int(req.PrimaryAddress.Port),
+                               Role:   req.NodeRole,
+                               Labels: req.Labels,
+                       }
+
+                       primaryAddr := registry.Address{
+                               IP:   req.PrimaryAddress.Ip,
+                               Port: int(req.PrimaryAddress.Port),
+                       }
+
+                       registeredAgentID, registerErr := 
s.registry.RegisterAgent(ctx, identity, primaryAddr)
+                       if registerErr != nil {
+                               resp := &fodcv1.RegisterAgentResponse{
+                                       Success: false,
+                                       Message: registerErr.Error(),
+                               }
+                               if sendErr := stream.Send(resp); sendErr != nil 
{
+                                       
s.logger.Error().Err(sendErr).Msg("Failed to send registration error response")
+                               }
+                               return registerErr
+                       }
+
+                       agentID = registeredAgentID
+
+                       agentConn = &agentConnection{
+                               agentID:      agentID,
+                               lastActivity: time.Now(),
+                       }
+
+                       s.connectionsMu.Lock()
+                       s.connections[agentID] = agentConn
+                       s.connectionsMu.Unlock()
+
+                       resp := &fodcv1.RegisterAgentResponse{
+                               Success:                  true,
+                               Message:                  "Agent registered 
successfully",
+                               HeartbeatIntervalSeconds: 
int64(s.heartbeatInterval.Seconds()),
+                               AgentId:                  agentID,
+                       }
+
+                       if sendErr := stream.Send(resp); sendErr != nil {
+                               s.logger.Error().Err(sendErr).Str("agent_id", 
agentID).Msg("Failed to send registration response")
+                               // Unregister agent since we couldn't send 
confirmation
+                               if unregisterErr := 
s.registry.UnregisterAgent(agentID); unregisterErr != nil {
+                                       
s.logger.Error().Err(unregisterErr).Str("agent_id", agentID).Msg("Failed to 
unregister agent after send error")
+                               }
+                               return sendErr
+                       }
+
+                       initialRegistration = false
+                       s.logger.Info().
+                               Str("agent_id", agentID).
+                               Str("ip", identity.IP).
+                               Int("port", identity.Port).
+                               Str("role", identity.Role).
+                               Msg("Agent registration stream established")
+               } else {
+                       if updateErr := s.registry.UpdateHeartbeat(agentID); 
updateErr != nil {
+                               s.logger.Error().Err(updateErr).Str("agent_id", 
agentID).Msg("Failed to update heartbeat")
+                               return updateErr
+                       }
+
+                       if agentConn != nil {
+                               agentConn.updateActivity()
+                       }
+               }
+       }
+
+       return nil
+}
+
+// StreamMetrics handles bi-directional metrics streaming.
+func (s *FODCService) StreamMetrics(stream 
fodcv1.FODCService_StreamMetricsServer) error {
+       ctx := stream.Context()
+
+       agentID := s.getAgentIDFromContext(ctx)
+       if agentID == "" {
+               agentID = s.getAgentIDFromPeer(ctx)
+               if agentID != "" {
+                       s.logger.Warn().
+                               Str("agent_id", agentID).
+                               Msg("Agent ID not found in metadata, using peer 
address fallback (this may be unreliable)")
+               }
+       }
+
+       if agentID == "" {
+               s.logger.Error().Msg("Agent ID not found in context metadata or 
peer address")
+               return status.Errorf(codes.Unauthenticated, "agent ID not found 
in context or peer address")
+       }
+
+       defer s.cleanupConnection(agentID)
+
+       s.connectionsMu.Lock()
+       existingConn, exists := s.connections[agentID]
+       if exists {
+               existingConn.metricsStream = stream
+               existingConn.updateActivity()
+       } else {
+               agentConn := &agentConnection{
+                       agentID:       agentID,
+                       metricsStream: stream,
+                       lastActivity:  time.Now(),
+               }
+               s.connections[agentID] = agentConn
+       }
+       s.connectionsMu.Unlock()
+
+       for {
+               req, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       s.logger.Debug().Str("agent_id", agentID).Msg("Metrics 
stream closed by agent")
+                       return nil
+               }
+               if recvErr != nil {
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               s.logger.Debug().Err(recvErr).Str("agent_id", 
agentID).Msg("Metrics stream closed")
+                       } else if st, ok := status.FromError(recvErr); ok {
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
s.logger.Debug().Err(recvErr).Str("agent_id", agentID).Msg("Metrics stream 
closed")
+                               } else {
+                                       
s.logger.Error().Err(recvErr).Str("agent_id", agentID).Msg("Error receiving 
metrics")
+                               }
+                       } else {
+                               s.logger.Error().Err(recvErr).Str("agent_id", 
agentID).Msg("Error receiving metrics")
+                       }
+                       return recvErr
+               }
+
+               s.connectionsMu.RLock()
+               conn, connExists := s.connections[agentID]
+               s.connectionsMu.RUnlock()
+               if connExists {
+                       conn.updateActivity()
+               }
+
+               agentInfo, getErr := s.registry.GetAgentByID(agentID)
+               if getErr != nil {
+                       s.logger.Error().Err(getErr).Str("agent_id", 
agentID).Msg("Failed to get agent info")
+                       continue
+               }
+
+               if processErr := 
s.metricsAggregator.ProcessMetricsFromAgent(ctx, agentID, agentInfo, req); 
processErr != nil {
+                       s.logger.Error().Err(processErr).Str("agent_id", 
agentID).Msg("Failed to process metrics")
+               }
+       }
+}
+
+// RequestMetrics requests metrics from an agent via the metrics stream.
+func (s *FODCService) RequestMetrics(_ context.Context, agentID string, 
startTime, endTime *time.Time) error {

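Review Comment:
   The context parameter is discarded (`_`), so drop it from the signature:
   ```suggestion
   func (s *FODCService) RequestMetrics(agentID string, startTime, endTime *time.Time) error {
   ```
   Note that the `RequestSender` interface in `fodc/proxy/internal/metrics/aggregator.go` declares `RequestMetrics` with a leading `ctx context.Context`; if `FODCService` is meant to satisfy that interface, the interface and its call sites need the same change.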



##########
fodc/proxy/internal/metrics/aggregator.go:
##########
@@ -0,0 +1,304 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package metrics provides functionality for aggregating and enriching 
metrics from all agents.
+package metrics
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       // defaultCollectionTimeout is the default timeout for collecting 
metrics from agents.
+       defaultCollectionTimeout = 10 * time.Second
+       // maxCollectionTimeout is the maximum timeout allowed for collecting 
metrics,
+       // preventing excessively long waits for wide time windows.
+       maxCollectionTimeout = 5 * time.Minute
+)
+
+// AggregatedMetric represents an aggregated metric with node metadata.
+type AggregatedMetric struct {
+       Labels      map[string]string
+       Timestamp   time.Time
+       Name        string
+       AgentID     string
+       NodeRole    string
+       Description string
+       Value       float64
+}
+
+// Filter defines filters for metrics collection.
+type Filter struct {
+       StartTime *time.Time
+       EndTime   *time.Time
+       Role      string
+       Address   string
+       AgentIDs  []string
+}
+
+// Aggregator aggregates and enriches metrics from all agents.
+type Aggregator struct {
+       registry     *registry.AgentRegistry
+       logger       *logger.Logger
+       grpcService  RequestSender
+       collecting   map[string]chan []*AggregatedMetric
+       mu           sync.RWMutex
+       collectingMu sync.RWMutex
+}
+
+// RequestSender is an interface for sending metrics requests to agents.
+type RequestSender interface {
+       RequestMetrics(ctx context.Context, agentID string, startTime, endTime 
*time.Time) error
+}
+
+// NewAggregator creates a new MetricsAggregator instance.
+func NewAggregator(registry *registry.AgentRegistry, grpcService 
RequestSender, logger *logger.Logger) *Aggregator {
+       return &Aggregator{
+               registry:    registry,
+               grpcService: grpcService,
+               logger:      logger,
+               collecting:  make(map[string]chan []*AggregatedMetric),
+       }
+}
+
+// SetGRPCService sets the gRPC service for sending metrics requests.
+func (ma *Aggregator) SetGRPCService(grpcService RequestSender) {
+       ma.mu.Lock()
+       defer ma.mu.Unlock()
+       ma.grpcService = grpcService
+}
+
+// ProcessMetricsFromAgent processes metrics received from an agent.
+func (ma *Aggregator) ProcessMetricsFromAgent(ctx context.Context, agentID 
string, agentInfo *registry.AgentInfo, req *fodcv1.StreamMetricsRequest) error {
+       aggregatedMetrics := make([]*AggregatedMetric, 0, len(req.Metrics))
+
+       for _, metric := range req.Metrics {
+               labels := make(map[string]string)
+               for key, value := range metric.Labels {
+                       labels[key] = value
+               }
+
+               labels["agent_id"] = agentID
+               labels["node_role"] = agentInfo.NodeRole
+               labels["ip"] = agentInfo.PrimaryAddress.IP
+               labels["port"] = fmt.Sprintf("%d", 
agentInfo.PrimaryAddress.Port)
+
+               for key, value := range agentInfo.Labels {
+                       labels[key] = value
+               }
+
+               var timestamp time.Time
+               switch {
+               case metric.Timestamp != nil:
+                       timestamp = metric.Timestamp.AsTime()
+               case req.Timestamp != nil:
+                       timestamp = req.Timestamp.AsTime()
+               default:
+                       timestamp = time.Now()
+               }
+
+               aggregatedMetric := &AggregatedMetric{
+                       Name:        metric.Name,
+                       Labels:      labels,
+                       Value:       metric.Value,
+                       Timestamp:   timestamp,
+                       AgentID:     agentID,
+                       NodeRole:    agentInfo.NodeRole,
+                       Description: metric.Description,
+               }
+
+               aggregatedMetrics = append(aggregatedMetrics, aggregatedMetric)
+       }
+
+       ma.collectingMu.RLock()
+       defer ma.collectingMu.RUnlock()
+
+       collectCh, exists := ma.collecting[agentID]
+
+       if exists {
+               select {
+               case collectCh <- aggregatedMetrics:
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+                       ma.logger.Warn().Str("agent_id", agentID).Msg("Metrics 
collection channel full, dropping metrics")
+               }
+       } else {
+               ma.logger.Warn().Str("agent_id", agentID).Msg("Metrics 
collection channel not found, dropping metrics")
+       }
+
+       return nil
+}
+
+// CollectMetricsFromAgents requests metrics from all agents (or filtered 
agents) when external client queries.
+func (ma *Aggregator) CollectMetricsFromAgents(ctx context.Context, filter 
*Filter) ([]*AggregatedMetric, error) {
+       agents := ma.getFilteredAgents(filter)
+       if len(agents) == 0 {
+               return []*AggregatedMetric{}, nil
+       }
+
+       collectChs := make(map[string]chan []*AggregatedMetric)
+       agentIDs := make([]string, 0, len(agents))
+       ma.collectingMu.Lock()
+       for _, agentInfo := range agents {
+               collectCh := make(chan []*AggregatedMetric, 1)
+               collectChs[agentInfo.AgentID] = collectCh
+               ma.collecting[agentInfo.AgentID] = collectCh
+               agentIDs = append(agentIDs, agentInfo.AgentID)
+       }
+       ma.collectingMu.Unlock()
+
+       defer func() {
+               ma.collectingMu.Lock()
+               for _, agentID := range agentIDs {
+                       delete(ma.collecting, agentID)
+               }
+               ma.collectingMu.Unlock()
+       }()
+
+       for _, agentInfo := range agents {

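Review Comment:
   Check whether `ctx` is done at the top of each iteration of this loop, and stop issuing requests to the remaining agents once it has been cancelled or has timed out.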



##########
fodc/proxy/internal/metrics/aggregator.go:
##########
@@ -0,0 +1,304 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package metrics provides functionality for aggregating and enriching 
metrics from all agents.
+package metrics
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       // defaultCollectionTimeout is the default timeout for collecting 
metrics from agents.
+       defaultCollectionTimeout = 10 * time.Second
+       // maxCollectionTimeout is the maximum timeout allowed for collecting 
metrics,
+       // preventing excessively long waits for wide time windows.
+       maxCollectionTimeout = 5 * time.Minute
+)
+
+// AggregatedMetric represents an aggregated metric with node metadata.
+type AggregatedMetric struct {
+       Labels      map[string]string
+       Timestamp   time.Time
+       Name        string
+       AgentID     string
+       NodeRole    string
+       Description string
+       Value       float64
+}
+
+// Filter defines filters for metrics collection.
+type Filter struct {
+       StartTime *time.Time
+       EndTime   *time.Time
+       Role      string
+       Address   string
+       AgentIDs  []string
+}
+
+// Aggregator aggregates and enriches metrics from all agents.
+type Aggregator struct {
+       registry     *registry.AgentRegistry
+       logger       *logger.Logger
+       grpcService  RequestSender
+       collecting   map[string]chan []*AggregatedMetric
+       mu           sync.RWMutex
+       collectingMu sync.RWMutex
+}
+
+// RequestSender is an interface for sending metrics requests to agents.
+type RequestSender interface {
+       RequestMetrics(ctx context.Context, agentID string, startTime, endTime 
*time.Time) error
+}
+
+// NewAggregator creates a new MetricsAggregator instance.
+func NewAggregator(registry *registry.AgentRegistry, grpcService 
RequestSender, logger *logger.Logger) *Aggregator {
+       return &Aggregator{
+               registry:    registry,
+               grpcService: grpcService,
+               logger:      logger,
+               collecting:  make(map[string]chan []*AggregatedMetric),
+       }
+}
+
+// SetGRPCService sets the gRPC service for sending metrics requests.
+func (ma *Aggregator) SetGRPCService(grpcService RequestSender) {
+       ma.mu.Lock()
+       defer ma.mu.Unlock()
+       ma.grpcService = grpcService
+}
+
+// ProcessMetricsFromAgent processes metrics received from an agent.
+func (ma *Aggregator) ProcessMetricsFromAgent(ctx context.Context, agentID 
string, agentInfo *registry.AgentInfo, req *fodcv1.StreamMetricsRequest) error {
+       aggregatedMetrics := make([]*AggregatedMetric, 0, len(req.Metrics))
+
+       for _, metric := range req.Metrics {
+               labels := make(map[string]string)
+               for key, value := range metric.Labels {
+                       labels[key] = value
+               }
+
+               labels["agent_id"] = agentID
+               labels["node_role"] = agentInfo.NodeRole
+               labels["ip"] = agentInfo.PrimaryAddress.IP
+               labels["port"] = fmt.Sprintf("%d", 
agentInfo.PrimaryAddress.Port)
+
+               for key, value := range agentInfo.Labels {
+                       labels[key] = value
+               }
+
+               var timestamp time.Time
+               switch {
+               case metric.Timestamp != nil:
+                       timestamp = metric.Timestamp.AsTime()
+               case req.Timestamp != nil:
+                       timestamp = req.Timestamp.AsTime()
+               default:
+                       timestamp = time.Now()
+               }
+
+               aggregatedMetric := &AggregatedMetric{
+                       Name:        metric.Name,
+                       Labels:      labels,
+                       Value:       metric.Value,
+                       Timestamp:   timestamp,
+                       AgentID:     agentID,
+                       NodeRole:    agentInfo.NodeRole,
+                       Description: metric.Description,
+               }
+
+               aggregatedMetrics = append(aggregatedMetrics, aggregatedMetric)
+       }
+
+       ma.collectingMu.RLock()
+       defer ma.collectingMu.RUnlock()
+
+       collectCh, exists := ma.collecting[agentID]
+
+       if exists {
+               select {
+               case collectCh <- aggregatedMetrics:
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+                       ma.logger.Warn().Str("agent_id", agentID).Msg("Metrics 
collection channel full, dropping metrics")
+               }
+       } else {
+               ma.logger.Warn().Str("agent_id", agentID).Msg("Metrics 
collection channel not found, dropping metrics")
+       }
+
+       return nil
+}
+
+// CollectMetricsFromAgents requests metrics from all agents (or filtered 
agents) when external client queries.
+func (ma *Aggregator) CollectMetricsFromAgents(ctx context.Context, filter 
*Filter) ([]*AggregatedMetric, error) {
+       agents := ma.getFilteredAgents(filter)
+       if len(agents) == 0 {
+               return []*AggregatedMetric{}, nil
+       }
+
+       collectChs := make(map[string]chan []*AggregatedMetric)
+       agentIDs := make([]string, 0, len(agents))
+       ma.collectingMu.Lock()
+       for _, agentInfo := range agents {
+               collectCh := make(chan []*AggregatedMetric, 1)
+               collectChs[agentInfo.AgentID] = collectCh
+               ma.collecting[agentInfo.AgentID] = collectCh
+               agentIDs = append(agentIDs, agentInfo.AgentID)
+       }
+       ma.collectingMu.Unlock()
+
+       defer func() {
+               ma.collectingMu.Lock()
+               for _, agentID := range agentIDs {
+                       delete(ma.collecting, agentID)

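Review Comment:
   Close each channel in this deferred cleanup as well, rather than only deleting it from the map. Do it while `collectingMu` is still write-locked: `ProcessMetricsFromAgent` looks up and sends on these channels under the read lock, so closing under the write lock means it can never send on a closed channel.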



##########
fodc/proxy/internal/grpc/service.go:
##########
@@ -0,0 +1,350 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/peer"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/metrics"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// agentConnection represents a connection to an agent.
+// It pairs an agent ID with the agent's metrics stream and the time the agent
+// was last heard from.
+type agentConnection struct {
+       // metricsStream is the server side of the agent's StreamMetrics call. It is
+       // nil for connections created by RegisterAgent until StreamMetrics attaches
+       // a stream.
+       metricsStream fodcv1.FODCService_StreamMetricsServer
+       // lastActivity is the time of the most recent updateActivity call.
+       lastActivity  time.Time
+       // agentID identifies the agent this connection belongs to.
+       agentID       string
+       // mu guards lastActivity.
+       mu            sync.RWMutex
+}
+
+// updateActivity stamps the connection with the current time as its last activity.
+func (ac *agentConnection) updateActivity() {
+       ac.mu.Lock()
+       ac.lastActivity = time.Now()
+       ac.mu.Unlock()
+}
+
+// getLastActivity reports when the connection last recorded activity.
+func (ac *agentConnection) getLastActivity() time.Time {
+       ac.mu.RLock()
+       last := ac.lastActivity
+       ac.mu.RUnlock()
+       return last
+}
+
+// FODCService implements the FODC gRPC service.
+// It keeps one agentConnection per agent ID and forwards received metrics to
+// the metrics aggregator.
+type FODCService struct {
+       fodcv1.UnimplementedFODCServiceServer
+       // registry registers agents, records heartbeats and resolves agent info by ID.
+       registry          *registry.AgentRegistry
+       // metricsAggregator receives every metrics message read from an agent stream.
+       metricsAggregator *metrics.Aggregator
+       logger            *logger.Logger
+       // connections maps agent ID to its live connection (guarded by connectionsMu).
+       connections       map[string]*agentConnection
+       // connectionsMu guards connections and the metricsStream field of its entries.
+       connectionsMu     sync.RWMutex
+       // heartbeatInterval is advertised to agents in the registration response.
+       heartbeatInterval time.Duration
+}
+
+// NewFODCService builds a FODCService around the given registry, aggregator and
+// logger. heartbeatInterval is the interval reported to agents when they register.
+// The returned service starts with an empty connection table.
+func NewFODCService(registry *registry.AgentRegistry, metricsAggregator *metrics.Aggregator, logger *logger.Logger, heartbeatInterval time.Duration) *FODCService {
+       svc := new(FODCService)
+       svc.registry = registry
+       svc.metricsAggregator = metricsAggregator
+       svc.logger = logger
+       svc.heartbeatInterval = heartbeatInterval
+       svc.connections = make(map[string]*agentConnection)
+       return svc
+}
+
+// RegisterAgent handles bi-directional agent registration stream.
+//
+// The first message on the stream registers the agent with the registry and is
+// answered with a RegisterAgentResponse carrying the assigned agent ID and the
+// heartbeat interval. Every later message is treated as a heartbeat. The
+// connection entry for the agent is removed when the handler returns.
+//
+// It returns nil when the agent closes the stream (io.EOF), the receive error
+// for any other receive failure, the registry error when registration or a
+// heartbeat update fails, the send error when the confirmation cannot be
+// delivered, and an InvalidArgument status when the first message carries no
+// primary address.
+func (s *FODCService) RegisterAgent(stream fodcv1.FODCService_RegisterAgentServer) error {
+       ctx, cancel := context.WithCancel(stream.Context())
+       defer cancel()
+
+       var agentID string
+       var agentConn *agentConnection
+       initialRegistration := true
+       // The closure reads agentID at return time, so it is a no-op when the
+       // stream ends before registration succeeded.
+       defer func() {
+               s.cleanupConnection(agentID)
+       }()
+
+       for {
+               req, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       s.logger.Debug().Str("agent_id", agentID).Msg("Registration stream closed by agent")
+                       break
+               }
+               if recvErr != nil {
+                       s.logger.Error().Err(recvErr).Str("agent_id", agentID).Msg("Error receiving registration request")
+                       return recvErr
+               }
+
+               if initialRegistration {
+                       // PrimaryAddress is a message field and is nil when the agent omits
+                       // it; reject the registration instead of dereferencing nil below.
+                       if req.PrimaryAddress == nil {
+                               const missingAddr = "primary address is required"
+                               resp := &fodcv1.RegisterAgentResponse{
+                                       Success: false,
+                                       Message: missingAddr,
+                               }
+                               if sendErr := stream.Send(resp); sendErr != nil {
+                                       s.logger.Error().Err(sendErr).Msg("Failed to send registration error response")
+                               }
+                               return status.Error(codes.InvalidArgument, missingAddr)
+                       }
+
+                       identity := registry.AgentIdentity{
+                               IP:     req.PrimaryAddress.Ip,
+                               Port:   int(req.PrimaryAddress.Port),
+                               Role:   req.NodeRole,
+                               Labels: req.Labels,
+                       }
+
+                       primaryAddr := registry.Address{
+                               IP:   req.PrimaryAddress.Ip,
+                               Port: int(req.PrimaryAddress.Port),
+                       }
+
+                       registeredAgentID, registerErr := s.registry.RegisterAgent(ctx, identity, primaryAddr)
+                       if registerErr != nil {
+                               resp := &fodcv1.RegisterAgentResponse{
+                                       Success: false,
+                                       Message: registerErr.Error(),
+                               }
+                               if sendErr := stream.Send(resp); sendErr != nil {
+                                       s.logger.Error().Err(sendErr).Msg("Failed to send registration error response")
+                               }
+                               return registerErr
+                       }
+
+                       agentID = registeredAgentID
+
+                       agentConn = &agentConnection{
+                               agentID:      agentID,
+                               lastActivity: time.Now(),
+                       }
+
+                       s.connectionsMu.Lock()
+                       s.connections[agentID] = agentConn
+                       s.connectionsMu.Unlock()
+
+                       resp := &fodcv1.RegisterAgentResponse{
+                               Success:                  true,
+                               Message:                  "Agent registered successfully",
+                               HeartbeatIntervalSeconds: int64(s.heartbeatInterval.Seconds()),
+                               AgentId:                  agentID,
+                       }
+
+                       if sendErr := stream.Send(resp); sendErr != nil {
+                               s.logger.Error().Err(sendErr).Str("agent_id", agentID).Msg("Failed to send registration response")
+                               // Unregister agent since we couldn't send confirmation
+                               if unregisterErr := s.registry.UnregisterAgent(agentID); unregisterErr != nil {
+                                       s.logger.Error().Err(unregisterErr).Str("agent_id", agentID).Msg("Failed to unregister agent after send error")
+                               }
+                               return sendErr
+                       }
+
+                       initialRegistration = false
+                       s.logger.Info().
+                               Str("agent_id", agentID).
+                               Str("ip", identity.IP).
+                               Int("port", identity.Port).
+                               Str("role", identity.Role).
+                               Msg("Agent registration stream established")
+               } else {
+                       // Any message after the first one counts as a heartbeat.
+                       if updateErr := s.registry.UpdateHeartbeat(agentID); updateErr != nil {
+                               s.logger.Error().Err(updateErr).Str("agent_id", agentID).Msg("Failed to update heartbeat")
+                               return updateErr
+                       }
+
+                       if agentConn != nil {
+                               agentConn.updateActivity()
+                       }
+               }
+       }
+
+       return nil
+}
+
+// StreamMetrics handles bi-directional metrics streaming.
+//
+// The agent ID is taken from the "agent_id" request metadata, falling back to a
+// peer-address lookup; the call fails with Unauthenticated when neither yields
+// an ID. The stream is attached to the agent's connection entry (creating one
+// if none exists), then every received message is handed to the metrics
+// aggregator until the stream ends. It returns nil on io.EOF and the receive
+// error otherwise.
+func (s *FODCService) StreamMetrics(stream 
fodcv1.FODCService_StreamMetricsServer) error {
+       ctx := stream.Context()
+
+       agentID := s.getAgentIDFromContext(ctx)
+       if agentID == "" {
+               agentID = s.getAgentIDFromPeer(ctx)
+               if agentID != "" {
+                       s.logger.Warn().
+                               Str("agent_id", agentID).
+                               Msg("Agent ID not found in metadata, using peer 
address fallback (this may be unreliable)")
+               }
+       }
+
+       if agentID == "" {
+               s.logger.Error().Msg("Agent ID not found in context metadata or 
peer address")
+               return status.Errorf(codes.Unauthenticated, "agent ID not found 
in context or peer address")
+       }
+
+       // NOTE(review): this deletes the s.connections entry keyed by agentID, which
+       // is the same entry RegisterAgent stores for that agent - confirm that
+       // ending the metrics stream is meant to drop the shared entry.
+       defer s.cleanupConnection(agentID)
+
+       // Attach the stream under the write lock; RequestMetrics reads
+       // metricsStream under the read lock of the same mutex.
+       s.connectionsMu.Lock()
+       existingConn, exists := s.connections[agentID]
+       if exists {
+               existingConn.metricsStream = stream
+               existingConn.updateActivity()
+       } else {
+               agentConn := &agentConnection{
+                       agentID:       agentID,
+                       metricsStream: stream,
+                       lastActivity:  time.Now(),
+               }
+               s.connections[agentID] = agentConn
+       }
+       s.connectionsMu.Unlock()
+
+       for {
+               req, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       s.logger.Debug().Str("agent_id", agentID).Msg("Metrics 
stream closed by agent")
+                       return nil
+               }
+               if recvErr != nil {
+                       // Cancellation and deadline expiry, whether surfaced as context
+                       // errors or as gRPC status codes, are logged at debug level;
+                       // every other receive failure is logged as an error.
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               s.logger.Debug().Err(recvErr).Str("agent_id", 
agentID).Msg("Metrics stream closed")
+                       } else if st, ok := status.FromError(recvErr); ok {
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
s.logger.Debug().Err(recvErr).Str("agent_id", agentID).Msg("Metrics stream 
closed")
+                               } else {
+                                       
s.logger.Error().Err(recvErr).Str("agent_id", agentID).Msg("Error receiving 
metrics")
+                               }
+                       } else {
+                               s.logger.Error().Err(recvErr).Str("agent_id", 
agentID).Msg("Error receiving metrics")
+                       }
+                       return recvErr
+               }
+
+               // Every received message counts as activity on the connection.
+               s.connectionsMu.RLock()
+               conn, connExists := s.connections[agentID]
+               s.connectionsMu.RUnlock()
+               if connExists {
+                       conn.updateActivity()
+               }
+
+               // A failed registry lookup drops this message but keeps the stream open.
+               agentInfo, getErr := s.registry.GetAgentByID(agentID)
+               if getErr != nil {
+                       s.logger.Error().Err(getErr).Str("agent_id", 
agentID).Msg("Failed to get agent info")
+                       continue
+               }
+
+               // Processing failures are logged only; the loop keeps reading.
+               if processErr := 
s.metricsAggregator.ProcessMetricsFromAgent(ctx, agentID, agentInfo, req); 
processErr != nil {
+                       s.logger.Error().Err(processErr).Str("agent_id", 
agentID).Msg("Failed to process metrics")
+               }
+       }
+}
+
+// RequestMetrics requests metrics from an agent via the metrics stream.
+func (s *FODCService) RequestMetrics(_ context.Context, agentID string, 
startTime, endTime *time.Time) error {
+       s.connectionsMu.RLock()
+       defer s.connectionsMu.RUnlock()
+       agentConn, exists := s.connections[agentID]
+
+       if !exists {
+               return fmt.Errorf("agent connection not found for agent ID: 
%s", agentID)
+       }
+
+       if agentConn.metricsStream == nil {
+               return fmt.Errorf("metrics stream not established for agent ID: 
%s", agentID)
+       }
+
+       resp := &fodcv1.StreamMetricsResponse{}
+       if startTime != nil {
+               resp.StartTime = &timestamppb.Timestamp{
+                       Seconds: startTime.Unix(),
+                       Nanos:   int32(startTime.Nanosecond()),
+               }
+       }
+       if endTime != nil {
+               resp.EndTime = &timestamppb.Timestamp{
+                       Seconds: endTime.Unix(),
+                       Nanos:   int32(endTime.Nanosecond()),
+               }
+       }
+
+       if sendErr := agentConn.metricsStream.Send(resp); sendErr != nil {
+               return fmt.Errorf("failed to send metrics request: %w", sendErr)
+       }
+
+       return nil
+}
+
+// cleanupConnection cleans up a connection and unregisters the agent if 
needed.
+func (s *FODCService) cleanupConnection(agentID string) {
+       if agentID == "" {
+               return
+       }
+
+       s.connectionsMu.Lock()
+       defer s.connectionsMu.Unlock()
+       delete(s.connections, agentID)
+
+       s.logger.Debug().Str("agent_id", agentID).Msg("Cleaned up agent 
connection")
+}
+
+// getAgentIDFromContext returns the first "agent_id" value found in the
+// incoming gRPC metadata of ctx, or "" when the metadata or the key is absent.
+func (s *FODCService) getAgentIDFromContext(ctx context.Context) string {
+       md, hasMetadata := metadata.FromIncomingContext(ctx)
+       if !hasMetadata {
+               return ""
+       }
+
+       if values := md.Get("agent_id"); len(values) > 0 {
+               return values[0]
+       }
+       return ""
+}
+
+// getAgentIDFromPeer extracts agent ID by matching peer address with 
registered agents.
+func (s *FODCService) getAgentIDFromPeer(ctx context.Context) string {
+       peerInfo, ok := peer.FromContext(ctx)
+       if !ok {
+               return ""
+       }
+
+       peerAddr := peerInfo.Addr.String()
+       agents := s.registry.ListAgents()
+
+       // Parse peer address to extract IP (peer.Addr.String() format is 
typically "IP:port" or "[IP]:port")
+       peerIP := peerAddr
+       if idx := strings.LastIndex(peerAddr, ":"); idx > 0 {

Review Comment:
   Use net.SplitHostPort for parsing.



##########
fodc/agent/internal/proxy/client.go:
##########
@@ -0,0 +1,751 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides a client for communicating with the FODC Proxy.
+package proxy
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       flightrecorder 
"github.com/apache/skywalking-banyandb/fodc/agent/internal/flightrecorder"
+       "github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// MetricsRequestFilter defines filters for metrics requests.
+// A nil bound means that side of the time window is unbounded; when both are
+// nil the request is served with the latest metric values instead of a window.
+type MetricsRequestFilter struct {
+       // StartTime excludes samples timestamped before it.
+       StartTime *time.Time
+       // EndTime excludes samples timestamped after it.
+       EndTime   *time.Time
+}
+
+// Client manages connection and communication with the FODC Proxy.
+// It owns the registration and metrics streams and serves metrics read from the
+// flight recorder.
+type Client struct {
+       // connManager establishes the underlying connection (see Connect).
+       connManager        *connManager
+       // heartbeatTicker is stopped and cleared by Disconnect.
+       heartbeatTicker    *time.Ticker
+       // flightRecorder is the source of the metrics sent to the Proxy.
+       flightRecorder     *flightrecorder.FlightRecorder
+       logger             *logger.Logger
+       // stopCh is closed by Disconnect and recreated by Connect after a disconnect.
+       stopCh             chan struct{}
+       // labels are sent with every registration and heartbeat request.
+       labels             map[string]string
+       // client is the gRPC service client; nil until Connect succeeds.
+       client             fodcv1.FODCServiceClient
+       registrationStream fodcv1.FODCService_RegisterAgentClient
+       metricsStream      fodcv1.FODCService_StreamMetricsClient
+
+       proxyAddr string
+       nodeIP    string
+       nodeRole  string
+       // agentID is assigned by the Proxy in the registration response.
+       agentID   string
+
+       nodePort          int
+       // heartbeatInterval may be overridden by the Proxy's registration response.
+       heartbeatInterval time.Duration
+       reconnectInterval time.Duration
+       // disconnected is set by Disconnect and cleared by the next Connect.
+       disconnected      bool
+       // streamsMu is held while the streams, client, agentID, stopCh, disconnected,
+       // heartbeatTicker and heartbeatInterval are updated.
+       streamsMu         sync.RWMutex
+       heartbeatWg       sync.WaitGroup // Tracks heartbeat goroutine
+}
+
+// NewClient builds a Client for the Proxy at proxyAddr.
+//
+// nodeIP, nodePort, nodeRole and labels describe this node and are sent when
+// registering. heartbeatInterval and reconnectInterval set the initial timing,
+// and flightRecorder is the metrics source. The client's SendHeartbeat is
+// installed as the connection manager's heartbeat checker. No connection is
+// opened here.
+func NewClient(
+       proxyAddr string,
+       nodeIP string,
+       nodePort int,
+       nodeRole string,
+       labels map[string]string,
+       heartbeatInterval time.Duration,
+       reconnectInterval time.Duration,
+       flightRecorder *flightrecorder.FlightRecorder,
+       logger *logger.Logger,
+) *Client {
+       c := new(Client)
+       c.connManager = newConnManager(proxyAddr, reconnectInterval, logger)
+       c.proxyAddr = proxyAddr
+       c.nodeIP = nodeIP
+       c.nodePort = nodePort
+       c.nodeRole = nodeRole
+       c.labels = labels
+       c.heartbeatInterval = heartbeatInterval
+       c.reconnectInterval = reconnectInterval
+       c.flightRecorder = flightRecorder
+       c.logger = logger
+       c.stopCh = make(chan struct{})
+
+       c.connManager.setHeartbeatChecker(c.SendHeartbeat)
+       return c
+}
+
+// StartConnManager starts the underlying connection manager with ctx without
+// connecting or opening any stream. It is useful for tests or scenarios where
+// you want to manually control connection lifecycle.
+func (c *Client) StartConnManager(ctx context.Context) {
+       c.connManager.start(ctx)
+}
+
+// Connect asks the connection manager for a connection to the Proxy and, on
+// success, installs a fresh service client on it. If the client was previously
+// disconnected, the disconnected flag is cleared and a new stop channel is
+// created. It returns the connection manager's error unchanged on failure.
+func (c *Client) Connect(ctx context.Context) error {
+       outcome := <-c.connManager.RequestConnect(ctx)
+       if connErr := outcome.Error; connErr != nil {
+               return connErr
+       }
+
+       c.streamsMu.Lock()
+       defer c.streamsMu.Unlock()
+
+       c.client = fodcv1.NewFODCServiceClient(outcome.Conn)
+       if c.disconnected {
+               // Coming back from Disconnect: stopCh was closed there, so replace it.
+               c.stopCh = make(chan struct{})
+               c.disconnected = false
+       }
+       return nil
+}
+
+// StartRegistrationStream establishes bi-directional registration stream with 
Proxy.
+func (c *Client) StartRegistrationStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       c.streamsMu.Unlock()
+
+       stream, streamErr := client.RegisterAgent(ctx)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create registration stream: %w", 
streamErr)
+       }
+
+       req := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send registration request: %w", 
sendErr)
+       }
+
+       resp, recvErr := stream.Recv()
+       if recvErr != nil {
+               return fmt.Errorf("failed to receive registration response: 
%w", recvErr)
+       }
+
+       if !resp.Success {
+               return fmt.Errorf("registration failed: %s", resp.Message)
+       }
+
+       if resp.AgentId == "" {
+               return fmt.Errorf("registration response missing agent ID")
+       }
+
+       c.streamsMu.Lock()
+       c.registrationStream = stream
+       c.agentID = resp.AgentId
+       if resp.HeartbeatIntervalSeconds > 0 {
+               c.heartbeatInterval = 
time.Duration(resp.HeartbeatIntervalSeconds) * time.Second
+       }
+       c.streamsMu.Unlock()
+
+       c.logger.Info().
+               Str("proxy_addr", c.proxyAddr).
+               Str("agent_id", resp.AgentId).
+               Dur("heartbeat_interval", c.heartbeatInterval).
+               Msg("Agent registered with Proxy")
+
+       c.startHeartbeat(ctx)
+
+       go c.handleRegistrationStream(ctx, stream)
+
+       return nil
+}
+
+// StartMetricsStream establishes bi-directional metrics stream with Proxy.
+func (c *Client) StartMetricsStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       agentID := c.agentID
+       c.streamsMu.Unlock()
+
+       if agentID == "" {
+               return fmt.Errorf("agent ID not available, register agent 
first")
+       }
+
+       md := metadata.New(map[string]string{"agent_id": agentID})
+       ctxWithMetadata := metadata.NewOutgoingContext(ctx, md)
+
+       stream, streamErr := client.StreamMetrics(ctxWithMetadata)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create metrics stream: %w", 
streamErr)
+       }
+
+       c.streamsMu.Lock()
+       c.metricsStream = stream
+       c.streamsMu.Unlock()
+
+       go c.handleMetricsStream(ctx, stream)
+
+       c.logger.Info().
+               Str("agent_id", agentID).
+               Msg("Metrics stream established with Proxy")
+
+       return nil
+}
+
+// RetrieveAndSendMetrics retrieves metrics from Flight Recorder when 
requested by Proxy.
+func (c *Client) RetrieveAndSendMetrics(_ context.Context, filter 
*MetricsRequestFilter) error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.metricsStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("metrics stream not established")
+       }
+       metricsStream := c.metricsStream
+       c.streamsMu.RUnlock()
+
+       datasources := c.flightRecorder.GetDatasources()
+       if len(datasources) == 0 {
+               req := &fodcv1.StreamMetricsRequest{
+                       Metrics:   []*fodcv1.Metric{},
+                       Timestamp: timestamppb.Now(),
+               }
+               if sendErr := metricsStream.Send(req); sendErr != nil {
+                       return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+               }
+               return nil
+       }
+
+       ds := datasources[0]
+       allMetrics := ds.GetMetrics()
+       timestamps := ds.GetTimestamps()
+       descriptions := ds.GetDescriptions()
+
+       if filter != nil && (filter.StartTime != nil || filter.EndTime != nil) {
+               if timestamps == nil {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               timestampValues := timestamps.GetAllValues()
+               if len(timestampValues) == 0 {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               return c.sendFilteredMetrics(metricsStream, allMetrics, 
timestampValues, descriptions, filter)
+       }
+
+       return c.sendLatestMetrics(metricsStream, allMetrics, descriptions)
+}
+
+// sendLatestMetrics sends one message holding the current value of every metric
+// in allMetrics. Metrics with no recorded values and a zero current value are
+// skipped, as are metrics whose key cannot be parsed (a warning is logged).
+// descriptions is looked up by parsed metric name. It returns an error only
+// when the send fails.
+func (c *Client) sendLatestMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       descriptions map[string]string,
+) error {
+       payload := make([]*fodcv1.Metric, 0)
+
+       for key, buffer := range allMetrics {
+               current := buffer.GetCurrentValue()
+               history := buffer.GetAllValues()
+               if current == 0 && len(history) == 0 {
+                       continue
+               }
+
+               parsed, parseErr := c.parseMetricKey(key)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", key).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               labelSet := make(map[string]string)
+               for _, lbl := range parsed.Labels {
+                       labelSet[lbl.Name] = lbl.Value
+               }
+
+               payload = append(payload, &fodcv1.Metric{
+                       Name:        parsed.Name,
+                       Labels:      labelSet,
+                       Value:       current,
+                       Description: descriptions[parsed.Name],
+               })
+       }
+
+       message := &fodcv1.StreamMetricsRequest{
+               Metrics:   payload,
+               Timestamp: timestamppb.Now(),
+       }
+       if sendErr := stream.Send(message); sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+       return nil
+}
+
+// sendFilteredMetrics sends one message holding every sample whose timestamp
+// lies inside the filter's time window.
+//
+// Sample i of each metric is paired with timestampValues[i], which is
+// interpreted as Unix seconds; only the first min(len(values), len(timestamps))
+// samples are considered. Metrics without values or with an unparsable key are
+// skipped. filter must be non-nil. It returns an error only when the send fails.
+func (c *Client) sendFilteredMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       timestampValues []int64,
+       descriptions map[string]string,
+       filter *MetricsRequestFilter,
+) error {
+       // inWindow reports whether ts passes both optional bounds.
+       inWindow := func(ts time.Time) bool {
+               if filter.StartTime != nil && ts.Before(*filter.StartTime) {
+                       return false
+               }
+               if filter.EndTime != nil && ts.After(*filter.EndTime) {
+                       return false
+               }
+               return true
+       }
+
+       payload := make([]*fodcv1.Metric, 0)
+
+       for key, buffer := range allMetrics {
+               samples := buffer.GetAllValues()
+               if len(samples) == 0 {
+                       continue
+               }
+
+               parsed, parseErr := c.parseMetricKey(key)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", key).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               help := descriptions[parsed.Name]
+
+               limit := len(samples)
+               if len(timestampValues) < limit {
+                       limit = len(timestampValues)
+               }
+
+               // One label map is built per metric and shared by all of its samples.
+               labelSet := make(map[string]string)
+               for _, lbl := range parsed.Labels {
+                       labelSet[lbl.Name] = lbl.Value
+               }
+
+               for i, unixSeconds := range timestampValues[:limit] {
+                       sampleTime := time.Unix(unixSeconds, 0)
+                       if !inWindow(sampleTime) {
+                               continue
+                       }
+
+                       payload = append(payload, &fodcv1.Metric{
+                               Name:        parsed.Name,
+                               Labels:      labelSet,
+                               Value:       samples[i],
+                               Description: help,
+                               Timestamp:   timestamppb.New(sampleTime),
+                       })
+               }
+       }
+
+       message := &fodcv1.StreamMetricsRequest{
+               Metrics:   payload,
+               Timestamp: timestamppb.Now(),
+       }
+       if sendErr := stream.Send(message); sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+       return nil
+}
+
+// SendHeartbeat writes a heartbeat to the Proxy on the registration stream. The
+// heartbeat carries the same node role, labels and primary address as the
+// initial registration request.
+//
+// It returns an error when the registration stream is not established, a
+// "registration stream closed" error when the send fails with io.EOF or a
+// cancellation, and a wrapped send error for any other failure.
+func (c *Client) SendHeartbeat(_ context.Context) error {
+       c.streamsMu.RLock()
+       unavailable := c.disconnected || c.registrationStream == nil
+       regStream := c.registrationStream
+       c.streamsMu.RUnlock()
+       if unavailable {
+               return fmt.Errorf("registration stream not established")
+       }
+
+       heartbeat := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       sendErr := regStream.Send(heartbeat)
+       if sendErr == nil {
+               return nil
+       }
+
+       // Decide whether the failure means the stream is closed/disconnected.
+       streamClosed := errors.Is(sendErr, io.EOF) || errors.Is(sendErr, context.Canceled)
+       if !streamClosed {
+               if st, ok := status.FromError(sendErr); ok && st.Code() == codes.Canceled {
+                       streamClosed = true
+               }
+       }
+       if streamClosed {
+               return fmt.Errorf("registration stream closed")
+       }
+       return fmt.Errorf("failed to send heartbeat: %w", sendErr)
+}
+
+// Disconnect closes connection to Proxy.
+//
+// It marks the client disconnected, closes stopCh, stops the heartbeat ticker,
+// waits for the heartbeat goroutine, half-closes both streams, stops the
+// connection manager and clears the service client. Calling it on an already
+// disconnected client is a no-op. It always returns nil; stream close errors
+// are only logged.
+func (c *Client) Disconnect() error {
+       c.streamsMu.Lock()
+       if c.disconnected {
+               c.streamsMu.Unlock()
+               return nil
+       }
+
+       // The disconnected flag, checked above under the same lock, ensures stopCh
+       // is closed at most once until Connect replaces it.
+       c.disconnected = true
+       close(c.stopCh)
+
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()
+               c.heartbeatTicker = nil
+       }
+       // The lock is released before waiting so that goroutines which take
+       // streamsMu (SendHeartbeat does) are not blocked while we wait for them.
+       c.streamsMu.Unlock()
+
+       // Wait for heartbeat goroutine to exit before closing streams
+       c.heartbeatWg.Wait()
+
+       c.streamsMu.Lock()
+       if c.registrationStream != nil {
+               if closeErr := c.registrationStream.CloseSend(); closeErr != 
nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
registration stream")
+               }
+               c.registrationStream = nil
+       }
+
+       if c.metricsStream != nil {
+               if closeErr := c.metricsStream.CloseSend(); closeErr != nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
metrics stream")
+               }
+               c.metricsStream = nil
+       }
+       c.streamsMu.Unlock()
+
+       // The connection manager is stopped without holding streamsMu.
+       c.connManager.stop()
+
+       c.streamsMu.Lock()
+       c.client = nil
+       c.streamsMu.Unlock()
+
+       c.logger.Info().Msg("Disconnected from FODC Proxy")
+
+       return nil
+}
+
+// Start starts the proxy client with automatic reconnection: it starts the connection manager, then loops over Connect, StartRegistrationStream and StartMetricsStream, sleeping reconnectInterval after a failure, and blocks once all three succeed. It returns ctx.Err() when ctx is done and nil when stopCh is closed.
+func (c *Client) Start(ctx context.Context) error {
+       c.connManager.start(ctx)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               default:
+               }
+
+               if connectErr := c.Connect(ctx); connectErr != nil {
+                       c.logger.Error().Err(connectErr).Msg("Failed to connect 
to Proxy, retrying...")
+                       time.Sleep(c.reconnectInterval) // plain sleep: ctx cancellation is only observed on the next iteration
+                       continue
+               }
+
+               if regErr := c.StartRegistrationStream(ctx); regErr != nil {
+                       c.logger.Error().Err(regErr).Msg("Failed to start 
registration stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue // NOTE(review): Disconnect closes stopCh, so the check at the top of the loop appears to return nil instead of retrying - confirm
+               }
+
+               if metricsErr := c.StartMetricsStream(ctx); metricsErr != nil {
+                       c.logger.Error().Err(metricsErr).Msg("Failed to start 
metrics stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue // NOTE(review): same stopCh concern as the registration failure path above - confirm
+               }
+
+               c.logger.Info().Msg("Proxy client started successfully")
+
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               }
+       }
+}
+
+// handleRegistrationStream handles the registration stream: it drains messages from stream until ctx is done, stopCh is closed or Recv fails. Canceled/DeadlineExceeded errors end the loop quietly; io.EOF and any other error start reconnect in a new goroutine.
+func (c *Client) handleRegistrationStream(ctx context.Context, stream 
fodcv1.FODCService_RegisterAgentClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               _, recvErr := stream.Recv() // the received message is discarded; only the error is inspected
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Registration stream closed by 
Proxy, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       // Check if it's a context cancellation or deadline 
exceeded (expected errors during cleanup)
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               c.logger.Debug().Err(recvErr).Msg("Registration 
stream closed")
+                               return
+                       }
+                       if st, ok := status.FromError(recvErr); ok {
+                               // Check if it's a gRPC status error with 
expected codes
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
c.logger.Debug().Err(recvErr).Msg("Registration stream closed")
+                                       return
+                               }
+                       }
+                       c.logger.Error().Err(recvErr).Msg("Error receiving from 
registration stream, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+       }
+}
+
+// handleMetricsStream handles the metrics stream.
+func (c *Client) handleMetricsStream(ctx context.Context, stream 
fodcv1.FODCService_StreamMetricsClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               resp, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Metrics stream closed by Proxy, 
reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       // Check if it's a context cancellation or deadline 
exceeded (expected errors during cleanup)
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {

Review Comment:
   If `disconnected` is false, you should always attempt to reconnect to the 
proxy, regardless of the error.



##########
fodc/agent/internal/proxy/client.go:
##########
@@ -0,0 +1,751 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides a client for communicating with the FODC Proxy.
+package proxy
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       flightrecorder 
"github.com/apache/skywalking-banyandb/fodc/agent/internal/flightrecorder"
+       "github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// MetricsRequestFilter defines filters for metrics requests: an optional time window whose bounds are inclusive; a nil bound is not applied.
+type MetricsRequestFilter struct {
+       StartTime *time.Time // samples before this instant are skipped; nil means no lower bound
+       EndTime   *time.Time // samples after this instant are skipped; nil means no upper bound
+}
+
+// Client manages connection and communication with the FODC Proxy: it holds the gRPC service client, the registration and metrics streams, and the heartbeat/reconnect settings.
+type Client struct {
+       connManager        *connManager
+       heartbeatTicker    *time.Ticker
+       flightRecorder     *flightrecorder.FlightRecorder
+       logger             *logger.Logger
+       stopCh             chan struct{} // closed by Disconnect; recreated by Connect after a disconnect
+       labels             map[string]string
+       client             fodcv1.FODCServiceClient
+       registrationStream fodcv1.FODCService_RegisterAgentClient
+       metricsStream      fodcv1.FODCService_StreamMetricsClient
+
+       proxyAddr string
+       nodeIP    string
+       nodeRole  string
+       agentID   string // taken from the registration response
+
+       nodePort          int
+       heartbeatInterval time.Duration // replaced by the registration response when it carries a positive interval
+       reconnectInterval time.Duration
+       disconnected      bool           // true after Disconnect until the next successful Connect
+       streamsMu         sync.RWMutex   // Guards the streams, client, agentID, heartbeatTicker and disconnected
+       heartbeatWg       sync.WaitGroup // Tracks heartbeat goroutine
+}
+
+// NewClient creates a new Client instance with its own connection manager (built from proxyAddr, reconnectInterval and logger) and a fresh stopCh, and registers the client's SendHeartbeat as that manager's heartbeat checker.
+func NewClient(
+       proxyAddr string,
+       nodeIP string,
+       nodePort int,
+       nodeRole string,
+       labels map[string]string,
+       heartbeatInterval time.Duration,
+       reconnectInterval time.Duration,
+       flightRecorder *flightrecorder.FlightRecorder,
+       logger *logger.Logger,
+) *Client {
+       connMgr := newConnManager(proxyAddr, reconnectInterval, logger)
+       client := &Client{
+               connManager:       connMgr,
+               proxyAddr:         proxyAddr,
+               nodeIP:            nodeIP,
+               nodePort:          nodePort,
+               nodeRole:          nodeRole,
+               labels:            labels,
+               heartbeatInterval: heartbeatInterval,
+               reconnectInterval: reconnectInterval,
+               flightRecorder:    flightRecorder,
+               logger:            logger,
+               stopCh:            make(chan struct{}),
+       }
+
+       connMgr.setHeartbeatChecker(client.SendHeartbeat) // registered after construction because the checker is a method of the new client
+       return client
+}
+
+// StartConnManager starts the connection manager with ctx and does nothing else. It is useful for tests or scenarios where you want to 
manually control connection lifecycle.
+func (c *Client) StartConnManager(ctx context.Context) {
+       c.connManager.start(ctx)
+}
+
+// Connect establishes a gRPC connection to Proxy: it requests a connection from the connection manager, waits for the result and returns its error, if any. On success it installs a new service client.
+func (c *Client) Connect(ctx context.Context) error {
+       resultCh := c.connManager.RequestConnect(ctx)
+       result := <-resultCh // blocks until the connection manager delivers a result
+       if result.Error != nil {
+               return result.Error
+       }
+
+       c.streamsMu.Lock()
+       c.client = fodcv1.NewFODCServiceClient(result.Conn)
+       // Reset disconnected state and recreate stopCh for reconnection (the old stopCh was closed by Disconnect)
+       if c.disconnected {
+               c.disconnected = false
+               c.stopCh = make(chan struct{})
+       }
+       c.streamsMu.Unlock()
+
+       return nil
+}
+
+// StartRegistrationStream registers the agent (sends the request, validates the first response, stores the agent ID and optional heartbeat interval, starts the heartbeat and the receive loop) and thereby establishes bi-directional registration stream with 
Proxy.
+func (c *Client) StartRegistrationStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       c.streamsMu.Unlock() // the lock is not held across the RPC calls below
+
+       stream, streamErr := client.RegisterAgent(ctx)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create registration stream: %w", 
streamErr)
+       }
+
+       req := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send registration request: %w", 
sendErr)
+       }
+
+       resp, recvErr := stream.Recv() // the first message on the stream is treated as the registration response
+       if recvErr != nil {
+               return fmt.Errorf("failed to receive registration response: 
%w", recvErr)
+       }
+
+       if !resp.Success {
+               return fmt.Errorf("registration failed: %s", resp.Message)
+       }
+
+       if resp.AgentId == "" {
+               return fmt.Errorf("registration response missing agent ID")
+       }
+
+       c.streamsMu.Lock()
+       c.registrationStream = stream
+       c.agentID = resp.AgentId
+       if resp.HeartbeatIntervalSeconds > 0 { // a positive value from the Proxy overrides the configured interval
+               c.heartbeatInterval = 
time.Duration(resp.HeartbeatIntervalSeconds) * time.Second
+       }
+       c.streamsMu.Unlock()
+
+       c.logger.Info().
+               Str("proxy_addr", c.proxyAddr).
+               Str("agent_id", resp.AgentId).
+               Dur("heartbeat_interval", c.heartbeatInterval).
+               Msg("Agent registered with Proxy")
+
+       c.startHeartbeat(ctx)
+
+       go c.handleRegistrationStream(ctx, stream)
+
+       return nil
+}
+
+// StartMetricsStream establishes bi-directional metrics stream with Proxy. It requires a connected client and a non-empty agent ID, which is sent as the agent_id outgoing metadata, and it starts handleMetricsStream in a new goroutine.
+func (c *Client) StartMetricsStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       agentID := c.agentID
+       c.streamsMu.Unlock()
+
+       if agentID == "" {
+               return fmt.Errorf("agent ID not available, register agent 
first")
+       }
+
+       md := metadata.New(map[string]string{"agent_id": agentID})
+       ctxWithMetadata := metadata.NewOutgoingContext(ctx, md)
+
+       stream, streamErr := client.StreamMetrics(ctxWithMetadata)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create metrics stream: %w", 
streamErr)
+       }
+
+       c.streamsMu.Lock()
+       c.metricsStream = stream
+       c.streamsMu.Unlock()
+
+       go c.handleMetricsStream(ctx, stream) // the handler receives ctx, not ctxWithMetadata
+
+       c.logger.Info().
+               Str("agent_id", agentID).
+               Msg("Metrics stream established with Proxy")
+
+       return nil
+}
+
+// RetrieveAndSendMetrics sends one StreamMetricsRequest on the metrics stream: time-filtered samples when filter sets a bound, otherwise the latest values, or an empty message when there is nothing to report. It fails if the stream is not established. It retrieves metrics from Flight Recorder when 
requested by Proxy.
+func (c *Client) RetrieveAndSendMetrics(_ context.Context, filter 
*MetricsRequestFilter) error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.metricsStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("metrics stream not established")
+       }
+       metricsStream := c.metricsStream
+       c.streamsMu.RUnlock()
+
+       datasources := c.flightRecorder.GetDatasources()
+       if len(datasources) == 0 {
+               req := &fodcv1.StreamMetricsRequest{
+                       Metrics:   []*fodcv1.Metric{},
+                       Timestamp: timestamppb.Now(),
+               }
+               if sendErr := metricsStream.Send(req); sendErr != nil {
+                       return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+               }
+               return nil
+       }
+
+       ds := datasources[0] // only the first datasource is reported
+       allMetrics := ds.GetMetrics()
+       timestamps := ds.GetTimestamps()
+       descriptions := ds.GetDescriptions()
+
+       if filter != nil && (filter.StartTime != nil || filter.EndTime != nil) {
+               if timestamps == nil {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               timestampValues := timestamps.GetAllValues()
+               if len(timestampValues) == 0 {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               return c.sendFilteredMetrics(metricsStream, allMetrics, 
timestampValues, descriptions, filter)
+       }
+
+       return c.sendLatestMetrics(metricsStream, allMetrics, descriptions)
+}
+
+// sendLatestMetrics sends the latest metrics (most recent values) as a single message on stream. Metrics with no recorded values and a zero current value, and metrics whose key cannot be parsed, are skipped.
+func (c *Client) sendLatestMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       descriptions map[string]string,
+) error {
+       protoMetrics := make([]*fodcv1.Metric, 0)
+
+       for metricKey, metricBuffer := range allMetrics {
+               metricValue := metricBuffer.GetCurrentValue()
+               allValues := metricBuffer.GetAllValues()
+
+               if len(allValues) == 0 && metricValue == 0 {
+                       continue
+               }
+
+               parsedKey, parseErr := c.parseMetricKey(metricKey)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", 
metricKey).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               labelsMap := make(map[string]string)
+               for _, label := range parsedKey.Labels {
+                       labelsMap[label.Name] = label.Value
+               }
+
+               protoMetric := &fodcv1.Metric{
+                       Name:        parsedKey.Name,
+                       Labels:      labelsMap,
+                       Value:       metricValue,
+                       Description: descriptions[parsedKey.Name], // empty when the name has no description entry
+               }
+
+               protoMetrics = append(protoMetrics, protoMetric)
+       }
+
+       req := &fodcv1.StreamMetricsRequest{
+               Metrics:   protoMetrics,
+               Timestamp: timestamppb.Now(),
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+
+       return nil
+}
+
+// sendFilteredMetrics sends metrics filtered by time window as a single message on stream. Value i of each metric is paired with timestampValues[i] up to the shorter length and kept when that timestamp lies within filter's bounds; filter must be non-nil.
+func (c *Client) sendFilteredMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       timestampValues []int64,
+       descriptions map[string]string,
+       filter *MetricsRequestFilter,
+) error {
+       protoMetrics := make([]*fodcv1.Metric, 0)
+
+       for metricKey, metricBuffer := range allMetrics {
+               metricValues := metricBuffer.GetAllValues()
+               if len(metricValues) == 0 {
+                       continue
+               }
+
+               parsedKey, parseErr := c.parseMetricKey(metricKey)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", 
metricKey).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               description := descriptions[parsedKey.Name]
+
+               minLen := len(metricValues)
+               if len(timestampValues) < minLen {
+                       minLen = len(timestampValues)
+               }
+
+               labelsMap := make(map[string]string) // one map per metric key, shared by every sample emitted for it
+               for _, label := range parsedKey.Labels {
+                       labelsMap[label.Name] = label.Value
+               }
+
+               for idx := 0; idx < minLen; idx++ {
+                       timestampUnix := timestampValues[idx]
+                       timestamp := time.Unix(timestampUnix, 0) // the value is interpreted as whole Unix seconds
+
+                       if filter.StartTime != nil && 
timestamp.Before(*filter.StartTime) {
+                               continue
+                       }
+                       if filter.EndTime != nil && 
timestamp.After(*filter.EndTime) {
+                               continue
+                       }
+
+                       protoMetric := &fodcv1.Metric{
+                               Name:        parsedKey.Name,
+                               Labels:      labelsMap,
+                               Value:       metricValues[idx],
+                               Description: description,
+                               Timestamp:   timestamppb.New(timestamp),
+                       }
+
+                       protoMetrics = append(protoMetrics, protoMetric)
+               }
+       }
+
+       req := &fodcv1.StreamMetricsRequest{
+               Metrics:   protoMetrics,
+               Timestamp: timestamppb.Now(),
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+
+       return nil
+}
+
+// SendHeartbeat sends heartbeat to Proxy by sending a RegisterAgentRequest on the registration stream. It fails if the client is disconnected or the stream is not established.
+func (c *Client) SendHeartbeat(_ context.Context) error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.registrationStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("registration stream not established")
+       }
+       registrationStream := c.registrationStream
+       c.streamsMu.RUnlock()
+
+       req := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       if sendErr := registrationStream.Send(req); sendErr != nil {
+               // Check if error is due to stream being closed/disconnected; in that case the original error is not wrapped
+               if errors.Is(sendErr, io.EOF) || errors.Is(sendErr, 
context.Canceled) {
+                       return fmt.Errorf("registration stream closed")
+               }
+               if st, ok := status.FromError(sendErr); ok {
+                       if st.Code() == codes.Canceled {
+                               return fmt.Errorf("registration stream closed")
+                       }
+               }
+               return fmt.Errorf("failed to send heartbeat: %w", sendErr)
+       }
+
+       return nil
+}
+
+// Disconnect closes connection to Proxy: it closes stopCh, stops the heartbeat, half-closes both streams and stops the connection manager. It is a no-op when already disconnected and always returns nil.
+func (c *Client) Disconnect() error {
+       c.streamsMu.Lock()
+       if c.disconnected {
+               c.streamsMu.Unlock()
+               return nil
+       }
+
+       c.disconnected = true
+       close(c.stopCh) // wakes every loop that selects on stopCh
+
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()
+               c.heartbeatTicker = nil
+       }
+       c.streamsMu.Unlock()
+
+       // Wait for heartbeat goroutine to exit before closing streams; streamsMu is released above so the wait is not done under the lock
+       c.heartbeatWg.Wait()
+
+       c.streamsMu.Lock()
+       if c.registrationStream != nil {
+               if closeErr := c.registrationStream.CloseSend(); closeErr != 
nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
registration stream")
+               }
+               c.registrationStream = nil
+       }
+
+       if c.metricsStream != nil {
+               if closeErr := c.metricsStream.CloseSend(); closeErr != nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
metrics stream")
+               }
+               c.metricsStream = nil
+       }
+       c.streamsMu.Unlock()
+
+       c.connManager.stop()
+
+       c.streamsMu.Lock()
+       c.client = nil
+       c.streamsMu.Unlock()
+
+       c.logger.Info().Msg("Disconnected from FODC Proxy")
+
+       return nil
+}
+
+// Start starts the proxy client with automatic reconnection: it starts the connection manager, then loops over Connect, StartRegistrationStream and StartMetricsStream, sleeping reconnectInterval after a failure, and blocks once all three succeed. It returns ctx.Err() when ctx is done and nil when stopCh is closed.
+func (c *Client) Start(ctx context.Context) error {
+       c.connManager.start(ctx)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               default:
+               }
+
+               if connectErr := c.Connect(ctx); connectErr != nil {
+                       c.logger.Error().Err(connectErr).Msg("Failed to connect 
to Proxy, retrying...")
+                       time.Sleep(c.reconnectInterval) // plain sleep: ctx cancellation is only observed on the next iteration
+                       continue
+               }
+
+               if regErr := c.StartRegistrationStream(ctx); regErr != nil {
+                       c.logger.Error().Err(regErr).Msg("Failed to start 
registration stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue // NOTE(review): Disconnect closes stopCh, so the check at the top of the loop appears to return nil instead of retrying - confirm
+               }
+
+               if metricsErr := c.StartMetricsStream(ctx); metricsErr != nil {
+                       c.logger.Error().Err(metricsErr).Msg("Failed to start 
metrics stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue // NOTE(review): same stopCh concern as the registration failure path above - confirm
+               }
+
+               c.logger.Info().Msg("Proxy client started successfully")
+
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               }
+       }
+}
+
+// handleRegistrationStream handles the registration stream: it drains messages from stream until ctx is done, stopCh is closed or Recv fails. Canceled/DeadlineExceeded errors end the loop quietly; io.EOF and any other error start reconnect in a new goroutine.
+func (c *Client) handleRegistrationStream(ctx context.Context, stream 
fodcv1.FODCService_RegisterAgentClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               _, recvErr := stream.Recv() // the received message is discarded; only the error is inspected
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Registration stream closed by 
Proxy, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       // Check if it's a context cancellation or deadline 
exceeded (expected errors during cleanup)
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               c.logger.Debug().Err(recvErr).Msg("Registration 
stream closed")
+                               return
+                       }
+                       if st, ok := status.FromError(recvErr); ok {
+                               // Check if it's a gRPC status error with 
expected codes
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
c.logger.Debug().Err(recvErr).Msg("Registration stream closed")
+                                       return
+                               }
+                       }
+                       c.logger.Error().Err(recvErr).Msg("Error receiving from 
registration stream, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+       }
+}
+
+// handleMetricsStream handles the metrics stream: each message received from stream is turned into a MetricsRequestFilter and answered via RetrieveAndSendMetrics. Canceled/DeadlineExceeded errors end the loop quietly; io.EOF and any other receive error start reconnect in a new goroutine.
+func (c *Client) handleMetricsStream(ctx context.Context, stream 
fodcv1.FODCService_StreamMetricsClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               resp, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Metrics stream closed by Proxy, 
reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       // Check if it's a context cancellation or deadline 
exceeded (expected errors during cleanup)
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               c.logger.Debug().Err(recvErr).Msg("Metrics 
stream closed")
+                               return
+                       }
+                       if st, ok := status.FromError(recvErr); ok {
+                               // Check if it's a gRPC status error with 
expected codes
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
c.logger.Debug().Err(recvErr).Msg("Metrics stream closed")
+                                       return
+                               }
+                       }
+                       c.logger.Error().Err(recvErr).Msg("Error receiving from 
metrics stream, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+
+               filter := &MetricsRequestFilter{} // bounds stay nil unless the message sets them
+               if resp.StartTime != nil {
+                       startTime := resp.StartTime.AsTime()
+                       filter.StartTime = &startTime
+               }
+               if resp.EndTime != nil {
+                       endTime := resp.EndTime.AsTime()
+                       filter.EndTime = &endTime
+               }
+
+               if retrieveErr := c.RetrieveAndSendMetrics(ctx, filter); 
retrieveErr != nil {
+                       c.logger.Error().Err(retrieveErr).Msg("Failed to 
retrieve and send metrics")
+               }
+       }
+}
+
+// reconnect handles automatic reconnection when streams break.
+func (c *Client) reconnect(ctx context.Context) {
+       c.streamsMu.Lock()
+       if c.disconnected {
+               c.streamsMu.Unlock()
+               c.logger.Warn().Msg("Already disconnected intentionally, 
skipping reconnection...")
+               return
+       }
+       originalClient := c.client
+
+       c.logger.Info().Msg("Starting reconnection process...")
+
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()

Review Comment:
   reconnect() stops the ticker but doesn't wait for the heartbeat goroutine to 
finish. StartRegistrationStream() then calls startHeartbeat(), which can start 
a new goroutine while the old one is still running. Multiple heartbeat 
goroutines can run simultaneously, causing duplicate heartbeats and leaks.
   



##########
fodc/proxy/internal/grpc/service.go:
##########
@@ -0,0 +1,350 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/peer"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/metrics"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// agentConnection represents a connection to an agent.
+type agentConnection struct {
+       metricsStream fodcv1.FODCService_StreamMetricsServer
+       lastActivity  time.Time
+       agentID       string
+       mu            sync.RWMutex
+}
+
+// updateActivity updates the last activity time.
+func (ac *agentConnection) updateActivity() {
+       ac.mu.Lock()
+       defer ac.mu.Unlock()
+       ac.lastActivity = time.Now()
+}
+
+// getLastActivity returns the last activity time.
+func (ac *agentConnection) getLastActivity() time.Time {
+       ac.mu.RLock()
+       defer ac.mu.RUnlock()
+       return ac.lastActivity
+}
+
+// FODCService implements the FODC gRPC service.
+type FODCService struct {
+       fodcv1.UnimplementedFODCServiceServer
+       registry          *registry.AgentRegistry
+       metricsAggregator *metrics.Aggregator
+       logger            *logger.Logger
+       connections       map[string]*agentConnection
+       connectionsMu     sync.RWMutex
+       heartbeatInterval time.Duration
+}
+
+// NewFODCService creates a new FODCService instance.
+func NewFODCService(registry *registry.AgentRegistry, metricsAggregator 
*metrics.Aggregator, logger *logger.Logger, heartbeatInterval time.Duration) 
*FODCService {
+       return &FODCService{
+               registry:          registry,
+               metricsAggregator: metricsAggregator,
+               logger:            logger,
+               connections:       make(map[string]*agentConnection),
+               heartbeatInterval: heartbeatInterval,
+       }
+}
+
+// RegisterAgent handles bi-directional agent registration stream.
+func (s *FODCService) RegisterAgent(stream 
fodcv1.FODCService_RegisterAgentServer) error {
+       ctx, cancel := context.WithCancel(stream.Context())
+       defer cancel()
+
+       var agentID string
+       var agentConn *agentConnection
+       initialRegistration := true
+       defer func() {
+               s.cleanupConnection(agentID)
+       }()
+
+       for {
+               req, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       s.logger.Debug().Str("agent_id", 
agentID).Msg("Registration stream closed by agent")
+                       break
+               }
+               if recvErr != nil {
+                       s.logger.Error().Err(recvErr).Str("agent_id", 
agentID).Msg("Error receiving registration request")
+                       return recvErr
+               }
+
+               if initialRegistration {
+                       identity := registry.AgentIdentity{
+                               IP:     req.PrimaryAddress.Ip,
+                               Port:   int(req.PrimaryAddress.Port),
+                               Role:   req.NodeRole,
+                               Labels: req.Labels,
+                       }
+
+                       primaryAddr := registry.Address{
+                               IP:   req.PrimaryAddress.Ip,
+                               Port: int(req.PrimaryAddress.Port),
+                       }
+
+                       registeredAgentID, registerErr := 
s.registry.RegisterAgent(ctx, identity, primaryAddr)
+                       if registerErr != nil {
+                               resp := &fodcv1.RegisterAgentResponse{
+                                       Success: false,
+                                       Message: registerErr.Error(),
+                               }
+                               if sendErr := stream.Send(resp); sendErr != nil 
{
+                                       
s.logger.Error().Err(sendErr).Msg("Failed to send registration error response")
+                               }
+                               return registerErr
+                       }
+
+                       agentID = registeredAgentID
+
+                       agentConn = &agentConnection{
+                               agentID:      agentID,
+                               lastActivity: time.Now(),
+                       }
+
+                       s.connectionsMu.Lock()
+                       s.connections[agentID] = agentConn
+                       s.connectionsMu.Unlock()
+
+                       resp := &fodcv1.RegisterAgentResponse{
+                               Success:                  true,
+                               Message:                  "Agent registered 
successfully",
+                               HeartbeatIntervalSeconds: 
int64(s.heartbeatInterval.Seconds()),
+                               AgentId:                  agentID,
+                       }
+
+                       if sendErr := stream.Send(resp); sendErr != nil {
+                               s.logger.Error().Err(sendErr).Str("agent_id", 
agentID).Msg("Failed to send registration response")
+                               // Unregister agent since we couldn't send 
confirmation
+                               if unregisterErr := 
s.registry.UnregisterAgent(agentID); unregisterErr != nil {
+                                       
s.logger.Error().Err(unregisterErr).Str("agent_id", agentID).Msg("Failed to 
unregister agent after send error")
+                               }
+                               return sendErr
+                       }
+
+                       initialRegistration = false
+                       s.logger.Info().
+                               Str("agent_id", agentID).
+                               Str("ip", identity.IP).
+                               Int("port", identity.Port).
+                               Str("role", identity.Role).
+                               Msg("Agent registration stream established")
+               } else {
+                       if updateErr := s.registry.UpdateHeartbeat(agentID); 
updateErr != nil {
+                               s.logger.Error().Err(updateErr).Str("agent_id", 
agentID).Msg("Failed to update heartbeat")
+                               return updateErr
+                       }
+
+                       if agentConn != nil {
+                               agentConn.updateActivity()
+                       }
+               }
+       }
+
+       return nil
+}
+
+// StreamMetrics handles bi-directional metrics streaming.
+func (s *FODCService) StreamMetrics(stream 
fodcv1.FODCService_StreamMetricsServer) error {
+       ctx := stream.Context()
+
+       agentID := s.getAgentIDFromContext(ctx)
+       if agentID == "" {
+               agentID = s.getAgentIDFromPeer(ctx)
+               if agentID != "" {
+                       s.logger.Warn().
+                               Str("agent_id", agentID).
+                               Msg("Agent ID not found in metadata, using peer 
address fallback (this may be unreliable)")
+               }
+       }
+
+       if agentID == "" {
+               s.logger.Error().Msg("Agent ID not found in context metadata or 
peer address")
+               return status.Errorf(codes.Unauthenticated, "agent ID not found 
in context or peer address")
+       }
+
+       defer s.cleanupConnection(agentID)
+
+       s.connectionsMu.Lock()
+       existingConn, exists := s.connections[agentID]
+       if exists {
+               existingConn.metricsStream = stream
+               existingConn.updateActivity()
+       } else {
+               agentConn := &agentConnection{
+                       agentID:       agentID,
+                       metricsStream: stream,
+                       lastActivity:  time.Now(),
+               }
+               s.connections[agentID] = agentConn
+       }
+       s.connectionsMu.Unlock()
+
+       for {
+               req, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       s.logger.Debug().Str("agent_id", agentID).Msg("Metrics 
stream closed by agent")
+                       return nil
+               }
+               if recvErr != nil {
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               s.logger.Debug().Err(recvErr).Str("agent_id", 
agentID).Msg("Metrics stream closed")
+                       } else if st, ok := status.FromError(recvErr); ok {
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
s.logger.Debug().Err(recvErr).Str("agent_id", agentID).Msg("Metrics stream 
closed")
+                               } else {
+                                       
s.logger.Error().Err(recvErr).Str("agent_id", agentID).Msg("Error receiving 
metrics")
+                               }
+                       } else {
+                               s.logger.Error().Err(recvErr).Str("agent_id", 
agentID).Msg("Error receiving metrics")
+                       }
+                       return recvErr
+               }
+
+               s.connectionsMu.RLock()
+               conn, connExists := s.connections[agentID]
+               s.connectionsMu.RUnlock()
+               if connExists {
+                       conn.updateActivity()
+               }
+
+               agentInfo, getErr := s.registry.GetAgentByID(agentID)
+               if getErr != nil {
+                       s.logger.Error().Err(getErr).Str("agent_id", 
agentID).Msg("Failed to get agent info")
+                       continue
+               }
+
+               if processErr := 
s.metricsAggregator.ProcessMetricsFromAgent(ctx, agentID, agentInfo, req); 
processErr != nil {
+                       s.logger.Error().Err(processErr).Str("agent_id", 
agentID).Msg("Failed to process metrics")
+               }
+       }
+}
+
+// RequestMetrics requests metrics from an agent via the metrics stream.
+func (s *FODCService) RequestMetrics(_ context.Context, agentID string, 
startTime, endTime *time.Time) error {
+       s.connectionsMu.RLock()
+       defer s.connectionsMu.RUnlock()
+       agentConn, exists := s.connections[agentID]
+
+       if !exists {
+               return fmt.Errorf("agent connection not found for agent ID: 
%s", agentID)
+       }
+
+       if agentConn.metricsStream == nil {
+               return fmt.Errorf("metrics stream not established for agent ID: 
%s", agentID)
+       }
+
+       resp := &fodcv1.StreamMetricsResponse{}
+       if startTime != nil {
+               resp.StartTime = &timestamppb.Timestamp{
+                       Seconds: startTime.Unix(),
+                       Nanos:   int32(startTime.Nanosecond()),
+               }
+       }
+       if endTime != nil {
+               resp.EndTime = &timestamppb.Timestamp{
+                       Seconds: endTime.Unix(),
+                       Nanos:   int32(endTime.Nanosecond()),
+               }
+       }
+
+       if sendErr := agentConn.metricsStream.Send(resp); sendErr != nil {

Review Comment:
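   agentConn.metricsStream is read and Send() is called while holding only the 
shared read lock (the deferred RUnlock runs after Send() returns). A read lock 
does not stop several RequestMetrics callers from calling Send() on the same 
stream at the same time, which gRPC does not allow, and a slow Send() blocks 
StreamMetrics, which needs the write lock to register a stream. The stream can 
also be closed by StreamMetrics returning concurrently, causing a panic or 
sending to a closed stream.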



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to