This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch chore/message
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git


The following commit(s) were added to refs/heads/chore/message by this push:
     new 509a697  feat(grpc): enhance message processing for request and 
heartbeat responses
509a697 is described below

commit 509a697a651085ee55742fae3714bc8fe040d2cf
Author: liuhy <[email protected]>
AuthorDate: Wed Sep 24 19:12:52 2025 +0800

    feat(grpc): enhance message processing for request and heartbeat responses
---
 internal/transport/grpc_client.go  | 35 ++++++++++++++++++++++++++++++++++-
 internal/transport/netty_client.go | 36 +++++++++++++++++++++++++++++-------
 2 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/internal/transport/grpc_client.go 
b/internal/transport/grpc_client.go
index d5d82cd..26c3c1c 100644
--- a/internal/transport/grpc_client.go
+++ b/internal/transport/grpc_client.go
@@ -303,6 +303,39 @@ func (c *GrpcClient) processReceivedMessage(msg 
*pb.Message) {
 
        // If not a sync response, distribute to registered processors
        if fn, ok := c.registry.Get(int32(msg.Type)); ok {
-               go fn(msg)
+               // For request messages that require response, process 
synchronously and send response back
+               if msg.Direction == pb.Direction_REQUEST {
+                       go func() {
+                               response, err := fn(msg)
+                               if err != nil {
+                                       log.Printf("Error processing message 
type %d: %v", msg.Type, err)
+                                       return
+                               }
+
+                               if response != nil {
+                                       // Check if response is actually a 
valid protobuf message
+                                       if pbResponse, ok := 
response.(*pb.Message); ok && pbResponse != nil {
+                                               // Send the response back to 
server
+                                               if err := 
c.SendMsg(pbResponse); err != nil {
+                                                       log.Printf("Failed to 
send response for message type %d: %v", msg.Type, err)
+                                               } else {
+                                                       
log.Printf("Successfully sent response for message type %d", msg.Type)
+                                               }
+                                       } else {
+                                               log.Printf("Processor returned 
invalid response type for message type %d", msg.Type)
+                                       }
+                               } else {
+                                       // For heartbeat messages (type 0), nil 
response is expected and normal
+                                       if msg.Type == pb.MessageType_HEARTBEAT 
{
+                                               log.Printf("Heartbeat response 
received for message type %d (no response needed)", msg.Type)
+                                       } else {
+                                               log.Printf("Processor returned 
nil response for message type %d (this is normal for some message types)", 
msg.Type)
+                                       }
+                               }
+                       }()
+               } else {
+                       // For non-request messages, process asynchronously
+                       go fn(msg)
+               }
        }
 }
diff --git a/internal/transport/netty_client.go 
b/internal/transport/netty_client.go
index d598149..25bf51e 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -412,7 +412,12 @@ func (c *NettyClient) processReceivedMessage(msg 
*pb.Message) {
                                                log.Printf("Processor returned 
invalid response type for message type %d", msg.Type)
                                        }
                                } else {
-                                       log.Printf("Processor returned nil 
response for message type %d (this is normal for some message types)", msg.Type)
+                                       // For heartbeat messages (type 0), nil 
response is expected and normal
+                                       if msg.Type == pb.MessageType_HEARTBEAT 
{
+                                               log.Printf("Heartbeat response 
received for message type %d (no response needed)", msg.Type)
+                                       } else {
+                                               log.Printf("Processor returned 
nil response for message type %d (this is normal for some message types)", 
msg.Type)
+                                       }
                                }
                        }()
                } else {
@@ -538,24 +543,41 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &CollectCyclicDataProcessor{client: nil}
-                       return processor.Process(pbMsg)
+                       // Handle cyclic task message
+                       // TODO: Implement actual task processing logic
+                       return &pb.Message{
+                               Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                               Direction: pb.Direction_RESPONSE,
+                               Identity:  pbMsg.Identity,
+                               Msg:       []byte("cyclic task ack"),
+                       }, nil
                }
                return nil, fmt.Errorf("invalid message type")
        })
 
        client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &DeleteCyclicTaskProcessor{client: nil}
-                       return processor.Process(pbMsg)
+                       // Handle delete cyclic task message
+                       return &pb.Message{
+                               Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+                               Direction: pb.Direction_RESPONSE,
+                               Identity:  pbMsg.Identity,
+                               Msg:       []byte("delete cyclic task ack"),
+                       }, nil
                }
                return nil, fmt.Errorf("invalid message type")
        })
 
        client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &CollectOneTimeDataProcessor{client: nil}
-                       return processor.Process(pbMsg)
+                       // Handle one-time task message
+                       // TODO: Implement actual task processing logic
+                       return &pb.Message{
+                               Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                               Direction: pb.Direction_RESPONSE,
+                               Identity:  pbMsg.Identity,
+                               Msg:       []byte("one-time task ack"),
+                       }, nil
                }
                return nil, fmt.Errorf("invalid message type")
        })


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to