This is an automated email from the ASF dual-hosted git repository.
liuhongyu pushed a commit to branch chore/message
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/chore/message by this push:
new 509a697 feat(grpc): enhance message processing for request and heartbeat responses
509a697 is described below
commit 509a697a651085ee55742fae3714bc8fe040d2cf
Author: liuhy <[email protected]>
AuthorDate: Wed Sep 24 19:12:52 2025 +0800
feat(grpc): enhance message processing for request and heartbeat responses
---
internal/transport/grpc_client.go | 35 ++++++++++++++++++++++++++++++++++-
internal/transport/netty_client.go | 36 +++++++++++++++++++++++++++++-------
2 files changed, 63 insertions(+), 8 deletions(-)
diff --git a/internal/transport/grpc_client.go b/internal/transport/grpc_client.go
index d5d82cd..26c3c1c 100644
--- a/internal/transport/grpc_client.go
+++ b/internal/transport/grpc_client.go
@@ -303,6 +303,39 @@ func (c *GrpcClient) processReceivedMessage(msg *pb.Message) {
// If not a sync response, distribute to registered processors
if fn, ok := c.registry.Get(int32(msg.Type)); ok {
- go fn(msg)
+ // For request messages that require response, process synchronously and send response back
+ if msg.Direction == pb.Direction_REQUEST {
+ go func() {
+ response, err := fn(msg)
+ if err != nil {
+ log.Printf("Error processing message type %d: %v", msg.Type, err)
+ return
+ }
+
+ if response != nil {
+ // Check if response is actually a valid protobuf message
+ if pbResponse, ok := response.(*pb.Message); ok && pbResponse != nil {
+ // Send the response back to server
+ if err := c.SendMsg(pbResponse); err != nil {
+ log.Printf("Failed to send response for message type %d: %v", msg.Type, err)
+ } else {
+ log.Printf("Successfully sent response for message type %d", msg.Type)
+ }
+ } else {
+ log.Printf("Processor returned invalid response type for message type %d", msg.Type)
+ }
+ } else {
+ // For heartbeat messages (type 0), nil response is expected and normal
+ if msg.Type == pb.MessageType_HEARTBEAT {
+ log.Printf("Heartbeat response received for message type %d (no response needed)", msg.Type)
+ } else {
+ log.Printf("Processor returned nil response for message type %d (this is normal for some message types)", msg.Type)
+ }
+ }
+ }()
+ } else {
+ // For non-request messages, process asynchronously
+ go fn(msg)
+ }
}
}
diff --git a/internal/transport/netty_client.go b/internal/transport/netty_client.go
index d598149..25bf51e 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -412,7 +412,12 @@ func (c *NettyClient) processReceivedMessage(msg *pb.Message) {
log.Printf("Processor returned invalid response type for message type %d", msg.Type)
}
} else {
- log.Printf("Processor returned nil response for message type %d (this is normal for some message types)", msg.Type)
+ // For heartbeat messages (type 0), nil response is expected and normal
+ if msg.Type == pb.MessageType_HEARTBEAT {
+ log.Printf("Heartbeat response received for message type %d (no response needed)", msg.Type)
+ } else {
+ log.Printf("Processor returned nil response for message type %d (this is normal for some message types)", msg.Type)
+ }
}
}()
} else {
@@ -538,24 +543,41 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := &CollectCyclicDataProcessor{client: nil}
- return processor.Process(pbMsg)
+ // Handle cyclic task message
+ // TODO: Implement actual task processing logic
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: pbMsg.Identity,
+ Msg: []byte("cyclic task ack"),
+ }, nil
}
return nil, fmt.Errorf("invalid message type")
})
client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := &DeleteCyclicTaskProcessor{client: nil}
- return processor.Process(pbMsg)
+ // Handle delete cyclic task message
+ return &pb.Message{
+ Type: pb.MessageType_DELETE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: pbMsg.Identity,
+ Msg: []byte("delete cyclic task ack"),
+ }, nil
}
return nil, fmt.Errorf("invalid message type")
})
client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := &CollectOneTimeDataProcessor{client: nil}
- return processor.Process(pbMsg)
+ // Handle one-time task message
+ // TODO: Implement actual task processing logic
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_ONE_TIME_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: pbMsg.Identity,
+ Msg: []byte("one-time task ack"),
+ }, nil
}
return nil, fmt.Errorf("invalid message type")
})
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]