This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 6e926242 bugfix(fodc): fix connection stopping when registration or metrics
stream startup fails (#938)
6e926242 is described below
commit 6e926242c5bd28cf00a95034f75818c8850d1045
Author: Fine0830 <[email protected]>
AuthorDate: Tue Jan 13 23:32:41 2026 +0800
bugfix(fodc): fix connection stopping when registration or metrics stream
startup fails (#938)
---
fodc/agent/internal/proxy/client.go | 59 +++++++++++++++++++++++++------
fodc/agent/internal/proxy/conn_manager.go | 2 +-
2 files changed, 50 insertions(+), 11 deletions(-)
diff --git a/fodc/agent/internal/proxy/client.go
b/fodc/agent/internal/proxy/client.go
index 7ca4ee0e..4c2f9664 100644
--- a/fodc/agent/internal/proxy/client.go
+++ b/fodc/agent/internal/proxy/client.go
@@ -442,6 +442,46 @@ func (c *Client) SendHeartbeat(_ context.Context) error {
return nil
}
+// cleanupStreams tears down per-connection state (heartbeat ticker, stop channel, registration and metrics streams, client) without stopping the connection manager, so Start can retry after a stream startup failure.
+func (c *Client) cleanupStreams() {
+ c.streamsMu.Lock()
+ if c.heartbeatTicker != nil {
+ c.heartbeatTicker.Stop()
+ c.heartbeatTicker = nil
+ }
+
+ oldStopCh := c.stopCh
+ c.stopCh = make(chan struct{}) // fresh channel for the next attempt; Start re-reads c.stopCh each loop iteration
+ c.streamsMu.Unlock()
+
+ if oldStopCh != nil {
+ select { // skip if already closed: closing a closed channel panics. NOTE(review): check-then-close is not atomic; confirm no other path closes this channel concurrently
+ case <-oldStopCh:
+ default:
+ close(oldStopCh)
+ }
+ }
+
+ c.heartbeatWg.Wait() // called with streamsMu released, presumably so exiting heartbeat goroutines can take it — confirm
+
+ c.streamsMu.Lock() // NOTE(review): streams assigned between the Unlock above and here would also be closed below
+ if c.registrationStream != nil {
+ if closeErr := c.registrationStream.CloseSend(); closeErr !=
nil {
+ c.logger.Warn().Err(closeErr).Msg("Error closing
registration stream")
+ }
+ c.registrationStream = nil
+ }
+
+ if c.metricsStream != nil {
+ if closeErr := c.metricsStream.CloseSend(); closeErr != nil {
+ c.logger.Warn().Err(closeErr).Msg("Error closing
metrics stream")
+ }
+ c.metricsStream = nil
+ }
+ c.client = nil
+ c.streamsMu.Unlock()
+}
+
// Disconnect closes connection to Proxy.
func (c *Client) Disconnect() error {
c.streamsMu.Lock()
@@ -493,11 +533,11 @@ func (c *Client) Disconnect() error {
func (c *Client) Start(ctx context.Context) error {
c.connManager.start(ctx)
- c.streamsMu.RLock()
- stopCh := c.stopCh
- c.streamsMu.RUnlock()
-
for {
+ c.streamsMu.RLock()
+ stopCh := c.stopCh
+ c.streamsMu.RUnlock()
+
select {
case <-ctx.Done():
return ctx.Err()
@@ -514,18 +554,14 @@ func (c *Client) Start(ctx context.Context) error {
if regErr := c.StartRegistrationStream(ctx); regErr != nil {
c.logger.Error().Err(regErr).Msg("Failed to start
registration stream, reconnecting...")
- if disconnectErr := c.Disconnect(); disconnectErr !=
nil {
- c.logger.Warn().Err(disconnectErr).Msg("Failed
to disconnect before retry")
- }
+ c.cleanupStreams()
time.Sleep(c.reconnectInterval)
continue
}
if metricsErr := c.StartMetricsStream(ctx); metricsErr != nil {
c.logger.Error().Err(metricsErr).Msg("Failed to start
metrics stream, reconnecting...")
- if disconnectErr := c.Disconnect(); disconnectErr !=
nil {
- c.logger.Warn().Err(disconnectErr).Msg("Failed
to disconnect before retry")
- }
+ c.cleanupStreams()
time.Sleep(c.reconnectInterval)
continue
}
@@ -682,6 +718,9 @@ func (c *Client) reconnect(ctx context.Context) {
if connResult.err != nil {
c.logger.Error().Err(connResult.err).Msg("Failed to reconnect
to Proxy")
+ if disconnectErr := c.Disconnect(); disconnectErr != nil {
+ c.logger.Warn().Err(disconnectErr).Msg("Failed to
disconnect")
+ }
return
}
diff --git a/fodc/agent/internal/proxy/conn_manager.go
b/fodc/agent/internal/proxy/conn_manager.go
index 5193f3b3..8fb58f30 100644
--- a/fodc/agent/internal/proxy/conn_manager.go
+++ b/fodc/agent/internal/proxy/conn_manager.go
@@ -32,7 +32,7 @@ import (
)
const (
- connManagerMaxRetryInterval = 30 * time.Second
+ connManagerMaxRetryInterval = 60 * time.Second // NOTE(review): presumably caps the delay between connection retries — confirm against connManager usage
connManagerMaxRetries = 3
)