This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e926242 bugfix(fodc): fix registration or metrics stream startup fails to cause connection stop (#938)
6e926242 is described below

commit 6e926242c5bd28cf00a95034f75818c8850d1045
Author: Fine0830 <[email protected]>
AuthorDate: Tue Jan 13 23:32:41 2026 +0800

    bugfix(fodc): fix registration or metrics stream startup fails to cause connection stop (#938)
---
 fodc/agent/internal/proxy/client.go       | 59 +++++++++++++++++++++++++------
 fodc/agent/internal/proxy/conn_manager.go |  2 +-
 2 files changed, 50 insertions(+), 11 deletions(-)

diff --git a/fodc/agent/internal/proxy/client.go b/fodc/agent/internal/proxy/client.go
index 7ca4ee0e..4c2f9664 100644
--- a/fodc/agent/internal/proxy/client.go
+++ b/fodc/agent/internal/proxy/client.go
@@ -442,6 +442,46 @@ func (c *Client) SendHeartbeat(_ context.Context) error {
        return nil
 }
 
+// cleanupStreams cleans up streams without stopping the connection manager.
+func (c *Client) cleanupStreams() {
+       c.streamsMu.Lock()
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()
+               c.heartbeatTicker = nil
+       }
+
+       oldStopCh := c.stopCh
+       c.stopCh = make(chan struct{})
+       c.streamsMu.Unlock()
+
+       if oldStopCh != nil {
+               select {
+               case <-oldStopCh:
+               default:
+                       close(oldStopCh)
+               }
+       }
+
+       c.heartbeatWg.Wait()
+
+       c.streamsMu.Lock()
+       if c.registrationStream != nil {
+               if closeErr := c.registrationStream.CloseSend(); closeErr != 
nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
registration stream")
+               }
+               c.registrationStream = nil
+       }
+
+       if c.metricsStream != nil {
+               if closeErr := c.metricsStream.CloseSend(); closeErr != nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
metrics stream")
+               }
+               c.metricsStream = nil
+       }
+       c.client = nil
+       c.streamsMu.Unlock()
+}
+
 // Disconnect closes connection to Proxy.
 func (c *Client) Disconnect() error {
        c.streamsMu.Lock()
@@ -493,11 +533,11 @@ func (c *Client) Disconnect() error {
 func (c *Client) Start(ctx context.Context) error {
        c.connManager.start(ctx)
 
-       c.streamsMu.RLock()
-       stopCh := c.stopCh
-       c.streamsMu.RUnlock()
-
        for {
+               c.streamsMu.RLock()
+               stopCh := c.stopCh
+               c.streamsMu.RUnlock()
+
                select {
                case <-ctx.Done():
                        return ctx.Err()
@@ -514,18 +554,14 @@ func (c *Client) Start(ctx context.Context) error {
 
                if regErr := c.StartRegistrationStream(ctx); regErr != nil {
                        c.logger.Error().Err(regErr).Msg("Failed to start 
registration stream, reconnecting...")
-                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
-                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
-                       }
+                       c.cleanupStreams()
                        time.Sleep(c.reconnectInterval)
                        continue
                }
 
                if metricsErr := c.StartMetricsStream(ctx); metricsErr != nil {
                        c.logger.Error().Err(metricsErr).Msg("Failed to start 
metrics stream, reconnecting...")
-                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
-                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
-                       }
+                       c.cleanupStreams()
                        time.Sleep(c.reconnectInterval)
                        continue
                }
@@ -682,6 +718,9 @@ func (c *Client) reconnect(ctx context.Context) {
 
        if connResult.err != nil {
                c.logger.Error().Err(connResult.err).Msg("Failed to reconnect 
to Proxy")
+               if disconnectErr := c.Disconnect(); disconnectErr != nil {
+                       c.logger.Warn().Err(disconnectErr).Msg("Failed to 
disconnect")
+               }
                return
        }
 
diff --git a/fodc/agent/internal/proxy/conn_manager.go b/fodc/agent/internal/proxy/conn_manager.go
index 5193f3b3..8fb58f30 100644
--- a/fodc/agent/internal/proxy/conn_manager.go
+++ b/fodc/agent/internal/proxy/conn_manager.go
@@ -32,7 +32,7 @@ import (
 )
 
 const (
-       connManagerMaxRetryInterval = 30 * time.Second
+       connManagerMaxRetryInterval = 60 * time.Second
        connManagerMaxRetries       = 3
 )
 

Reply via email to