This is an automated email from the ASF dual-hosted git repository. liuhongyu pushed a commit to branch chore/message in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit b4b703f3d5a8ad85737f86b26b07c5cd1d2c3bf1 Author: liuhy <[email protected]> AuthorDate: Wed Sep 24 19:40:26 2025 +0800 feat(grpc): implement asynchronous heartbeat and message processing --- internal/transport/grpc_client.go | 12 ++++- internal/transport/netty_client.go | 103 ++++++++++++++++++++++++++++++++----- 2 files changed, 100 insertions(+), 15 deletions(-) diff --git a/internal/transport/grpc_client.go b/internal/transport/grpc_client.go index 26c3c1c..584dbd4 100644 --- a/internal/transport/grpc_client.go +++ b/internal/transport/grpc_client.go @@ -260,7 +260,17 @@ func (c *GrpcClient) heartbeatLoop() { Direction: pb.Direction_REQUEST, Identity: "collector-go", // 可根据实际配置 } - _, _ = c.SendMsgSync(heartbeat, 2000) + + // Use a separate goroutine to send heartbeat to avoid blocking the loop + go func() { + _, err := c.SendMsgSync(heartbeat, 2000) + if err != nil { + log.Printf("Failed to send heartbeat: %v", err) + } else { + log.Printf("Heartbeat sent successfully, time: %d", time.Now().UnixMilli()) + } + }() + time.Sleep(10 * time.Second) } } diff --git a/internal/transport/netty_client.go b/internal/transport/netty_client.go index be464e6..3abaf00 100644 --- a/internal/transport/netty_client.go +++ b/internal/transport/netty_client.go @@ -200,6 +200,11 @@ func (c *NettyClient) SendMsg(msg interface{}) error { return errors.New("invalid message type") } + // For heartbeat messages, use a shorter timeout to avoid blocking + if pbMsg.Type == pb.MessageType_HEARTBEAT { + return c.writeMessageWithTimeout(pbMsg, 2*time.Second) + } + return c.writeMessage(pbMsg) } @@ -236,9 +241,70 @@ func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int) (interface return future.Wait(time.Duration(timeoutMillis) * time.Millisecond) } -func (c *NettyClient) writeMessage(msg *pb.Message) error { +func (c *NettyClient) writeMessageWithTimeout(msg *pb.Message, timeout time.Duration) error { // Set write deadline to prevent hanging - if err := c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { + if err := c.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil { + return fmt.Errorf("failed to set write deadline: %w", err) + } + + // Serialize protobuf message + data, err := proto.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + // Create the inner message: length prefix + protobuf data + var innerMessage bytes.Buffer + + // Add length prefix for the protobuf data (using varint32 format) + length := uint32(len(data)) + var buf [binary.MaxVarintLen32]byte + n := binary.PutUvarint(buf[:], uint64(length)) + + if _, err := innerMessage.Write(buf[:n]); err != nil { + return fmt.Errorf("failed to write length prefix: %w", err) + } + + // Add the protobuf data + if _, err := innerMessage.Write(data); err != nil { + return fmt.Errorf("failed to write protobuf data: %w", err) + } + + innerData := innerMessage.Bytes() + + // Compress the entire inner message using GZIP + var compressed bytes.Buffer + gzipWriter := gzip.NewWriter(&compressed) + if _, err := gzipWriter.Write(innerData); err != nil { + return fmt.Errorf("failed to compress data: %w", err) + } + if err := gzipWriter.Close(); err != nil { + return fmt.Errorf("failed to close gzip writer: %w", err) + } + + compressedData := compressed.Bytes() + + // Write the compressed data directly + if _, err := c.writer.Write(compressedData); err != nil { + return fmt.Errorf("failed to write compressed message: %w", err) + } + + // Flush + if err := c.writer.Flush(); err != nil { + return fmt.Errorf("failed to flush: %w", err) + } + + // Clear write deadline + if err := c.conn.SetWriteDeadline(time.Time{}); err != nil { + log.Printf("Warning: failed to clear write deadline: %v", err) + } + + return nil +} + +func (c *NettyClient) writeMessage(msg *pb.Message) error { + // Set write deadline to prevent hanging (reduced from 10s to 5s for faster failure detection) + if err := c.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { return fmt.Errorf("failed to set write deadline: %w", err) } @@ -482,11 +548,15 @@ func (c *NettyClient) heartbeatLoop() { Direction: pb.Direction_REQUEST, Identity: identity, } - if err := c.SendMsg(heartbeat); err != nil { - log.Printf("Failed to send heartbeat: %v", err) - } else { - log.Printf("Heartbeat sent successfully for identity: %s, time: %d", identity, time.Now().UnixMilli()) - } + + // Use a separate goroutine to send heartbeat to avoid blocking the loop + go func() { + if err := c.SendMsg(heartbeat); err != nil { + log.Printf("Failed to send heartbeat: %v", err) + } else { + log.Printf("Heartbeat sent successfully for identity: %s, time: %d", identity, time.Now().UnixMilli()) + } + }() // Wait 5 seconds before next heartbeat (matching Java version) time.Sleep(5 * time.Second) @@ -577,14 +647,10 @@ func RegisterDefaultNettyProcessors(client *NettyClient) { fmt.Printf("Message Identity: %s\n", pbMsg.Identity) fmt.Printf("==============================\n") - // TODO: Parse Job from pbMsg.Msg and execute the task - // For now, just log the message content and return nil (no immediate response) - // The actual response should be sent as RESPONSE_ONE_TIME_TASK_DATA (type 7) - // after the task is completed - - // Simulate task processing + // Process task asynchronously to avoid blocking heartbeat go func() { fmt.Printf("=== PROCESSING ONE-TIME TASK ===\n") + // TODO: Implement actual task execution here // This should: // 1. Parse the Job from pbMsg.Msg @@ -594,9 +660,18 @@ func RegisterDefaultNettyProcessors(client *NettyClient) { // For now, just log that we would process the task fmt.Printf("One-time task processing completed (simulated)\n") fmt.Printf("==========================================\n") + + // TODO: Send actual response message here + // responseMsg := &pb.Message{ + // Type: pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA, + // Direction: pb.Direction_REQUEST, + // Identity: pbMsg.Identity, + // Msg: []byte("task results"), + // } + // client.SendMsg(responseMsg) }() - // Return nil - no immediate response needed + // Return nil immediately - don't block the response // The actual response will be sent asynchronously return nil, nil } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
