This is an automated email from the ASF dual-hosted git repository.
shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 1ca081a fix(transport): fix transport, hertzbeat (#18)
1ca081a is described below
commit 1ca081a10a821ddd84315db49058bf2ece7b885d
Author: aias00 <[email protected]>
AuthorDate: Mon Sep 22 15:53:07 2025 +0800
fix(transport): fix transport, hertzbeat (#18)
* fix: Improve logging and connection handling in NettyClient
* fix: Enhance event logging and improve shutdown handling in processors
* fix: Add 'transport' scope to PR title linting rules
---------
Co-authored-by: shown <[email protected]>
---
.github/workflows/lint-pr-title.yml | 1 +
examples/main.go | 10 +-
internal/transport/netty_client.go | 202 +++++++++++++++++++++++++-----------
internal/transport/processors.go | 34 +++---
4 files changed, 166 insertions(+), 81 deletions(-)
diff --git a/.github/workflows/lint-pr-title.yml
b/.github/workflows/lint-pr-title.yml
index 8b5c2fc..286a71b 100644
--- a/.github/workflows/lint-pr-title.yml
+++ b/.github/workflows/lint-pr-title.yml
@@ -47,6 +47,7 @@ jobs:
style
scopes: |
core
+ transport
ci
community
# e.g. feat(core): add new feature
diff --git a/examples/main.go b/examples/main.go
index d408ece..662ed25 100644
--- a/examples/main.go
+++ b/examples/main.go
@@ -51,11 +51,11 @@ func main() {
// Display configuration
log.Info("=== Configuration ===")
- log.Info("Collector Identity", map[string]interface{}{"identity":
cfg.Collector.Identity})
- log.Info("Collector Mode", map[string]interface{}{"mode":
cfg.Collector.Mode})
- log.Info("Manager Host", map[string]interface{}{"host":
cfg.Collector.Manager.Host})
- log.Info("Manager Port", map[string]interface{}{"port":
cfg.Collector.Manager.Port})
- log.Info("Manager Protocol", map[string]interface{}{"protocol":
cfg.Collector.Manager.Protocol})
+ log.Info("Collector Identity", "identity", cfg.Collector.Identity)
+ log.Info("Collector Mode", "mode", cfg.Collector.Mode)
+ log.Info("Manager Host", "host", cfg.Collector.Manager.Host)
+ log.Info("Manager Port", "port", cfg.Collector.Manager.Port)
+ log.Info("Manager Protocol", "protocol", cfg.Collector.Manager.Protocol)
log.Info("====================")
// Create transport runner from configuration
diff --git a/internal/transport/netty_client.go
b/internal/transport/netty_client.go
index a545aa2..d598149 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -47,6 +47,7 @@ type NettyClient struct {
cancel context.CancelFunc
writer *bufio.Writer
reader *bufio.Reader
+ gzipReader *gzip.Reader
identity string
}
@@ -83,24 +84,39 @@ func (c *NettyClient) Start() error {
_, cancel := context.WithCancel(context.Background())
c.cancel = cancel
- // Connect to server
- conn, err := net.Dial("tcp", c.addr)
+ // Connect to server with timeout
+ log.Printf("Attempting to connect to %s...", c.addr)
+ conn, err := net.DialTimeout("tcp", c.addr, 10*time.Second)
if err != nil {
+ log.Printf("Connection failed: %v", err)
c.triggerEvent(EventConnectFailed, err)
return err
}
+
+ log.Printf("TCP connection established to %s", c.addr)
c.conn = conn
c.writer = bufio.NewWriter(conn)
c.reader = bufio.NewReader(conn)
+
+ log.Printf("Connection setup completed, not creating gzip reader yet
(will create on first read)")
+ // Don't create gzip reader here - it will block waiting for data
+ // We'll create it when we actually need to read data
+ c.gzipReader = nil
+
c.started = true
+ log.Printf("NettyClient started successfully")
+ log.Printf("Triggering connected event...")
+ // Trigger connected event - this will cause transport layer to send
GO_ONLINE message
c.triggerEvent(EventConnected, nil)
+ log.Printf("Starting background tasks...")
// Start background tasks
+ go c.readLoop()
go c.heartbeatLoop()
go c.connectionMonitor()
- go c.readLoop()
+ log.Printf("All background tasks started")
return nil
}
@@ -112,6 +128,11 @@ func (c *NettyClient) Shutdown() error {
c.cancel()
}
+ if c.gzipReader != nil {
+ _ = c.gzipReader.Close()
+ c.gzipReader = nil
+ }
+
if c.conn != nil {
_ = c.conn.Close()
}
@@ -133,12 +154,32 @@ func (c *NettyClient) SetEventHandler(handler
EventHandler) {
}
func (c *NettyClient) triggerEvent(eventType EventType, err error) {
+ eventName := ""
+ switch eventType {
+ case EventConnected:
+ eventName = "Connected"
+ case EventDisconnected:
+ eventName = "Disconnected"
+ case EventConnectFailed:
+ eventName = "ConnectFailed"
+ default:
+ eventName = fmt.Sprintf("Unknown(%d)", eventType)
+ }
+
+ if err != nil {
+ log.Printf("Triggering event: %s, error: %v", eventName, err)
+ } else {
+ log.Printf("Triggering event: %s", eventName)
+ }
+
if c.eventHandler != nil {
c.eventHandler(Event{
Type: eventType,
Address: c.addr,
Error: err,
})
+ } else {
+ log.Printf("No event handler set")
}
}
@@ -270,6 +311,8 @@ func (c *NettyClient) readLoop() {
if err != nil {
if !errors.Is(err, net.ErrClosed) {
log.Printf("readLoop error: %v", err)
+ // Trigger disconnect event on read error
+ c.triggerEvent(EventDisconnected, nil)
}
break
}
@@ -280,44 +323,60 @@ func (c *NettyClient) readLoop() {
}
func (c *NettyClient) readMessage() (*pb.Message, error) {
+ // Create gzip reader on first read if not already created
+ if c.gzipReader == nil {
+ log.Printf("Creating gzip reader on first read...")
+ var err error
+ c.gzipReader, err = gzip.NewReader(c.reader)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create gzip reader:
%w", err)
+ }
+ log.Printf("Gzip reader created successfully on first read")
+ }
+
// Java Netty server sends GZIP compressed data that contains:
// [length prefix + protobuf data] compressed with GZIP
- // We need to read the GZIP stream and decompress it
-
- // Create a gzip reader directly from the buffered reader
- // This will handle the GZIP stream boundaries automatically
- gzipReader, err := gzip.NewReader(c.reader)
- if err != nil {
- return nil, fmt.Errorf("failed to create gzip reader: %w", err)
- }
- defer gzipReader.Close()
+ // We use the persistent gzip reader, created lazily on the first read
// Read the decompressed data (which contains length prefix + protobuf
data)
- innerData, err := io.ReadAll(gzipReader)
- if err != nil {
- return nil, fmt.Errorf("failed to decompress data: %w", err)
- }
+ // We need to read one complete message from the gzip stream
- // Parse the length prefix from the decompressed data
- innerReader := bytes.NewReader(innerData)
- length, err := binary.ReadUvarint(innerReader)
- if err != nil {
- return nil, fmt.Errorf("failed to read length prefix from
decompressed data: %w", err)
- }
+ // First, read the length prefix from gzip stream using a buffer
+ lengthBuf := make([]byte, binary.MaxVarintLen32)
+ var bytesRead int
- // Read the protobuf data
- protobufData := make([]byte, length)
- if _, err := innerReader.Read(protobufData); err != nil {
- return nil, fmt.Errorf("failed to read protobuf data: %w", err)
- }
+ for {
+ oneByte := make([]byte, 1)
+ if _, err := c.gzipReader.Read(oneByte); err != nil {
+ return nil, fmt.Errorf("failed to read length prefix:
%w", err)
+ }
+ lengthBuf[bytesRead] = oneByte[0]
+ bytesRead++
+
+ // Try to decode the length
+ length, n := binary.Uvarint(lengthBuf[:bytesRead])
+ if n > 0 {
+ // Successfully decoded, read the protobuf data
+ protobufData := make([]byte, length)
+ if _, err := io.ReadFull(c.gzipReader, protobufData);
err != nil {
+ return nil, fmt.Errorf("failed to read protobuf
data: %w", err)
+ }
- // Deserialize protobuf message
- msg := &pb.Message{}
- if err := proto.Unmarshal(protobufData, msg); err != nil {
- return nil, fmt.Errorf("failed to unmarshal message: %w", err)
- }
+ // Deserialize protobuf message
+ msg := &pb.Message{}
+ if err := proto.Unmarshal(protobufData, msg); err !=
nil {
+ return nil, fmt.Errorf("failed to unmarshal
message: %w", err)
+ }
- return msg, nil
+ return msg, nil
+ } else if n < 0 {
+ return nil, fmt.Errorf("invalid length encoding")
+ }
+ // n == 0 means we need more bytes
+ if bytesRead >= binary.MaxVarintLen32 {
+ return nil, fmt.Errorf("length prefix too long")
+ }
+ }
}
func (c *NettyClient) processReceivedMessage(msg *pb.Message) {
@@ -341,12 +400,19 @@ func (c *NettyClient) processReceivedMessage(msg
*pb.Message) {
}
if response != nil {
- // Send the response back to server
- if err := c.SendMsg(response); err !=
nil {
- log.Printf("Failed to send
response for message type %d: %v", msg.Type, err)
+ // Check if response is actually a
valid protobuf message
+ if pbResponse, ok :=
response.(*pb.Message); ok && pbResponse != nil {
+ // Send the response back to
server
+ if err :=
c.SendMsg(pbResponse); err != nil {
+ log.Printf("Failed to
send response for message type %d: %v", msg.Type, err)
+ } else {
+
log.Printf("Successfully sent response for message type %d", msg.Type)
+ }
} else {
- log.Printf("Successfully sent
response for message type %d", msg.Type)
+ log.Printf("Processor returned
invalid response type for message type %d", msg.Type)
}
+ } else {
+ log.Printf("Processor returned nil
response for message type %d (this is normal for some message types)", msg.Type)
}
}()
} else {
@@ -358,29 +424,36 @@ func (c *NettyClient) processReceivedMessage(msg
*pb.Message) {
func (c *NettyClient) connectionMonitor() {
for c.IsStarted() {
- time.Sleep(5 * time.Second)
- if c.conn != nil {
- // Test connection by checking if it's still active
- _, err := c.conn.Write([]byte{})
- if err != nil {
- c.triggerEvent(EventDisconnected, nil)
- log.Println("Connection lost, attempting to
reconnect...")
- _ = c.Shutdown()
-
- // Attempt to reconnect with backoff
- for i := 0; i < 5 && c.IsStarted(); i++ {
- backoff := time.Duration(i+1) * 2 *
time.Second
- log.Printf("Attempting to reconnect in
%v...", backoff)
- time.Sleep(backoff)
-
- if err := c.Start(); err == nil {
- log.Println("Reconnected
successfully")
- break
- } else {
- log.Printf("Failed to
reconnect: %v", err)
- if i == 4 { // Last attempt
-
c.triggerEvent(EventConnectFailed, err)
- }
+ time.Sleep(60 * time.Second) // Check every 60 seconds
+
+ // Very conservative connection check - only verify the
connection object exists
+ c.mu.RLock()
+ connExists := c.conn != nil
+ c.mu.RUnlock()
+
+ if !connExists {
+ // Connection is nil, trigger disconnect event
+ c.triggerEvent(EventDisconnected, nil)
+ log.Println("Connection lost, attempting to
reconnect...")
+ _ = c.Shutdown()
+
+ // Attempt to reconnect with linearly increasing backoff (2s, 4s, ...)
+ for i := 0; i < 5; i++ {
+ if !c.IsStarted() {
+ break // Exit if shutdown was called
+ }
+
+ backoff := time.Duration(i+1) * 2 * time.Second
+ log.Printf("Attempting to reconnect in %v...",
backoff)
+ time.Sleep(backoff)
+
+ if err := c.Start(); err == nil {
+ log.Println("Reconnected successfully")
+ break
+ } else {
+ log.Printf("Failed to reconnect: %v",
err)
+ if i == 4 { // Last attempt
+
c.triggerEvent(EventConnectFailed, err)
}
}
}
@@ -389,6 +462,9 @@ func (c *NettyClient) connectionMonitor() {
}
func (c *NettyClient) heartbeatLoop() {
+ // Start heartbeat after 5 seconds, then every 5 seconds (matching Java
version)
+ time.Sleep(5 * time.Second)
+
for c.IsStarted() {
// Send heartbeat message with configured identity
identity := c.GetIdentity()
@@ -404,9 +480,11 @@ func (c *NettyClient) heartbeatLoop() {
if err := c.SendMsg(heartbeat); err != nil {
log.Printf("Failed to send heartbeat: %v", err)
} else {
- log.Printf("Heartbeat sent successfully for identity:
%s", identity)
+ log.Printf("Heartbeat sent successfully for identity:
%s, time: %d", identity, time.Now().UnixMilli())
}
- time.Sleep(5 * time.Second) // Match Java version's 5-second
interval
+
+ // Wait 5 seconds before next heartbeat (matching Java version)
+ time.Sleep(5 * time.Second)
}
}
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index cceafc4..8bd71e3 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -19,6 +19,8 @@ package transport
import (
"fmt"
+ "log"
+ "time"
pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
)
@@ -46,13 +48,9 @@ type MessageProcessor interface {
type HeartbeatProcessor struct{}
func (p *HeartbeatProcessor) Process(msg *pb.Message) (*pb.Message, error) {
- // Handle heartbeat message
- return &pb.Message{
- Type: pb.MessageType_HEARTBEAT,
- Direction: pb.Direction_RESPONSE,
- Identity: msg.Identity,
- Msg: []byte("heartbeat ack"),
- }, nil
+ // Java version logs receipt of heartbeat response and returns null (no
response needed)
+ // This matches: log.info("collector receive manager server response
heartbeat, time: {}. ", System.currentTimeMillis());
+ return nil, nil
}
// GoOnlineProcessor handles go online messages
@@ -84,17 +82,25 @@ func NewGoOfflineProcessor(client *NettyClient)
*GoOfflineProcessor {
}
func (p *GoOfflineProcessor) Process(msg *pb.Message) (*pb.Message, error) {
- // Handle go offline message - shutdown the client
- if p.client != nil {
- // Stop heartbeat and close connection
- p.client.Shutdown()
- }
- return &pb.Message{
+ // Handle go offline message - first return response, then shutdown
+ // Create response first
+ response := &pb.Message{
Type: pb.MessageType_GO_OFFLINE,
Direction: pb.Direction_RESPONSE,
Identity: msg.Identity,
Msg: []byte("offline ack"),
- }, nil
+ }
+
+ // Schedule shutdown after a brief delay to allow response to be sent
+ if p.client != nil {
+ go func() {
+ time.Sleep(100 * time.Millisecond) // Brief delay to
ensure response is sent
+ log.Printf("Shutting down client as requested by
manager")
+ p.client.Shutdown()
+ }()
+ }
+
+ return response, nil
}
// GoCloseProcessor handles go close messages
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]