This is an automated email from the ASF dual-hosted git repository. liuhongyu pushed a commit to branch fix/fix_transport in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit d0c9a9cf636962b90d6366299e8e35eafc8a08d1 Author: liuhy <[email protected]> AuthorDate: Sun Sep 21 22:07:09 2025 +0800 fix: Improve logging and connection handling in NettyClient --- examples/main.go | 10 +- internal/transport/netty_client.go | 185 ++++++++++++++++++++++++------------- internal/transport/processors.go | 10 +- 3 files changed, 131 insertions(+), 74 deletions(-) diff --git a/examples/main.go b/examples/main.go index d408ece..662ed25 100644 --- a/examples/main.go +++ b/examples/main.go @@ -51,11 +51,11 @@ func main() { // Display configuration log.Info("=== Configuration ===") - log.Info("Collector Identity", map[string]interface{}{"identity": cfg.Collector.Identity}) - log.Info("Collector Mode", map[string]interface{}{"mode": cfg.Collector.Mode}) - log.Info("Manager Host", map[string]interface{}{"host": cfg.Collector.Manager.Host}) - log.Info("Manager Port", map[string]interface{}{"port": cfg.Collector.Manager.Port}) - log.Info("Manager Protocol", map[string]interface{}{"protocol": cfg.Collector.Manager.Protocol}) + log.Info("Collector Identity", "identity", cfg.Collector.Identity) + log.Info("Collector Mode", "mode", cfg.Collector.Mode) + log.Info("Manager Host", "host", cfg.Collector.Manager.Host) + log.Info("Manager Port", "port", cfg.Collector.Manager.Port) + log.Info("Manager Protocol", "protocol", cfg.Collector.Manager.Protocol) log.Info("====================") // Create transport runner from configuration diff --git a/internal/transport/netty_client.go b/internal/transport/netty_client.go index a545aa2..c96a61b 100644 --- a/internal/transport/netty_client.go +++ b/internal/transport/netty_client.go @@ -47,6 +47,7 @@ type NettyClient struct { cancel context.CancelFunc writer *bufio.Writer reader *bufio.Reader + gzipReader *gzip.Reader identity string } @@ -83,24 +84,39 @@ func (c *NettyClient) Start() error { _, cancel := context.WithCancel(context.Background()) c.cancel = cancel - // Connect to server - conn, err := net.Dial("tcp", c.addr) + // Connect to server with timeout + log.Printf("Attempting to connect to %s...", c.addr) + conn, err := net.DialTimeout("tcp", c.addr, 10*time.Second) if err != nil { + log.Printf("Connection failed: %v", err) c.triggerEvent(EventConnectFailed, err) return err } + + log.Printf("TCP connection established to %s", c.addr) c.conn = conn c.writer = bufio.NewWriter(conn) c.reader = bufio.NewReader(conn) + + log.Printf("Connection setup completed, not creating gzip reader yet (will create on first read)") + // Don't create gzip reader here - it will block waiting for data + // We'll create it when we actually need to read data + c.gzipReader = nil + c.started = true + log.Printf("NettyClient started successfully") + log.Printf("Triggering connected event...") + // Trigger connected event - this will cause transport layer to send GO_ONLINE message c.triggerEvent(EventConnected, nil) + log.Printf("Starting background tasks...") // Start background tasks + go c.readLoop() go c.heartbeatLoop() go c.connectionMonitor() - go c.readLoop() + log.Printf("All background tasks started") return nil } @@ -112,6 +128,11 @@ func (c *NettyClient) Shutdown() error { c.cancel() } + if c.gzipReader != nil { + _ = c.gzipReader.Close() + c.gzipReader = nil + } + if c.conn != nil { _ = c.conn.Close() } @@ -133,12 +154,15 @@ func (c *NettyClient) SetEventHandler(handler EventHandler) { } func (c *NettyClient) triggerEvent(eventType EventType, err error) { + log.Printf("Triggering event: %d, error: %v", eventType, err) if c.eventHandler != nil { c.eventHandler(Event{ Type: eventType, Address: c.addr, Error: err, }) + } else { + log.Printf("No event handler set") } } @@ -270,6 +294,8 @@ func (c *NettyClient) readLoop() { if err != nil { if !errors.Is(err, net.ErrClosed) { log.Printf("readLoop error: %v", err) + // Trigger disconnect event on read error + c.triggerEvent(EventDisconnected, nil) } break } @@ -280,44 +306,60 @@ func (c *NettyClient) readLoop() { } func (c *NettyClient) readMessage() (*pb.Message, error) { + // Create gzip reader on first read if not already created + if c.gzipReader == nil { + log.Printf("Creating gzip reader on first read...") + var err error + c.gzipReader, err = gzip.NewReader(c.reader) + if err != nil { + return nil, fmt.Errorf("failed to create gzip reader: %w", err) + } + log.Printf("Gzip reader created successfully on first read") + } + // Java Netty server sends GZIP compressed data that contains: // [length prefix + protobuf data] compressed with GZIP - // We need to read the GZIP stream and decompress it - - // Create a gzip reader directly from the buffered reader - // This will handle the GZIP stream boundaries automatically - gzipReader, err := gzip.NewReader(c.reader) - if err != nil { - return nil, fmt.Errorf("failed to create gzip reader: %w", err) - } - defer gzipReader.Close() + // We use the persistent gzip reader created during connection setup // Read the decompressed data (which contains length prefix + protobuf data) - innerData, err := io.ReadAll(gzipReader) - if err != nil { - return nil, fmt.Errorf("failed to decompress data: %w", err) - } + // We need to read one complete message from the gzip stream - // Parse the length prefix from the decompressed data - innerReader := bytes.NewReader(innerData) - length, err := binary.ReadUvarint(innerReader) - if err != nil { - return nil, fmt.Errorf("failed to read length prefix from decompressed data: %w", err) - } + // First, read the length prefix from gzip stream using a buffer + lengthBuf := make([]byte, binary.MaxVarintLen32) + var bytesRead int - // Read the protobuf data - protobufData := make([]byte, length) - if _, err := innerReader.Read(protobufData); err != nil { - return nil, fmt.Errorf("failed to read protobuf data: %w", err) - } + for { + oneByte := make([]byte, 1) + if _, err := c.gzipReader.Read(oneByte); err != nil { + return nil, fmt.Errorf("failed to read length prefix: %w", err) + } + lengthBuf[bytesRead] = oneByte[0] + bytesRead++ + + // Try to decode the length + length, n := binary.Uvarint(lengthBuf[:bytesRead]) + if n > 0 { + // Successfully decoded, read the protobuf data + protobufData := make([]byte, length) + if _, err := io.ReadFull(c.gzipReader, protobufData); err != nil { + return nil, fmt.Errorf("failed to read protobuf data: %w", err) + } - // Deserialize protobuf message - msg := &pb.Message{} - if err := proto.Unmarshal(protobufData, msg); err != nil { - return nil, fmt.Errorf("failed to unmarshal message: %w", err) - } + // Deserialize protobuf message + msg := &pb.Message{} + if err := proto.Unmarshal(protobufData, msg); err != nil { + return nil, fmt.Errorf("failed to unmarshal message: %w", err) + } - return msg, nil + return msg, nil + } else if n < 0 { + return nil, fmt.Errorf("invalid length encoding") + } + // n == 0 means we need more bytes + if bytesRead >= binary.MaxVarintLen32 { + return nil, fmt.Errorf("length prefix too long") + } + } } func (c *NettyClient) processReceivedMessage(msg *pb.Message) { @@ -341,12 +383,19 @@ func (c *NettyClient) processReceivedMessage(msg *pb.Message) { } if response != nil { - // Send the response back to server - if err := c.SendMsg(response); err != nil { - log.Printf("Failed to send response for message type %d: %v", msg.Type, err) + // Check if response is actually a valid protobuf message + if pbResponse, ok := response.(*pb.Message); ok && pbResponse != nil { + // Send the response back to server + if err := c.SendMsg(pbResponse); err != nil { + log.Printf("Failed to send response for message type %d: %v", msg.Type, err) + } else { + log.Printf("Successfully sent response for message type %d", msg.Type) + } } else { - log.Printf("Successfully sent response for message type %d", msg.Type) + log.Printf("Processor returned invalid response type for message type %d", msg.Type) } + } else { + log.Printf("Processor returned nil response for message type %d (this is normal for some message types)", msg.Type) } }() } else { @@ -358,29 +407,36 @@ func (c *NettyClient) processReceivedMessage(msg *pb.Message) { func (c *NettyClient) connectionMonitor() { for c.IsStarted() { - time.Sleep(5 * time.Second) - if c.conn != nil { - // Test connection by checking if it's still active - _, err := c.conn.Write([]byte{}) - if err != nil { - c.triggerEvent(EventDisconnected, nil) - log.Println("Connection lost, attempting to reconnect...") - _ = c.Shutdown() - - // Attempt to reconnect with backoff - for i := 0; i < 5 && c.IsStarted(); i++ { - backoff := time.Duration(i+1) * 2 * time.Second - log.Printf("Attempting to reconnect in %v...", backoff) - time.Sleep(backoff) - - if err := c.Start(); err == nil { - log.Println("Reconnected successfully") - break - } else { - log.Printf("Failed to reconnect: %v", err) - if i == 4 { // Last attempt - c.triggerEvent(EventConnectFailed, err) - } + time.Sleep(60 * time.Second) // Check every 60 seconds + + // Very conservative connection check - only verify the connection object exists + c.mu.RLock() + connExists := c.conn != nil + c.mu.RUnlock() + + if !connExists { + // Connection is nil, trigger disconnect event + c.triggerEvent(EventDisconnected, nil) + log.Println("Connection lost, attempting to reconnect...") + _ = c.Shutdown() + + // Attempt to reconnect with exponential backoff + for i := 0; i < 5; i++ { + if !c.IsStarted() { + break // Exit if shutdown was called + } + + backoff := time.Duration(i+1) * 2 * time.Second + log.Printf("Attempting to reconnect in %v...", backoff) + time.Sleep(backoff) + + if err := c.Start(); err == nil { + log.Println("Reconnected successfully") + break + } else { + log.Printf("Failed to reconnect: %v", err) + if i == 4 { // Last attempt + c.triggerEvent(EventConnectFailed, err) } } } @@ -389,6 +445,9 @@ func (c *NettyClient) connectionMonitor() { } func (c *NettyClient) heartbeatLoop() { + // Start heartbeat after 5 seconds, then every 5 seconds (matching Java version) + time.Sleep(5 * time.Second) + for c.IsStarted() { // Send heartbeat message with configured identity identity := c.GetIdentity() @@ -404,9 +463,11 @@ func (c *NettyClient) heartbeatLoop() { if err := c.SendMsg(heartbeat); err != nil { log.Printf("Failed to send heartbeat: %v", err) } else { - log.Printf("Heartbeat sent successfully for identity: %s", identity) + log.Printf("Heartbeat sent successfully for identity: %s, time: %d", identity, time.Now().UnixMilli()) } - time.Sleep(5 * time.Second) // Match Java version's 5-second interval + + // Wait 5 seconds before next heartbeat (matching Java version) + time.Sleep(5 * time.Second) } } diff --git a/internal/transport/processors.go b/internal/transport/processors.go index cceafc4..71120b0 100644 --- a/internal/transport/processors.go +++ b/internal/transport/processors.go @@ -46,13 +46,9 @@ type MessageProcessor interface { type HeartbeatProcessor struct{} func (p *HeartbeatProcessor) Process(msg *pb.Message) (*pb.Message, error) { - // Handle heartbeat message - return &pb.Message{ - Type: pb.MessageType_HEARTBEAT, - Direction: pb.Direction_RESPONSE, - Identity: msg.Identity, - Msg: []byte("heartbeat ack"), - }, nil + // Java version logs receipt of heartbeat response and returns null (no response needed) + // This matches: log.info("collector receive manager server response heartbeat, time: {}. ", System.currentTimeMillis()); + return nil, nil } // GoOnlineProcessor handles go online messages --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
