This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch chore/message
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit b4b703f3d5a8ad85737f86b26b07c5cd1d2c3bf1
Author: liuhy <[email protected]>
AuthorDate: Wed Sep 24 19:40:26 2025 +0800

    feat(grpc): implement asynchronous heartbeat and message processing
---
 internal/transport/grpc_client.go  |  12 ++++-
 internal/transport/netty_client.go | 103 ++++++++++++++++++++++++++++++++-----
 2 files changed, 100 insertions(+), 15 deletions(-)

diff --git a/internal/transport/grpc_client.go 
b/internal/transport/grpc_client.go
index 26c3c1c..584dbd4 100644
--- a/internal/transport/grpc_client.go
+++ b/internal/transport/grpc_client.go
@@ -260,7 +260,17 @@ func (c *GrpcClient) heartbeatLoop() {
                        Direction: pb.Direction_REQUEST,
                        Identity:  "collector-go", // 可根据实际配置
                }
-               _, _ = c.SendMsgSync(heartbeat, 2000)
+
+               // Use a separate goroutine to send heartbeat to avoid blocking 
the loop
+               go func() {
+                       _, err := c.SendMsgSync(heartbeat, 2000)
+                       if err != nil {
+                               log.Printf("Failed to send heartbeat: %v", err)
+                       } else {
+                               log.Printf("Heartbeat sent successfully, time: 
%d", time.Now().UnixMilli())
+                       }
+               }()
+
                time.Sleep(10 * time.Second)
        }
 }
diff --git a/internal/transport/netty_client.go 
b/internal/transport/netty_client.go
index be464e6..3abaf00 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -200,6 +200,11 @@ func (c *NettyClient) SendMsg(msg interface{}) error {
                return errors.New("invalid message type")
        }
 
+       // For heartbeat messages, use a shorter timeout to avoid blocking
+       if pbMsg.Type == pb.MessageType_HEARTBEAT {
+               return c.writeMessageWithTimeout(pbMsg, 2*time.Second)
+       }
+
        return c.writeMessage(pbMsg)
 }
 
@@ -236,9 +241,70 @@ func (c *NettyClient) SendMsgSync(msg interface{}, 
timeoutMillis int) (interface
        return future.Wait(time.Duration(timeoutMillis) * time.Millisecond)
 }
 
-func (c *NettyClient) writeMessage(msg *pb.Message) error {
+func (c *NettyClient) writeMessageWithTimeout(msg *pb.Message, timeout 
time.Duration) error {
        // Set write deadline to prevent hanging
-       if err := c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); 
err != nil {
+       if err := c.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
+               return fmt.Errorf("failed to set write deadline: %w", err)
+       }
+
+       // Serialize protobuf message
+       data, err := proto.Marshal(msg)
+       if err != nil {
+               return fmt.Errorf("failed to marshal message: %w", err)
+       }
+
+       // Create the inner message: length prefix + protobuf data
+       var innerMessage bytes.Buffer
+
+       // Add length prefix for the protobuf data (using varint32 format)
+       length := uint32(len(data))
+       var buf [binary.MaxVarintLen32]byte
+       n := binary.PutUvarint(buf[:], uint64(length))
+
+       if _, err := innerMessage.Write(buf[:n]); err != nil {
+               return fmt.Errorf("failed to write length prefix: %w", err)
+       }
+
+       // Add the protobuf data
+       if _, err := innerMessage.Write(data); err != nil {
+               return fmt.Errorf("failed to write protobuf data: %w", err)
+       }
+
+       innerData := innerMessage.Bytes()
+
+       // Compress the entire inner message using GZIP
+       var compressed bytes.Buffer
+       gzipWriter := gzip.NewWriter(&compressed)
+       if _, err := gzipWriter.Write(innerData); err != nil {
+               return fmt.Errorf("failed to compress data: %w", err)
+       }
+       if err := gzipWriter.Close(); err != nil {
+               return fmt.Errorf("failed to close gzip writer: %w", err)
+       }
+
+       compressedData := compressed.Bytes()
+
+       // Write the compressed data directly
+       if _, err := c.writer.Write(compressedData); err != nil {
+               return fmt.Errorf("failed to write compressed message: %w", err)
+       }
+
+       // Flush
+       if err := c.writer.Flush(); err != nil {
+               return fmt.Errorf("failed to flush: %w", err)
+       }
+
+       // Clear write deadline
+       if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
+               log.Printf("Warning: failed to clear write deadline: %v", err)
+       }
+
+       return nil
+}
+
+func (c *NettyClient) writeMessage(msg *pb.Message) error {
+       // Set write deadline to prevent hanging (reduced from 10s to 5s for 
faster failure detection)
+       if err := c.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err 
!= nil {
                return fmt.Errorf("failed to set write deadline: %w", err)
        }
 
@@ -482,11 +548,15 @@ func (c *NettyClient) heartbeatLoop() {
                        Direction: pb.Direction_REQUEST,
                        Identity:  identity,
                }
-               if err := c.SendMsg(heartbeat); err != nil {
-                       log.Printf("Failed to send heartbeat: %v", err)
-               } else {
-                       log.Printf("Heartbeat sent successfully for identity: 
%s, time: %d", identity, time.Now().UnixMilli())
-               }
+
+               // Use a separate goroutine to send heartbeat to avoid blocking 
the loop
+               go func() {
+                       if err := c.SendMsg(heartbeat); err != nil {
+                               log.Printf("Failed to send heartbeat: %v", err)
+                       } else {
+                               log.Printf("Heartbeat sent successfully for 
identity: %s, time: %d", identity, time.Now().UnixMilli())
+                       }
+               }()
 
                // Wait 5 seconds before next heartbeat (matching Java version)
                time.Sleep(5 * time.Second)
@@ -577,14 +647,10 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
                        fmt.Printf("Message Identity: %s\n", pbMsg.Identity)
                        fmt.Printf("==============================\n")
 
-                       // TODO: Parse Job from pbMsg.Msg and execute the task
-                       // For now, just log the message content and return nil 
(no immediate response)
-                       // The actual response should be sent as 
RESPONSE_ONE_TIME_TASK_DATA (type 7)
-                       // after the task is completed
-
-                       // Simulate task processing
+                       // Process task asynchronously to avoid blocking 
heartbeat
                        go func() {
                                fmt.Printf("=== PROCESSING ONE-TIME TASK ===\n")
+
                                // TODO: Implement actual task execution here
                                // This should:
                                // 1. Parse the Job from pbMsg.Msg
@@ -594,9 +660,18 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
                                // For now, just log that we would process the 
task
                                fmt.Printf("One-time task processing completed 
(simulated)\n")
                                
fmt.Printf("==========================================\n")
+
+                               // TODO: Send actual response message here
+                               // responseMsg := &pb.Message{
+                               //     Type:      
pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA,
+                               //     Direction: pb.Direction_REQUEST,
+                               //     Identity:  pbMsg.Identity,
+                               //     Msg:       []byte("task results"),
+                               // }
+                               // client.SendMsg(responseMsg)
                        }()
 
-                       // Return nil - no immediate response needed
+                       // Return nil immediately - don't block the response
                        // The actual response will be sent asynchronously
                        return nil, nil
                }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to