This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch fix/fix_transport
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit d0c9a9cf636962b90d6366299e8e35eafc8a08d1
Author: liuhy <[email protected]>
AuthorDate: Sun Sep 21 22:07:09 2025 +0800

    fix: Improve logging and connection handling in NettyClient
---
 examples/main.go                   |  10 +-
 internal/transport/netty_client.go | 185 ++++++++++++++++++++++++-------------
 internal/transport/processors.go   |  10 +-
 3 files changed, 131 insertions(+), 74 deletions(-)

diff --git a/examples/main.go b/examples/main.go
index d408ece..662ed25 100644
--- a/examples/main.go
+++ b/examples/main.go
@@ -51,11 +51,11 @@ func main() {
 
        // Display configuration
        log.Info("=== Configuration ===")
-       log.Info("Collector Identity", map[string]interface{}{"identity": 
cfg.Collector.Identity})
-       log.Info("Collector Mode", map[string]interface{}{"mode": 
cfg.Collector.Mode})
-       log.Info("Manager Host", map[string]interface{}{"host": 
cfg.Collector.Manager.Host})
-       log.Info("Manager Port", map[string]interface{}{"port": 
cfg.Collector.Manager.Port})
-       log.Info("Manager Protocol", map[string]interface{}{"protocol": 
cfg.Collector.Manager.Protocol})
+       log.Info("Collector Identity", "identity", cfg.Collector.Identity)
+       log.Info("Collector Mode", "mode", cfg.Collector.Mode)
+       log.Info("Manager Host", "host", cfg.Collector.Manager.Host)
+       log.Info("Manager Port", "port", cfg.Collector.Manager.Port)
+       log.Info("Manager Protocol", "protocol", cfg.Collector.Manager.Protocol)
        log.Info("====================")
 
        // Create transport runner from configuration
diff --git a/internal/transport/netty_client.go b/internal/transport/netty_client.go
index a545aa2..c96a61b 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -47,6 +47,7 @@ type NettyClient struct {
        cancel        context.CancelFunc
        writer        *bufio.Writer
        reader        *bufio.Reader
+       gzipReader    *gzip.Reader
        identity      string
 }
 
@@ -83,24 +84,39 @@ func (c *NettyClient) Start() error {
        _, cancel := context.WithCancel(context.Background())
        c.cancel = cancel
 
-       // Connect to server
-       conn, err := net.Dial("tcp", c.addr)
+       // Connect to server with timeout
+       log.Printf("Attempting to connect to %s...", c.addr)
+       conn, err := net.DialTimeout("tcp", c.addr, 10*time.Second)
        if err != nil {
+               log.Printf("Connection failed: %v", err)
                c.triggerEvent(EventConnectFailed, err)
                return err
        }
+
+       log.Printf("TCP connection established to %s", c.addr)
        c.conn = conn
        c.writer = bufio.NewWriter(conn)
        c.reader = bufio.NewReader(conn)
+
+       log.Printf("Connection setup completed, not creating gzip reader yet 
(will create on first read)")
+       // Don't create gzip reader here - it will block waiting for data
+       // We'll create it when we actually need to read data
+       c.gzipReader = nil
+
        c.started = true
+       log.Printf("NettyClient started successfully")
 
+       log.Printf("Triggering connected event...")
+       // Trigger connected event - this will cause transport layer to send 
GO_ONLINE message
        c.triggerEvent(EventConnected, nil)
 
+       log.Printf("Starting background tasks...")
        // Start background tasks
+       go c.readLoop()
        go c.heartbeatLoop()
        go c.connectionMonitor()
-       go c.readLoop()
 
+       log.Printf("All background tasks started")
        return nil
 }
 
@@ -112,6 +128,11 @@ func (c *NettyClient) Shutdown() error {
                c.cancel()
        }
 
+       if c.gzipReader != nil {
+               _ = c.gzipReader.Close()
+               c.gzipReader = nil
+       }
+
        if c.conn != nil {
                _ = c.conn.Close()
        }
@@ -133,12 +154,15 @@ func (c *NettyClient) SetEventHandler(handler EventHandler) {
 }
 
 func (c *NettyClient) triggerEvent(eventType EventType, err error) {
+       log.Printf("Triggering event: %d, error: %v", eventType, err)
        if c.eventHandler != nil {
                c.eventHandler(Event{
                        Type:    eventType,
                        Address: c.addr,
                        Error:   err,
                })
+       } else {
+               log.Printf("No event handler set")
        }
 }
 
@@ -270,6 +294,8 @@ func (c *NettyClient) readLoop() {
                if err != nil {
                        if !errors.Is(err, net.ErrClosed) {
                                log.Printf("readLoop error: %v", err)
+                               // Trigger disconnect event on read error
+                               c.triggerEvent(EventDisconnected, nil)
                        }
                        break
                }
@@ -280,44 +306,60 @@ func (c *NettyClient) readLoop() {
 }
 
 func (c *NettyClient) readMessage() (*pb.Message, error) {
+       // Create gzip reader on first read if not already created
+       if c.gzipReader == nil {
+               log.Printf("Creating gzip reader on first read...")
+               var err error
+               c.gzipReader, err = gzip.NewReader(c.reader)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to create gzip reader: 
%w", err)
+               }
+               log.Printf("Gzip reader created successfully on first read")
+       }
+
        // Java Netty server sends GZIP compressed data that contains:
        // [length prefix + protobuf data] compressed with GZIP
-       // We need to read the GZIP stream and decompress it
-
-       // Create a gzip reader directly from the buffered reader
-       // This will handle the GZIP stream boundaries automatically
-       gzipReader, err := gzip.NewReader(c.reader)
-       if err != nil {
-               return nil, fmt.Errorf("failed to create gzip reader: %w", err)
-       }
-       defer gzipReader.Close()
+       // We use the persistent gzip reader created during connection setup
 
        // Read the decompressed data (which contains length prefix + protobuf 
data)
-       innerData, err := io.ReadAll(gzipReader)
-       if err != nil {
-               return nil, fmt.Errorf("failed to decompress data: %w", err)
-       }
+       // We need to read one complete message from the gzip stream
 
-       // Parse the length prefix from the decompressed data
-       innerReader := bytes.NewReader(innerData)
-       length, err := binary.ReadUvarint(innerReader)
-       if err != nil {
-               return nil, fmt.Errorf("failed to read length prefix from 
decompressed data: %w", err)
-       }
+       // First, read the length prefix from gzip stream using a buffer
+       lengthBuf := make([]byte, binary.MaxVarintLen32)
+       var bytesRead int
 
-       // Read the protobuf data
-       protobufData := make([]byte, length)
-       if _, err := innerReader.Read(protobufData); err != nil {
-               return nil, fmt.Errorf("failed to read protobuf data: %w", err)
-       }
+       for {
+               oneByte := make([]byte, 1)
+               if _, err := c.gzipReader.Read(oneByte); err != nil {
+                       return nil, fmt.Errorf("failed to read length prefix: 
%w", err)
+               }
+               lengthBuf[bytesRead] = oneByte[0]
+               bytesRead++
+
+               // Try to decode the length
+               length, n := binary.Uvarint(lengthBuf[:bytesRead])
+               if n > 0 {
+                       // Successfully decoded, read the protobuf data
+                       protobufData := make([]byte, length)
+                       if _, err := io.ReadFull(c.gzipReader, protobufData); 
err != nil {
+                               return nil, fmt.Errorf("failed to read protobuf 
data: %w", err)
+                       }
 
-       // Deserialize protobuf message
-       msg := &pb.Message{}
-       if err := proto.Unmarshal(protobufData, msg); err != nil {
-               return nil, fmt.Errorf("failed to unmarshal message: %w", err)
-       }
+                       // Deserialize protobuf message
+                       msg := &pb.Message{}
+                       if err := proto.Unmarshal(protobufData, msg); err != 
nil {
+                               return nil, fmt.Errorf("failed to unmarshal 
message: %w", err)
+                       }
 
-       return msg, nil
+                       return msg, nil
+               } else if n < 0 {
+                       return nil, fmt.Errorf("invalid length encoding")
+               }
+               // n == 0 means we need more bytes
+               if bytesRead >= binary.MaxVarintLen32 {
+                       return nil, fmt.Errorf("length prefix too long")
+               }
+       }
 }
 
 func (c *NettyClient) processReceivedMessage(msg *pb.Message) {
@@ -341,12 +383,19 @@ func (c *NettyClient) processReceivedMessage(msg *pb.Message) {
                                }
 
                                if response != nil {
-                                       // Send the response back to server
-                                       if err := c.SendMsg(response); err != 
nil {
-                                               log.Printf("Failed to send 
response for message type %d: %v", msg.Type, err)
+                                       // Check if response is actually a 
valid protobuf message
+                                       if pbResponse, ok := 
response.(*pb.Message); ok && pbResponse != nil {
+                                               // Send the response back to 
server
+                                               if err := 
c.SendMsg(pbResponse); err != nil {
+                                                       log.Printf("Failed to 
send response for message type %d: %v", msg.Type, err)
+                                               } else {
+                                                       
log.Printf("Successfully sent response for message type %d", msg.Type)
+                                               }
                                        } else {
-                                               log.Printf("Successfully sent 
response for message type %d", msg.Type)
+                                               log.Printf("Processor returned 
invalid response type for message type %d", msg.Type)
                                        }
+                               } else {
+                                       log.Printf("Processor returned nil 
response for message type %d (this is normal for some message types)", msg.Type)
                                }
                        }()
                } else {
@@ -358,29 +407,36 @@ func (c *NettyClient) processReceivedMessage(msg *pb.Message) {
 
 func (c *NettyClient) connectionMonitor() {
        for c.IsStarted() {
-               time.Sleep(5 * time.Second)
-               if c.conn != nil {
-                       // Test connection by checking if it's still active
-                       _, err := c.conn.Write([]byte{})
-                       if err != nil {
-                               c.triggerEvent(EventDisconnected, nil)
-                               log.Println("Connection lost, attempting to 
reconnect...")
-                               _ = c.Shutdown()
-
-                               // Attempt to reconnect with backoff
-                               for i := 0; i < 5 && c.IsStarted(); i++ {
-                                       backoff := time.Duration(i+1) * 2 * 
time.Second
-                                       log.Printf("Attempting to reconnect in 
%v...", backoff)
-                                       time.Sleep(backoff)
-
-                                       if err := c.Start(); err == nil {
-                                               log.Println("Reconnected 
successfully")
-                                               break
-                                       } else {
-                                               log.Printf("Failed to 
reconnect: %v", err)
-                                               if i == 4 { // Last attempt
-                                                       
c.triggerEvent(EventConnectFailed, err)
-                                               }
+               time.Sleep(60 * time.Second) // Check every 60 seconds
+
+               // Very conservative connection check - only verify the 
connection object exists
+               c.mu.RLock()
+               connExists := c.conn != nil
+               c.mu.RUnlock()
+
+               if !connExists {
+                       // Connection is nil, trigger disconnect event
+                       c.triggerEvent(EventDisconnected, nil)
+                       log.Println("Connection lost, attempting to 
reconnect...")
+                       _ = c.Shutdown()
+
+                       // Attempt to reconnect with exponential backoff
+                       for i := 0; i < 5; i++ {
+                               if !c.IsStarted() {
+                                       break // Exit if shutdown was called
+                               }
+
+                               backoff := time.Duration(i+1) * 2 * time.Second
+                               log.Printf("Attempting to reconnect in %v...", 
backoff)
+                               time.Sleep(backoff)
+
+                               if err := c.Start(); err == nil {
+                                       log.Println("Reconnected successfully")
+                                       break
+                               } else {
+                                       log.Printf("Failed to reconnect: %v", 
err)
+                                       if i == 4 { // Last attempt
+                                               
c.triggerEvent(EventConnectFailed, err)
                                        }
                                }
                        }
@@ -389,6 +445,9 @@ func (c *NettyClient) connectionMonitor() {
 }
 
 func (c *NettyClient) heartbeatLoop() {
+       // Start heartbeat after 5 seconds, then every 5 seconds (matching Java 
version)
+       time.Sleep(5 * time.Second)
+
        for c.IsStarted() {
                // Send heartbeat message with configured identity
                identity := c.GetIdentity()
@@ -404,9 +463,11 @@ func (c *NettyClient) heartbeatLoop() {
                if err := c.SendMsg(heartbeat); err != nil {
                        log.Printf("Failed to send heartbeat: %v", err)
                } else {
-                       log.Printf("Heartbeat sent successfully for identity: 
%s", identity)
+                       log.Printf("Heartbeat sent successfully for identity: 
%s, time: %d", identity, time.Now().UnixMilli())
                }
-               time.Sleep(5 * time.Second) // Match Java version's 5-second 
interval
+
+               // Wait 5 seconds before next heartbeat (matching Java version)
+               time.Sleep(5 * time.Second)
        }
 }
 
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index cceafc4..71120b0 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -46,13 +46,9 @@ type MessageProcessor interface {
 type HeartbeatProcessor struct{}
 
 func (p *HeartbeatProcessor) Process(msg *pb.Message) (*pb.Message, error) {
-       // Handle heartbeat message
-       return &pb.Message{
-               Type:      pb.MessageType_HEARTBEAT,
-               Direction: pb.Direction_RESPONSE,
-               Identity:  msg.Identity,
-               Msg:       []byte("heartbeat ack"),
-       }, nil
+       // Java version logs receipt of heartbeat response and returns null (no 
response needed)
+       // This matches: log.info("collector receive manager server response 
heartbeat, time: {}. ", System.currentTimeMillis());
+       return nil, nil
 }
 
 // GoOnlineProcessor handles go online messages


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to