hanahmily commented on code in PR #918:
URL: 
https://github.com/apache/skywalking-banyandb/pull/918#discussion_r2659663658


##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,428 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventReconnect
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnecting indicates a connection attempt is in progress.
+       ConnStateConnecting
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+       // ConnStateReconnecting indicates a reconnection attempt is in 
progress.
+       ConnStateReconnecting
+)
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       eventCh           chan ConnEvent
+       stopCh            chan struct{}
+       doneCh            chan struct{} // Signals when run() goroutine has 
exited

Review Comment:
   Replace `stopCh` and `doneCh` with `run.Closer`, which offers greater flexibility.



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,428 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventReconnect
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnecting indicates a connection attempt is in progress.
+       ConnStateConnecting
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+       // ConnStateReconnecting indicates a reconnection attempt is in 
progress.
+       ConnStateReconnecting
+)
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       eventCh           chan ConnEvent
+       stopCh            chan struct{}
+       doneCh            chan struct{} // Signals when run() goroutine has 
exited
+       currentConn       *grpc.ClientConn
+       proxyAddr         string
+       reconnectInterval time.Duration

Review Comment:
   This `reconnectInterval` field is redundant.



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,428 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventReconnect
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnecting indicates a connection attempt is in progress.
+       ConnStateConnecting
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+       // ConnStateReconnecting indicates a reconnection attempt is in 
progress.
+       ConnStateReconnecting
+)
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       eventCh           chan ConnEvent
+       stopCh            chan struct{}
+       doneCh            chan struct{} // Signals when run() goroutine has 
exited
+       currentConn       *grpc.ClientConn
+       proxyAddr         string
+       reconnectInterval time.Duration
+       retryInterval     time.Duration
+
+       stateMu      sync.RWMutex // Protects state, disconnected, currentConn, 
and retryInterval
+       state        ConnState
+       disconnected bool
+       started      bool
+       startedMu    sync.Mutex // Protects started flag
+}
+
+// NewConnManager creates a new connection manager.
+func NewConnManager(
+       proxyAddr string,
+       reconnectInterval time.Duration,
+       logger *logger.Logger,
+) *ConnManager {
+       return &ConnManager{
+               eventCh:           make(chan ConnEvent, 10),
+               stopCh:            make(chan struct{}),
+               doneCh:            make(chan struct{}),
+               logger:            logger,
+               proxyAddr:         proxyAddr,
+               reconnectInterval: reconnectInterval,
+               state:             ConnStateDisconnected,
+               retryInterval:     reconnectInterval,
+       }
+}
+
+// EventChannel returns the channel for sending connect/reconnect events.
+func (cm *ConnManager) EventChannel() chan<- ConnEvent {
+       return cm.eventCh
+}
+
+// Start starts the connection manager's event processing goroutine.
+func (cm *ConnManager) Start(ctx context.Context) {
+       cm.startedMu.Lock()
+       defer cm.startedMu.Unlock()
+
+       if cm.started {
+               return // Already started
+       }
+
+       cm.started = true

Review Comment:
   Use `sync.Once` to replace the `started` flag.



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,428 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventReconnect
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnecting indicates a connection attempt is in progress.
+       ConnStateConnecting
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+       // ConnStateReconnecting indicates a reconnection attempt is in 
progress.
+       ConnStateReconnecting
+)
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       eventCh           chan ConnEvent
+       stopCh            chan struct{}
+       doneCh            chan struct{} // Signals when run() goroutine has 
exited
+       currentConn       *grpc.ClientConn
+       proxyAddr         string
+       reconnectInterval time.Duration
+       retryInterval     time.Duration
+
+       stateMu      sync.RWMutex // Protects state, disconnected, currentConn, 
and retryInterval
+       state        ConnState
+       disconnected bool
+       started      bool
+       startedMu    sync.Mutex // Protects started flag
+}
+
+// NewConnManager creates a new connection manager.
+func NewConnManager(
+       proxyAddr string,
+       reconnectInterval time.Duration,
+       logger *logger.Logger,
+) *ConnManager {
+       return &ConnManager{
+               eventCh:           make(chan ConnEvent, 10),
+               stopCh:            make(chan struct{}),
+               doneCh:            make(chan struct{}),
+               logger:            logger,
+               proxyAddr:         proxyAddr,
+               reconnectInterval: reconnectInterval,
+               state:             ConnStateDisconnected,
+               retryInterval:     reconnectInterval,
+       }
+}
+
+// EventChannel returns the channel for sending connect/reconnect events.
+func (cm *ConnManager) EventChannel() chan<- ConnEvent {
+       return cm.eventCh
+}
+
+// Start starts the connection manager's event processing goroutine.
+func (cm *ConnManager) Start(ctx context.Context) {
+       cm.startedMu.Lock()
+       defer cm.startedMu.Unlock()
+
+       if cm.started {
+               return // Already started
+       }
+
+       cm.started = true
+       go cm.run(ctx)
+}
+
+// Stop stops the connection manager and closes all connections.
+func (cm *ConnManager) Stop() {
+       cm.startedMu.Lock()
+       defer cm.startedMu.Unlock()
+
+       if !cm.started {
+               return // Already stopped
+       }
+
+       // Signal the run goroutine to stop
+       close(cm.stopCh)
+
+       // Wait for goroutine to exit and cleanup
+       <-cm.doneCh
+
+       cm.started = false
+       // Recreate channels for next Start()
+       cm.eventCh = make(chan ConnEvent, 10)
+       cm.stopCh = make(chan struct{})
+       cm.doneCh = make(chan struct{})
+}
+
+// RequestConnect requests a connection attempt.
+func (cm *ConnManager) RequestConnect(ctx context.Context) <-chan ConnResult {
+       resultCh := make(chan ConnResult, 1)
+       event := ConnEvent{
+               Type:      ConnEventConnect,
+               Context:   ctx,
+               ResultCh:  resultCh,
+               Immediate: true,
+       }
+       select {
+       case cm.eventCh <- event:
+       case <-ctx.Done():
+               resultCh <- ConnResult{Error: ctx.Err()}
+               close(resultCh)
+       default:
+               resultCh <- ConnResult{Error: fmt.Errorf("connection manager 
event channel is full")}
+               close(resultCh)
+       }
+       return resultCh
+}
+
+// RequestReconnect requests a reconnection attempt.
+func (cm *ConnManager) RequestReconnect(ctx context.Context) <-chan ConnResult 
{
+       resultCh := make(chan ConnResult, 1)
+       event := ConnEvent{
+               Type:      ConnEventReconnect,
+               Context:   ctx,
+               ResultCh:  resultCh,
+               Immediate: false,
+       }
+       select {
+       case cm.eventCh <- event:
+       case <-ctx.Done():
+               resultCh <- ConnResult{Error: ctx.Err()}
+               close(resultCh)
+       default:
+               resultCh <- ConnResult{Error: fmt.Errorf("connection manager 
event channel is full")}
+               close(resultCh)
+       }
+       return resultCh
+}
+
+// run is the main event processing loop running in a single goroutine.
+func (cm *ConnManager) run(ctx context.Context) {
+       defer close(cm.doneCh)
+       for {
+               select {
+               case <-ctx.Done():
+                       cm.cleanup()
+                       return
+               case <-cm.stopCh:
+                       cm.cleanup()
+                       return
+               case event := <-cm.eventCh:
+                       cm.handleEvent(ctx, event)
+               }
+       }
+}
+
+// handleEvent runs in the single goroutine and processes a connection event.
+func (cm *ConnManager) handleEvent(ctx context.Context, event ConnEvent) {
+       switch event.Type {
+       case ConnEventDisconnect:
+               cm.stateMu.Lock()
+               cm.disconnected = true
+               cm.state = ConnStateDisconnected
+               cm.stateMu.Unlock()
+
+               cm.cleanupConnection()
+               if event.ResultCh != nil {
+                       event.ResultCh <- ConnResult{Error: nil}
+                       close(event.ResultCh)
+               }
+       case ConnEventConnect:
+               cm.stateMu.RLock()
+               currentState := cm.state
+               currentConn := cm.currentConn
+               cm.stateMu.RUnlock()
+
+               if currentState == ConnStateConnected && currentConn != nil {
+                       if event.ResultCh != nil {
+                               event.ResultCh <- ConnResult{
+                                       Conn: currentConn,
+                               }
+                               close(event.ResultCh)
+                       }
+                       return
+               }
+               if currentState == ConnStateConnecting {
+                       if event.ResultCh != nil {
+                               event.ResultCh <- ConnResult{
+                                       Error: fmt.Errorf("connection already 
in progress"),
+                               }
+                               close(event.ResultCh)
+                       }
+                       return
+               }
+
+               cm.stateMu.Lock()
+               cm.state = ConnStateConnecting
+               cm.stateMu.Unlock()
+
+               connCtx, connCancel := context.WithCancel(ctx)
+               defer connCancel()
+               // If event.Context is provided and not done, cancel connCtx 
when event.Context is done
+               if event.Context != nil {
+                       go func() {
+                               select {
+                               case <-event.Context.Done():
+                                       connCancel()
+                               case <-connCtx.Done():
+                               }
+                       }()
+               }
+               result := cm.doConnect(connCtx)
+
+               cm.stateMu.Lock()
+               if result.Error == nil {
+                       cm.state = ConnStateConnected
+                       cm.currentConn = result.Conn
+                       cm.retryInterval = cm.reconnectInterval
+               } else {
+                       cm.state = ConnStateDisconnected
+               }
+               cm.stateMu.Unlock()
+
+               if event.ResultCh != nil {
+                       event.ResultCh <- result
+                       close(event.ResultCh)
+               }
+       case ConnEventReconnect:
+               cm.stateMu.RLock()
+               disconnected := cm.disconnected
+               currentState := cm.state
+               cm.stateMu.RUnlock()
+
+               if disconnected {
+                       if event.ResultCh != nil {
+                               event.ResultCh <- ConnResult{
+                                       Error: fmt.Errorf("connection manager 
is disconnected"),
+                               }
+                               close(event.ResultCh)
+                       }
+                       return
+               }
+               if currentState == ConnStateReconnecting {
+                       if event.ResultCh != nil {
+                               event.ResultCh <- ConnResult{
+                                       Error: fmt.Errorf("reconnection already 
in progress"),
+                               }
+                               close(event.ResultCh)
+                       }
+                       return
+               }
+
+               cm.stateMu.Lock()
+               cm.state = ConnStateReconnecting
+               cm.stateMu.Unlock()
+
+               cm.cleanupConnection()
+
+               reconnCtx, reconnCancel := context.WithCancel(ctx)
+               defer reconnCancel()
+               // If event.Context is provided and not done, cancel reconnCtx 
when event.Context is done
+               if event.Context != nil {
+                       go func() {
+                               select {
+                               case <-event.Context.Done():
+                                       reconnCancel()
+                               case <-reconnCtx.Done():
+                               }
+                       }()
+               }
+               result := cm.doReconnect(reconnCtx, event.Immediate)
+
+               cm.stateMu.Lock()
+               if result.Error == nil {
+                       cm.state = ConnStateConnected
+                       cm.currentConn = result.Conn
+                       cm.retryInterval = cm.reconnectInterval
+               } else {
+                       cm.state = ConnStateDisconnected
+               }
+               cm.stateMu.Unlock()
+
+               if event.ResultCh != nil {
+                       event.ResultCh <- result
+                       close(event.ResultCh)
+               }
+       }
+}
+
+// doConnect performs the actual connection attempt.
+func (cm *ConnManager) doConnect(ctx context.Context) ConnResult {
+       select {
+       case <-ctx.Done():
+               return ConnResult{Error: ctx.Err()}
+       default:
+       }
+
+       conn, dialErr := grpc.NewClient(cm.proxyAddr, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if dialErr != nil {
+               cm.logger.Error().Err(dialErr).Str("proxy_addr", 
cm.proxyAddr).Msg("Failed to create proxy client")
+               return ConnResult{
+                       Error: fmt.Errorf("failed to create proxy client: %w", 
dialErr),
+               }
+       }
+       cm.logger.Info().Str("proxy_addr", cm.proxyAddr).Msg("Connected to FODC 
Proxy")
+       return ConnResult{
+               Conn: conn,
+       }
+}
+
+// doReconnect performs reconnection with exponential backoff.
+func (cm *ConnManager) doReconnect(ctx context.Context, immediate bool) 
ConnResult {
+       var retryInterval time.Duration
+       if !immediate {
+               cm.stateMu.RLock()
+               retryInterval = cm.retryInterval
+               cm.stateMu.RUnlock()
+
+               select {
+               case <-ctx.Done():
+                       return ConnResult{Error: ctx.Err()}
+               case <-cm.stopCh:
+                       return ConnResult{Error: fmt.Errorf("connection manager 
stopped")}
+               case <-time.After(retryInterval):

Review Comment:
   I didn't find the retry logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to